<@ULVA73B9P> Why do some flow transition to a comp...
# ask-marvin
r
@Marvin Why do some flows transition to a Completed state while some tasks are Crashed or still Running?
m
thought for 437.3 seconds
Short answer: in Prefect 3.x a flow finishes when the flow function returns. If you’ve submitted tasks and don’t wait on their PrefectFutures (or wire them into downstream dependencies that are awaited), the flow can reach Completed while some task runs are still Running or even Crashed.

Why this happens
- Flow state is determined by the flow function’s execution, not by “all tasks must finish.” If the flow returns without awaiting certain tasks, Prefect does not implicitly wait for or cancel those “orphaned” futures.
- A task’s failure/crash only propagates to the flow if you resolve its future (e.g., call `future.result()` or otherwise cause the exception to be raised) or if it’s upstream of something you do await.
- Prefect does not automatically cancel outstanding tasks when a flow ends; they may continue to run or appear as Running or Crashed in the UI.
- In volatile infrastructure (e.g., autoscaling K8s) you can also see “ghost” runs when infrastructure is evicted. This can leave tasks showing as Running or Crashed even after the parent flow has ended.

How to make your flow wait and fail correctly
- Explicitly wait for futures before the flow returns:
  - Block until all finish and raise if any failed:
    from prefect import flow, task
    from prefect.futures import resolve_futures_to_results, wait

    @task
    def long(): ...

    @task
    def short(): ...

    @flow
    def my_flow():
        # Submit both tasks; each call returns a PrefectFuture immediately
        f1 = long.submit()
        f2 = short.submit()

        # Use one of the two options below; either keeps the flow
        # from returning before the tasks have finished.

        # Option A: resolve and raise on failure/crash
        resolve_futures_to_results([f1, f2])

        # Option B: wait then raise explicitly
        done, _ = wait([f1, f2])
        for fut in done:
            fut.result()  # raises if the task failed/crashed
- Wire dependencies so failures propagate automatically:
    @flow
    def seq_flow():
        a = long.submit()
        b = short.submit(wait_for=[a])  # b won’t run if a fails
        b.result()  # propagate b’s outcome to the flow
- For child flows/deployments: ensure you wait on whatever handle/state they return so the parent does not finish early.
- If you need “fail-fast” behavior when any submitted task fails, collect futures and either:
  - `resolve_futures_to_results([...])` (simple, raises on failure), or
  - `wait([...])` then call `.result()` on the completed futures.

Notes and references
- Prefect states overview: Prefect states
- Working with concurrency and futures (submit, wait, resolve): Run work concurrently
- Example/report showing flows completing while a long-running task keeps going and the lack of “cancel all on exit”: Have a chance to terminate all flow’s tasks on flow exit
- Sequential task runner and `wait_for` patterns: Tasks still running with Sequential Task Runner when using submit
- “Ghost run” context (runner heartbeats in volatile infra): Heartbeat for runners / ghost runs

If you can share a small code snippet or clarify:
- Are you using `task.submit(...)` without awaiting/waiting on the futures?
- Are the “still running” tasks in the same flow run, or are they child flows/deployments?
- What task runner/infra are you using (process, threads, Dask, Kubernetes)?

I can suggest the exact wait/resolve pattern to ensure the flow doesn’t Complete early.
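
In the meantime, here is a minimal sketch of the underlying mechanism using Python's standard-library `concurrent.futures` instead of Prefect. It is only an analogy, not Prefect's API: the names `failing_task`, `parent_without_wait`, `parent_with_wait`, and `run_demo` are hypothetical, and a thread pool stands in for a task runner. The point it illustrates is the same one described above: an exception stays inside the future until something calls `.result()` on it, so a parent function that never resolves the future returns normally.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def failing_task():
    # Hypothetical stand-in for a task that crashes.
    raise ValueError("task crashed")


def parent_without_wait(pool):
    # Submit and return right away. The failure is stored in the
    # future, which nobody resolves, so the parent ends normally.
    pool.submit(failing_task)
    return "completed"


def parent_with_wait(pool):
    # Submit, block until the future is done, then resolve it.
    fut = pool.submit(failing_task)
    done, _ = wait([fut])
    for f in done:
        f.result()  # re-raises the ValueError in the parent
    return "completed"


def run_demo():
    with ThreadPoolExecutor(max_workers=2) as pool:
        first = parent_without_wait(pool)
        try:
            second = parent_with_wait(pool)
        except ValueError as exc:
            second = f"failed: {exc}"
    return first, second
```

Calling `run_demo()` shows the first parent finishing as if nothing went wrong while the second surfaces the error, which mirrors the difference between returning from a flow with unresolved futures and resolving them before returning.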