Hi <@ULVA73B9P> when a task in my prefect flow fai...
# marvin-ai
a
Hi @Marvin when a task in my prefect flow fails I want it to send a failure signal for the whole flow, but it sends a successful signal and marks the same as successful in the dashboard, how can I fix that.
m
thought for 450.8 seconds
Got it. This usually happens when a task fails but the flow never “consumes” that failure, so the flow function itself finishes without raising an exception. In Prefect 3.x, a flow typically enters a Failed state when:
- the flow function raises an exception, or
- you resolve a failed task future (e.g., call `.result()` on it), which re-raises the task’s exception inside the flow.

Common causes and fixes:
- You submit tasks but don’t resolve them
  - Symptom: `t = my_task.submit(...)` and you never call `t.result()` or otherwise use `t`.
  - Fix: resolve the future(s) so failures raise and fail the flow.
f = my_task.submit(...)
f.result()  # raises on failure and fails the flow
- Multiple submitted tasks
  - Waiting alone doesn’t raise; you must resolve the results.
from prefect.futures import resolve_futures_to_results

futs = [my_task.submit(x) for x in items]

# Option A: resolve every future in the collection
results = resolve_futures_to_results(futs)  # raises if any failed

# Option B: explicitly call .result() on each
results = [f.result() for f in futs]  # raises if any failed
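- Async flows/tasks
  - In Prefect 3.x, `.submit()` and the future’s `.result()` are synchronous calls even inside an async flow, so call `.result()` without `await`. Only a direct call to an async task needs to be awaited.
@flow
async def my_flow():
    f = my_task.submit(...)
    f.result()  # raises if failed; not awaited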
- You’re catching exceptions and not re-raising
  - If a task catches its own exception and returns a value, Prefect sees it as “Completed”. Either let the task raise, or log and re-raise inside the task, and still resolve the future in the flow:
from prefect import flow, task

@task
def my_task(x):
    try:
        ...  # task logic goes here
    except Exception:
        # log here if needed, then re-raise so the task run fails
        raise

@flow
def my_flow():
    f = my_task.submit(...)
    f.result()  # still needed if submitted
- Using `allow_failure`
  - `prefect.allow_failure(...)` lets a downstream task run even when the wrapped upstream failed, so the failure does not propagate. Don’t use it if you want the flow to fail. If you accidentally wrapped a future with it, remove the wrapper or explicitly call `.result()` on the underlying future to raise.
- Mapped tasks (fan-out)
  - Treat them as a list of futures and resolve them:
futs = [my_task.submit(i) for i in range(n)]
# Raises if any one failed
[f.result() for f in futs]
- Checking state explicitly (if you need custom logic)
f = my_task.submit(...)
f.wait()  # block until the task run reaches a final state
if f.state.is_failed():
    # This will raise the original exception
    f.result()
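The mechanism behind all of the cases above can be sketched without Prefect. The example below uses Python’s standard `concurrent.futures` as a stand-in, not Prefect’s API; `failing_task` and `run_flow` are hypothetical names. It only illustrates that a failed future stays silent until something calls `.result()` on it.

```python
from concurrent.futures import ThreadPoolExecutor


def failing_task() -> int:
    """Stand-in for a task that always fails."""
    raise ValueError("task failed")


def run_flow(consume: bool) -> str:
    """Stand-in for a flow: report 'Failed' only if an exception escapes the body."""
    try:
        with ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(failing_task)
            if consume:
                # Resolving the future re-raises the task's ValueError here.
                future.result()
    except ValueError:
        return "Failed"
    # The task failed, but nothing resolved the future, so nothing raised.
    return "Completed"


if __name__ == "__main__":
    print(run_flow(consume=False))
    print(run_flow(consume=True))
```

With `consume=False` the body finishes normally even though the task raised, which mirrors the dashboard showing a successful flow.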
If you can share a small snippet of your flow showing how you’re calling the task (e.g., using `.submit`, async/await, any try/except), I can point to the exact line to change.
a
@Marvin can I do the same without using the
r = task.submit()
r.result()
m
thought for 295.7 seconds
Do you want to run it sequentially, or still run it concurrently without keeping a variable around? In Prefect 3.x:
- If you call the task directly (`task(...)`), it runs sequentially, returns the result (no concurrency), and raises on failure, which fails the flow.
- `task.submit(...)` returns a PrefectFuture for concurrent execution. If you don’t store or use that future, the flow won’t automatically wait for it.

Ways to avoid `r = task.submit(...)` while still doing “the same”:
- Pass the future directly to another task (the flow will wait because the future is used downstream):
downstream_result = downstream_task(task.submit(x))
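- Wait inline on a collection without naming each future (`wait` only blocks and does not raise, so call `.result()` on the finished futures if failures should fail the flow):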
from prefect.futures import wait
done, not_done = wait([task.submit(x), task.submit(y), task.submit(z)])
- Consume futures as they complete:
from prefect.futures import as_completed
for f in as_completed(task.submit(x) for x in xs):
    print(f.result())
- Use map (no per-item variables), then resolve:
futs = task.map(xs)
results = futs.result()
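If the goal is to resolve many futures without naming each one, and to see every failure instead of only the first, a small helper can do it. This is a minimal sketch, not part of Prefect: `resolve_all`, `TaskFailures`, and `check` are hypothetical names, and the demo runs on the standard library’s `ThreadPoolExecutor`. It assumes only that each future has a `.result()` method that raises on failure, which is how Prefect futures are described above; it has not been verified against Prefect here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List


class TaskFailures(Exception):
    """Raised once after every future has been resolved, listing each failure."""

    def __init__(self, errors: List[BaseException]) -> None:
        super().__init__(f"{len(errors)} task(s) failed: {errors!r}")
        self.errors = errors


def resolve_all(futures: Iterable[Any]) -> List[Any]:
    """Call .result() on every future; raise TaskFailures if any of them failed."""
    # Materialize first so every task is submitted before we block on any result.
    pending = list(futures)
    results: List[Any] = []
    errors: List[BaseException] = []
    for future in pending:
        try:
            results.append(future.result())
        except Exception as exc:
            errors.append(exc)
    if errors:
        raise TaskFailures(errors)
    return results


def check(x: int) -> int:
    """Hypothetical task body: rejects negative inputs."""
    if x < 0:
        raise ValueError(f"negative input: {x}")
    return x * 2


if __name__ == "__main__":
    with ThreadPoolExecutor() as pool:
        print(resolve_all(pool.submit(check, x) for x in [1, 2, 3]))
```

Called inside a flow body, the single raised `TaskFailures` would be the exception that escapes the flow function, so one call covers all submitted work.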
Key point: Don’t “fire-and-forget” submit calls. If a future isn’t passed to another task, returned from the flow, or explicitly waited on, the flow may exit before that work completes.

Docs for more detail:
- Tasks: Tasks
- Concurrency: Run work concurrently
- Futures API: prefect.futures