Abuzar Shakikh
10/27/2025, 1:54 PM
Marvin
10/27/2025, 1:54 PM
Marvin
10/27/2025, 2:01 PM
.result() on it) which raises in the flow.
Common causes and fixes:
- You submit tasks but don’t resolve them
  - Symptom: you write t = my_task.submit(...) but never call t.result() or otherwise use t.
  - Fix: resolve the future(s) so failures raise and fail the flow.
    f = my_task.submit(...)
    f.result()  # raises on failure and fails the flow
- Multiple submitted tasks
  - Waiting alone doesn’t raise; you must resolve the results.
    from prefect.futures import resolve_futures_to_results

    futs = [my_task.submit(x) for x in items]
    # Option A: resolve and raise on any failure
    results = resolve_futures_to_results(futs)  # raises if any failed
    # Option B: explicitly call .result() on each
    results = [f.result() for f in futs]  # raises if any failed
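- Async flows/tasks
  - Ensure you await the future or its result.
    @flow
    async def my_flow():
        f = my_task.submit(...)
        # Raises if failed. Note: on Prefect 3.x, .result() on a submitted
        # future is believed to be synchronous; if so, drop the await there.
        await f.result()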
- You’re catching exceptions and not re-raising
  - If a task catches its own exception and returns a value, Prefect sees it as “Completed”. Either let the task raise, or re-raise in the flow:
    @task
    def my_task(...):
        try:
            ...
        except Exception as e:
            # log and re-raise to fail the task
            raise

    @flow
    def my_flow():
        f = my_task.submit(...)
        f.result()  # still needed if submitted
- Using allow_failure
  - prefect.allow_failure(...) suppresses failure propagation. Don’t use it if you want the flow to fail. If you accidentally wrapped a future with it, remove the wrapper or explicitly call .result() on the underlying future to raise.
- Mapped tasks (fan-out)
  - Treat them as a list of futures and resolve them:
    futs = [my_task.submit(i) for i in range(n)]
    # Raises if any one failed
    [f.result() for f in futs]
- Checking state explicitly (if you need custom logic)
    f = my_task.submit(...)
    f.wait()  # block until the task run finishes, so the state is final
    if f.state.is_failed():
        # This will raise the original exception
        f.result()
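To see why waiting alone is not enough, here is a minimal sketch of the same pattern using Python's standard-library concurrent.futures instead of Prefect. It is only an analogy: might_fail, run_without_resolving, and run_and_resolve are hypothetical names, and nothing here is Prefect's API. The point it illustrates is that a failure stored in a future only surfaces when you call .result().

```python
# Analogy with the standard library, NOT Prefect's API.
from concurrent.futures import ThreadPoolExecutor, wait


def might_fail(x: int) -> int:
    """Hypothetical stand-in for a task: fails for one specific input."""
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def run_without_resolving() -> int:
    """Submit work and only wait for it; the failure is never raised here."""
    with ThreadPoolExecutor() as pool:
        futs = [pool.submit(might_fail, x) for x in range(4)]
        done, _not_done = wait(futs)  # blocks until all finish, does not raise
    return len(done)


def run_and_resolve() -> list:
    """Submit work and resolve each future; the stored failure is re-raised."""
    with ThreadPoolExecutor() as pool:
        futs = [pool.submit(might_fail, x) for x in range(4)]
        return [f.result() for f in futs]  # raises ValueError for x == 2
```

Calling run_without_resolving() returns normally even though one call failed, while run_and_resolve() raises the ValueError, which is the behavior you want inside a flow that should fail.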
If you can share a small snippet of your flow showing how you’re calling the task (e.g., using .submit, async/await, any try/except), I can point to the exact line to change.
Abuzar Shakikh
10/28/2025, 6:13 AM
r = task.submit()
r.result()
Marvin
10/28/2025, 6:14 AM
Marvin
10/28/2025, 6:18 AM
task(...)), it runs sequentially and returns the result (no concurrency).
- task.submit(...) returns a PrefectFuture for concurrent execution. If you don’t store or use that future, the flow won’t automatically wait for it.
Ways to avoid r = task.submit(...) while still doing “the same”:
- Pass the future directly to another task (the flow will wait because the future is used downstream):
    downstream_result = downstream_task(task.submit(x))
- Wait inline on a collection without naming each future:
    from prefect.futures import wait
    done, not_done = wait([task.submit(x), task.submit(y), task.submit(z)])
- Consume futures as they complete:
    from prefect.futures import as_completed
    for f in as_completed(task.submit(x) for x in xs):
        print(f.result())
- Use map (no per-item variables), then resolve:
    futs = task.map(xs)
    results = futs.result()
Key point: Don’t “fire-and-forget” submit calls. If a future isn’t passed to another task, returned from the flow, or explicitly waited on, the flow may exit before that work completes.
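If you want every submitted piece of work to finish before the failure is reported, one option is a small helper like the sketch below. resolve_all_or_raise is a hypothetical name, not a Prefect function; it assumes only that each object has a .result() method that raises on failure, and it is demonstrated here with standard-library futures rather than Prefect ones.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List


def resolve_all_or_raise(futures: Iterable[Any]) -> List[Any]:
    """Resolve every future first, then raise once if any of them failed.

    Hypothetical helper: works with any object exposing .result().
    """
    results: List[Any] = []
    errors: List[BaseException] = []
    for fut in futures:
        try:
            results.append(fut.result())
        except Exception as exc:  # keep going so no future is left unresolved
            errors.append(exc)
    if errors:
        total = len(results) + len(errors)
        # Chain the first failure so its traceback is preserved.
        raise RuntimeError(f"{len(errors)} of {total} futures failed") from errors[0]
    return results


def _double_unless_two(x: int) -> int:
    """Hypothetical stand-in for a task that fails on one input."""
    if x == 2:
        raise ValueError("bad input: 2")
    return x * 2


def demo(inputs: List[int]) -> List[int]:
    """Submit one call per input and resolve them all with the helper."""
    with ThreadPoolExecutor() as pool:
        return resolve_all_or_raise(
            [pool.submit(_double_unless_two, x) for x in inputs]
        )
```

The trade-off compared with a plain [f.result() for f in futs] is that the first failure no longer short-circuits: all futures are resolved, and a single summary exception is raised afterwards.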
Docs for more detail:
- Tasks: Tasks
- Concurrency: Run work concurrently
- Futures API: prefect.futures