<@ULVA73B9P> in prefect 2.x, i use `run_deployment...
# ask-marvin
d
@Marvin in prefect 2.x, i use `run_deployment` to start and wait for multiple subflows. what happens if one of the subflows ends with `prefect.flow-run.Crashed`?
m
In Prefect 2.x, when you use `run_deployment` to start multiple subflows, how the parent flow reacts to a subflow ending with `prefect.flow-run.Crashed` depends on Prefect's handling of flow states. If a subflow crashes, Prefect typically treats this as an immediate failure of the subflow, because a crash is interpreted as a terminal failure unless retries are configured at the infrastructure level, such as in Kubernetes.

However, there is a noted behavior specific to subflows started via `run_deployment`. Even if the subflow crashes and then retries successfully (potentially managed by external infrastructure like Kubernetes), the parent flow might still be marked as failed because of the initial crash. This has been identified as an issue where a subflow crash incorrectly leads the parent flow into a failed state, despite the subflow eventually completing successfully. You can find more on this in the GitHub issue here.

To manage such scenarios, state change hooks can be helpful. They let you run specific logic when the flow enters certain states, including `Crashed`, which is useful for cleanup or handling when a flow or subflow crashes unexpectedly. Details on state change hooks can be found in the Prefect documentation. With these mechanisms you can better control the behavior and outcome of your flow runs, even when they fail or crash.
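As a rough illustration of the state change hooks mentioned above, here is a minimal sketch. It assumes the Prefect 2.x convention that a hook is called with `(flow, flow_run, state)`; the hook name `notify_on_crash` and the message format are made up for this example, and the hook is exercised with stand-in objects so the snippet runs without Prefect installed.

```python
from types import SimpleNamespace


def notify_on_crash(flow, flow_run, state):
    """Hypothetical hook: report which flow run entered which state.

    Assumes Prefect 2.x calls state change hooks with (flow, flow_run, state).
    """
    message = (
        f"flow run {flow_run.name!r} of flow {flow.name!r} "
        f"entered state {state.name}"
    )
    print(message)
    return message


# With Prefect installed, the hook would be attached roughly like this:
#
#     from prefect import flow
#
#     @flow(on_crashed=[notify_on_crash])
#     def my_flow(source_name: str):
#         ...

# Stand-in objects (not real Prefect objects) to show what the hook does.
demo_message = notify_on_crash(
    SimpleNamespace(name="my-flow"),
    SimpleNamespace(name="demo-run"),
    SimpleNamespace(name="Crashed"),
)
```

The hook only reacts to the crash (for example by alerting or cleaning up); it does not change the state the run ends in.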
d
@Marvin my code looks like this. one of the subflows Crashed, but the parent flow still shows Running state in prefect cloud. why would that be?
    subflow_runs = []
    for source_name in source_names:
        subflow_run = run_deployment(
            name=f"my-flow/my_flow",
            parameters={
                "source_name": source_name,
            },
        )
        subflow_runs.append(subflow_run)
    subflow_run_results = await asyncio.gather(*subflow_runs)
    for subflow_run in subflow_run_results:
        if subflow_run.state_type != StateType.COMPLETED:
            raise RuntimeError(
                f"subflow failed: {subflow_run.name}, with state: {subflow_run.state_type}"
            )
m
The behavior you are seeing, where the parent flow still shows "Running" even though one of the subflows has crashed, is probably related to how `asyncio.gather` collects results and to the fact that `run_deployment` is a coroutine here. Here are some relevant points from the Prefect documentation:

1. Non-blocking behavior: `run_deployment` only produces a result once it is awaited. In your code the calls are awaited together through `asyncio.gather`, so the parent flow does not look at any subflow's state before that point.
2. Result collection in `asyncio.gather`: `asyncio.gather()` hands back its results only once every awaited call has finished. A subflow that crashes early is therefore not inspected until the remaining subflows have also finished, and the parent flow keeps showing "Running" in the meantime.
3. State linkage: when using `run_deployment`, subflows are linked to the parent flow by default. This linkage does not by itself fail or cancel the parent based on subflow outcomes; that only happens if you check the outcomes and handle them explicitly.

Your parent flow needs to check the states returned from `asyncio.gather` and raise when one is not completed, which you seem to be doing. If the parent stays in "Running" after all subflows have finished, that would instead suggest an issue or delay in state propagation from the subflows to the parent flow.

For more information on managing flow states and behavior with subflows, you can review the Prefect documentation on subflows and deployments. If you continue to see the inconsistent state, examine the logs and state updates in the Prefect UI to confirm that each subflow's state is reported and handled accurately. It may also be worth checking for asynchronous processing or state update delays specific to your setup.
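To make the point about `asyncio.gather` concrete, here is a small sketch that uses only the standard library. `fake_run_deployment` and `FakeFlowRun` are stand-ins invented for this example (they are not Prefect APIs); they assume, as in the code above, that each awaited call returns an object carrying a final state rather than raising.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class FakeFlowRun:
    """Stand-in for the object returned by a finished subflow run."""
    name: str
    state_type: str


async def fake_run_deployment(name: str, seconds: float, final_state: str) -> FakeFlowRun:
    """Hypothetical stand-in: finish after `seconds` with the given final state."""
    await asyncio.sleep(seconds)
    return FakeFlowRun(name=name, state_type=final_state)


async def parent():
    started = time.monotonic()
    # gather returns only once *both* awaitables have finished, so the
    # early "CRASHED" result is not visible until the slow run is done.
    results = await asyncio.gather(
        fake_run_deployment("fast-crash", 0.05, "CRASHED"),
        fake_run_deployment("slow-ok", 0.3, "COMPLETED"),
    )
    elapsed = time.monotonic() - started
    return results, elapsed


results, elapsed = asyncio.run(parent())
for run in results:
    print(f"{run.name}: {run.state_type}")
print(f"parent waited {elapsed:.2f}s before it could inspect any state")
```

If the same thing happens in the real flow, the parent would stay in "Running" until the slowest subflow reaches a final state, and only then reach the loop that raises `RuntimeError`.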
d
@Marvin what flow states are considered terminal for `await asyncio.gather()`?
m
In Prefect, when using `await asyncio.gather()` with flow runs, you need to check for specific terminal states to determine when a flow has completed its lifecycle. The typical terminal states for flows in Prefect are:

1. Completed: the flow has successfully finished executing all tasks.
2. Failed: the flow encountered an error and could not complete all its tasks.
3. Cancelled: the flow was intentionally stopped before it could complete execution.
4. Crashed: the flow terminated unexpectedly due to an error or failure in the execution environment.

When you use `await asyncio.gather()`, it collects the results of the coroutines or tasks you are awaiting. If any of these states are encountered, Prefect considers them terminal, concluding the execution for those specific runs.

It's important to handle each of these states appropriately in your logic. For instance, you might want to raise an exception or take corrective action if any of the subflows are not in the "Completed" state after `asyncio.gather()` collects its results. For further details on handling task and flow states, you can refer to Prefect's documentation on task runners and states.
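The check described above could look roughly like the sketch below. The `StateType` enum and `FakeFlowRun` class are local stand-ins that mirror the four terminal state names listed above plus `RUNNING`; they are assumptions for illustration, not imports from Prefect, and the helper names are hypothetical.

```python
from dataclasses import dataclass
from enum import Enum


class StateType(Enum):
    """Local stand-in mirroring the state names discussed above."""
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"


TERMINAL_STATES = {
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
    StateType.CRASHED,
}


@dataclass
class FakeFlowRun:
    """Stand-in for a flow run result with a name and a state type."""
    name: str
    state_type: StateType


def is_terminal(state_type: StateType) -> bool:
    return state_type in TERMINAL_STATES


def raise_if_any_unsuccessful(runs):
    """Raise once, listing every run that did not end in COMPLETED."""
    unsuccessful = [run for run in runs if run.state_type != StateType.COMPLETED]
    if unsuccessful:
        summary = ", ".join(
            f"{run.name} ({run.state_type.value})" for run in unsuccessful
        )
        raise RuntimeError(f"subflows did not complete: {summary}")


demo_runs = [
    FakeFlowRun("source-a", StateType.COMPLETED),
    FakeFlowRun("source-b", StateType.CRASHED),
    FakeFlowRun("source-c", StateType.FAILED),
]
try:
    raise_if_any_unsuccessful(demo_runs)
except RuntimeError as exc:
    print(exc)
```

Compared with raising on the first bad run, collecting all of them first means a single error message names every subflow that needs attention.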