Ax
10/13/2023, 1:30 PM
Marvin
10/13/2023, 1:30 PM
get method on the FlowRun object. However, prefect.context.get_run_context().flow_run is not designed to retrieve the results of subflows directly.
You might want to consider the following options:
1. If you're using run_deployment to run a pre-existing deployment as a subflow of the calling flow, it's similar to calling the flow object itself in a parent flow with something like asyncio.gather. The subflows will run according to the infrastructure associated with that deployment. When you call the subflow() object itself, it will run on the calling (parent) flow’s infrastructure as a process. [^1^]
2. If you're encountering errors with subflows after upgrading Prefect, it might be due to using the subflow function instead of run_deployment. [^2^]
3. You could visualize the tracked return value using viz_return_value_tracked.visualize(). This might not directly give you the results, but it could help in debugging and understanding the flow of data. [^3^]
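To make the asyncio.gather pattern from option 1 concrete, here is a minimal sketch in plain Python. The coroutines child_flow and parent are hypothetical stand-ins, not Prefect objects; in a real project they would presumably be async flows or run_deployment calls.

```python
import asyncio


async def child_flow(name: str, delay: float) -> str:
    # Hypothetical stand-in for an async subflow or a run_deployment call.
    await asyncio.sleep(delay)
    return f"{name} done"


async def parent() -> list:
    # gather runs the coroutines concurrently and returns their results
    # in the order the awaitables were passed in, not completion order.
    return await asyncio.gather(
        child_flow("child-a", 0.02),
        child_flow("child-b", 0.01),
    )


results = asyncio.run(parent())
print(results)
```

Because gather preserves argument order, each result can be matched back to the child that produced it even when the children finish out of order.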
I hope this helps. Or at least distracts you from the existential dread of your coding problems.
[^1^]: How can i run multiple subflows or child flows in parallel
[^2^]: How can i run multiple subflows or child flows in parallel
[^3^]: Prefect Concepts: Flows
Ax
10/13/2023, 1:32 PM
Marvin
10/13/2023, 1:33 PM
Ax
10/13/2023, 1:35 PM
Marvin
10/13/2023, 1:35 PM
Failed(message='1/1 states failed.', type=FAILED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type list')), indicates that one of your states has failed.
The UnpersistedResult part suggests that you're trying to retrieve a result that hasn't been persisted (or saved) yet. This could happen if you're trying to access the result of a task that hasn't completed yet, or if the task failed and therefore didn't produce a result.
Here are a couple of things you could try:
1. Make sure all your tasks are completing successfully. You can do this by adding error handling to your tasks and logging any exceptions that occur. This will help you identify if a particular task is causing the problem.
2. Ensure that you're not trying to access the result of a task before it has completed. If you're using Prefect's get method to retrieve task results, make sure the task has finished executing before you call get.
3. If you're using Prefect's result handlers to persist task results, make sure they're set up correctly. You might be getting this error if Prefect is unable to save your task results.
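The first suggestion (error handling plus logging) could look roughly like the sketch below. It is plain Python: log_exceptions and load_rows are hypothetical names made up for illustration, and the assumption is that a decorator like this would sit underneath Prefect's own task decorator.

```python
import functools
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("flow_debug")


def log_exceptions(fn):
    # Hypothetical helper: log any exception with its traceback, then
    # re-raise so the caller still sees the failure.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            logger.exception("Task %s failed", fn.__name__)
            raise
    return wrapper


@log_exceptions
def load_rows(path):
    # Simulated failure so there is something to log.
    raise FileNotFoundError(path)


try:
    load_rows("missing.csv")
except FileNotFoundError as exc:
    caught = exc
```

Re-raising after logging matters here: swallowing the exception would make the task look successful and hide the failed state you are trying to track down.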
I hope this helps. If not, well, at least you can take comfort in the fact that you're not alone. Every programmer has to deal with error messages. It's like a rite of passage. Welcome to the club.
Ax
10/13/2023, 1:36 PM
Marvin
10/13/2023, 1:36 PM
```python
from prefect import flow  # Prefect 2 API

# Initialize an empty list to store subflow names
subflow_names = []


# Define your subflows
@flow
def subflow1():
    # Do something
    pass


@flow
def subflow2():
    # Do something
    pass


# Call your subflows in your parent flow and update the list of subflow names
@flow(name="Parent Flow")
def parent_flow():
    result1 = subflow1()
    subflow_names.append("subflow1")
    result2 = subflow2()
    subflow_names.append("subflow2")


parent_flow()
# Now, subflow_names contains the names of the subflows that have been called
print(subflow_names)
```
This will print:
```python
['subflow1', 'subflow2']
```
I hope this helps. If not, well, at least you can take comfort in the fact that even robots like me have trouble remembering names sometimes.
Ax
10/13/2023, 1:38 PM
Marvin
10/13/2023, 1:38 PM
Failed(message='1/1 states failed.', type=FAILED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type list')), indicates that one of your states has failed.
If you want to get the names of the subflows that have failed, you can do so by inspecting the state of each subflow run from inside the parent flow. Calling a subflow with return_state=True returns its final state instead of raising, so the parent can check which ones failed.
Here's an example of how you might do this:
```python
from prefect import flow  # Prefect 2 API


@flow
def my_subflow():
    raise Exception("This subflow failed!")


@flow(name="My Flow")
def my_flow():
    # return_state=True returns the subflow's final State instead of raising
    subflow_states = {my_subflow.name: my_subflow(return_state=True)}
    failed_subflows = [
        name for name, state in subflow_states.items() if state.is_failed()
    ]
    print(f"The following subflows failed: {failed_subflows}")
    return failed_subflows


my_flow()
```
In this example, failed_subflows will contain the names of any subflows that failed.
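If the subflows are run concurrently rather than one by one, the same bookkeeping can be sketched with asyncio.gather and return_exceptions=True. This is plain Python under stated assumptions: run_child and parent are hypothetical stand-ins for subflows and a parent flow, not Prefect objects.

```python
import asyncio


async def run_child(name: str, should_fail: bool) -> str:
    # Hypothetical stand-in for a subflow; raises to simulate a failed run.
    await asyncio.sleep(0)
    if should_fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} ok"


async def parent() -> list:
    children = {"subflow1": False, "subflow2": True, "subflow3": False}
    # return_exceptions=True places raised exceptions in the result list
    # instead of propagating the first one to the caller.
    outcomes = await asyncio.gather(
        *(run_child(name, fail) for name, fail in children.items()),
        return_exceptions=True,
    )
    # Results come back in argument order, so they line up with the names.
    return [
        name
        for name, outcome in zip(children, outcomes)
        if isinstance(outcome, Exception)
    ]


failed_subflows = asyncio.run(parent())
print(f"The following subflows failed: {failed_subflows}")
```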
I hope this helps. If not, well, at least you can take comfort in the fact that even robots like me have trouble understanding error messages sometimes.