Dominick Olivito
10/25/2024, 5:50 PM
Using run_deployment to start and wait for multiple subflows: what happens if one of the subflows ends with prefect.flow-run.Crashed?
Marvin
10/25/2024, 5:51 PM
When you use run_deployment to start multiple subflows, how the parent flow reacts to a subflow ending in prefect.flow-run.Crashed depends on how Prefect handles flow states.
If a subflow crashes, Prefect treats that as a terminal failure of the subflow. Crashed is a final state, so run_deployment stops waiting and returns the flow run with state type CRASHED; it does not raise an exception for it. The exception is infrastructure that retries on its own, such as Kubernetes configured to restart failed jobs.
However, there is a known problem specific to subflows started with run_deployment. If such a subflow crashes and is then retried successfully by external infrastructure like Kubernetes, the parent flow may still be marked as failed because of the initial crash, even though the subflow eventually completed. You can find more on this in the GitHub issue here.
To manage such scenarios, state change hooks can help. They let you run specific logic when a flow enters certain states, including Crashed (for example, via the on_crashed argument of @flow), which is useful for cleanup or alerting when a flow or subflow crashes unexpectedly. Details on state change hooks can be found in the Prefect documentation.
With these mechanisms you have more control over how your flow runs behave and end, even when subflows fail or crash.
Dominick Olivito
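10/25/2024, 5:53 PM
import asyncio

from prefect import flow
from prefect.client.schemas.objects import StateType
from prefect.deployments import run_deployment

@flow
async def parent_flow(source_names: list[str]):  # hypothetical wrapper name
    # In an async flow, run_deployment returns a coroutine; the deployment
    # is not triggered until the coroutine is awaited below.
    subflow_runs = []
    for source_name in source_names:
        subflow_run = run_deployment(
            name="my-flow/my_flow",
            parameters={"source_name": source_name},
        )
        subflow_runs.append(subflow_run)

    # Wait for every subflow run; results come back in input order.
    subflow_run_results = await asyncio.gather(*subflow_runs)
    for subflow_run in subflow_run_results:
        if subflow_run.state_type != StateType.COMPLETED:
            raise RuntimeError(
                f"subflow failed: {subflow_run.name}, with state: {subflow_run.state_type}"
            )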
Marvin
10/25/2024, 5:53 PM
What you see here comes down to how asyncio.gather handles exceptions and how run_deployment waits for the runs it starts.
Here are some relevant points:
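1. Waiting behavior: inside an async flow, calling run_deployment returns a coroutine, and nothing is triggered until that coroutine is awaited (here, by asyncio.gather). Once awaited, run_deployment polls the API until the flow run reaches a final state, or until its timeout elapses if you set one, so the parent does not see a subflow's final state before then.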
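2. Exception propagation in asyncio.gather: with the default return_exceptions=False, asyncio.gather() propagates the first exception raised by any of its awaitables as soon as it occurs, and the remaining awaitables are not cancelled. With return_exceptions=True, exceptions are returned in the results list instead. Because run_deployment returns the flow run even when it ends Crashed or Failed, a crashed subflow by itself does not make the asyncio.gather() call raise; it only shows up when you inspect each result's state.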
3. State linkage: with its default as_subflow=True, run_deployment links each new flow run to the parent flow run as a subflow. This link does not by itself suspend or cancel the parent flow based on a subflow's outcome; the parent has to check the results and handle them explicitly.
Your parent flow needs to check the state of each result returned by asyncio.gather and raise when one is not Completed, which your code already does. If the parent flow nevertheless keeps going after a subflow has crashed, that may point to a delay in the crash being reported to the Prefect API: run_deployment only returns once a run's recorded state is final, so a run still shown as Running keeps the parent waiting.
For more information on managing flow states and behavior with subflows, you can review the Prefect documentation on subflows and deployments.
If this doesn't help and you still see inconsistent states, check the logs and state history of each subflow run in the Prefect UI to confirm that its state is reported and handled correctly. The issue may also involve asynchronous processing or state-update delays specific to your setup.
Dominick Olivito
10/25/2024, 5:56 PM
await asyncio.gather()?
Marvin
10/25/2024, 5:56 PM
When you use await asyncio.gather() with flow runs, you need to check for specific terminal states to determine whether each flow has finished its lifecycle. The terminal states for flow runs in Prefect are:
1. Completed: The flow has successfully finished executing all tasks.
2. Failed: The flow encountered an error and could not complete all its tasks.
3. Cancelled: The flow was intentionally stopped before it could complete execution.
4. Crashed: The flow terminated unexpectedly due to an error or failure in the execution environment.
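The four terminal states above can be turned into a small classification helper. In this sketch RunState is a hypothetical local enum so the example runs without Prefect; in real code you would compare against Prefect's StateType (as the earlier snippet does) or call the state's is_final() method. The "unfinished" branch only matters if a wait was cut short, for example by a timeout.

```python
from enum import Enum


class RunState(Enum):
    """Hypothetical local mirror of a few Prefect state types, for illustration."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"


# The four terminal states listed above.
TERMINAL_STATES = frozenset(
    {RunState.COMPLETED, RunState.FAILED, RunState.CANCELLED, RunState.CRASHED}
)


def classify(state: RunState) -> str:
    """Map a run's state to what the parent flow should do about it."""
    if state is RunState.COMPLETED:
        return "ok"
    if state in TERMINAL_STATES:
        return "failed"  # FAILED, CANCELLED or CRASHED: finished, not successfully
    return "unfinished"  # not terminal, e.g. a wait that stopped at a timeout


if __name__ == "__main__":
    print(classify(RunState.CRASHED))  # failed
```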
When you use await asyncio.gather(), it collects the results of the awaited coroutines in the order they were passed in. Each run_deployment call returns once its flow run reaches one of these terminal states (or once its timeout elapses, if you set one), so each result represents a run whose execution has concluded.
It's important to handle each of these states appropriately. For instance, you might raise an exception or take corrective action if any subflow is not in the Completed state after asyncio.gather() collects its results.
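One way to act on this is to inspect every gathered result before raising, so a single error lists all unsuccessful subflows instead of only the first. The sketch uses hypothetical stand-ins (FakeFlowRun, fake_run_deployment) in place of Prefect's FlowRun and run_deployment, with state types as plain strings, so it runs on its own; it is a design variation, not Prefect's recommended pattern.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeFlowRun:
    """Hypothetical stand-in for the FlowRun objects run_deployment returns."""

    name: str
    state_type: str


async def fake_run_deployment(source_name: str, final_state: str) -> FakeFlowRun:
    # Hypothetical: pretends a subflow ran to the given final state.
    await asyncio.sleep(0)
    return FakeFlowRun(name=f"run-{source_name}", state_type=final_state)


def raise_for_unsuccessful(runs: list[FakeFlowRun]) -> None:
    """Check every run and report all non-COMPLETED ones in one error."""
    failed = [r for r in runs if r.state_type != "COMPLETED"]
    if failed:
        details = ", ".join(f"{r.name} ({r.state_type})" for r in failed)
        raise RuntimeError(f"{len(failed)} subflow(s) did not complete: {details}")


async def parent(outcomes: dict[str, str]) -> list[FakeFlowRun]:
    # Results come back in the same order as the coroutines passed to gather.
    runs = await asyncio.gather(
        *(fake_run_deployment(name, state) for name, state in outcomes.items())
    )
    raise_for_unsuccessful(runs)
    return runs


if __name__ == "__main__":
    try:
        asyncio.run(parent({"a": "COMPLETED", "b": "CRASHED"}))
    except RuntimeError as exc:
        print(exc)  # 1 subflow(s) did not complete: run-b (CRASHED)
```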
For further details on handling task and flow states, you can refer to Prefect's documentation on task runners and states.