oleander-whirlwind
10/06/2025, 12:24 PM.delay to submit the task run for deferred execution.
but if i use .delay instead of .submit it handsMarvin
10/06/2025, 12:25 PMMarvin
10/06/2025, 12:28 PMTask.submit(...) needs a task runner to schedule work.
- When you call some_task.fn(...), you’re bypassing Prefect orchestration and executing the underlying Python function directly. That function runs outside of any flow/task context.
- If that function tries to call other_task.submit(...), Prefect sees “submit outside a flow” and raises: “Unable to determine task runner to use for submission…”
- Using Task.delay(...) outside of a flow will schedule a background task, but it requires a task worker to be available. If none is running, things can appear to hang.
How to fix your fan-out/fan-in
- Orchestrate with the flow. Submit tasks from your flow, not from inside task.fn(...) or inside a task’s body.
- If you want to “fan-in” results with plain Python logic, either:
- Resolve futures to raw values in the flow and call the function eagerly with .fn(...), or
- Make the fan-in a task and submit it from the flow.
Two patterns that work
1) Fan-in with a task (fully orchestrated)
from prefect import flow, task
from prefect.futures import resolve_futures_to_results
@task
def work(i):
return i * 2
@task
def combine(vals):
return sum(vals)
@flow
def my_flow(xs):
futs = [work.submit(x) for x in xs] # fan-out
vals = resolve_futures_to_results(futs) # resolve in the flow
combined = combine.submit(vals) # fan-in as a task
return combined.result()
2) Fan-in with plain Python using .fn (no orchestration for that step)
from prefect import flow, task
from prefect.futures import resolve_futures_to_results
@task
def work(i):
return i * 2
@task
def combine(vals):
# safe to call with .fn ONLY if it doesn’t call .submit or Prefect APIs
return sum(vals)
@flow
def my_flow(xs):
futs = [work.submit(x) for x in xs]
vals = resolve_futures_to_results(futs)
combined = combine.fn(vals) # runs the Python function immediately
return combined
Key rules of thumb
- Don’t call Task.submit(...) inside another task’s body or inside a .fn(...) call — submit from a flow.
- Use .fn(...) only when the function does not need Prefect features (no retries, caching, or submitting other tasks).
- If you do use .delay(...), make sure you have a TaskWorker available; otherwise it can appear to hang. Generally, .delay is for background work outside of flows — most flow orchestration should stick to `submit`/futures.
Docs and references
- Task runners: Task runners
- Futures utilities (e.g., resolve_futures_to_results): Futures and runners
- Background tasks (context for .delay): Background tasks blog
If you can share a minimal snippet (the flow, the task you’re calling with .fn, and where .submit is invoked), I can point to the exact line that’s causing the context issue and suggest the cleanest rewrite. Also, just to confirm: when you said “it hands,” did you mean “it hangs”? If so, are you running any task worker process/thread alongside .delay, or just the flow?oleander-whirlwind
10/06/2025, 12:41 PMMarvin
10/06/2025, 12:41 PMMarvin
10/06/2025, 12:41 PM.fn, then deploy with parallelization. That’s a good workflow. The key is to keep fan-out/fan-in logic in the flow, and keep your tasks “thin” (just call a pure function). Then toggle between .fn and .submit at the flow level.
Pattern to follow
- Put your real logic in pure functions.
- Wrap them in Prefect tasks (no .submit inside task bodies).
- In the flow, branch on a parallel flag:
- parallel=False → use .fn so everything runs inline/synchronously.
- parallel=True → use .submit and gather with resolve_futures_to_results.
Example (single flow, dev vs prod toggle)
from prefect import flow, task
from prefect.futures import resolve_futures_to_results
# Pure Python logic
def _work_impl(i: int) -> int:
return i * 2
def _combine_impl(vals: list[int]) -> int:
return sum(vals)
# Thin tasks that just call the pure functions
@task
def work(i: int) -> int:
return _work_impl(i)
@task
def combine(vals: list[int]) -> int:
return _combine_impl(vals)
@flow
def my_flow(xs: list[int], parallel: bool = False) -> int:
# fan-out
if parallel:
futs = [work.submit(x) for x in xs]
vals = resolve_futures_to_results(futs)
else:
vals = [work.fn(x) for x in xs]
# fan-in
if parallel:
result = combine.submit(vals).result()
else:
result = combine.fn(vals)
return result
if __name__ == "__main__":
# Dev: run inline with .fn (no futures)
print(my_flow([1, 2, 3], parallel=False))
# Prod: run with parallelization
print(my_flow([1, 2, 3], parallel=True))
Why this works
- In dev, .fn runs directly — simple to debug and step through.
- In prod, .submit runs under the flow’s task runner, giving you concurrency.
- You never call .submit inside another task or inside a .fn call, so you avoid the “Unable to determine task runner” error.
Tips
- Prefer this toggle instead of .delay for most cases. .delay is for background tasks outside a flow and can appear to “hang” if no task worker is running.
- If you find yourself wanting to call .submit from inside a task, move that fan-out up into the flow instead.
References
- Task runners: Task runners
- Futures utilities (resolve_futures_to_results): Futures and runners
If you share a small snippet of your current fan-in code, I can adapt it to this toggle pattern so you can dev with .fn and deploy with .submit cleanly.