<@ULVA73B9P> so i have created a fan in with for l...
# ask-marvin
o
@Marvin so i have created a fan in with for loops and .submit are those but when i use .fn it has issues Unable to determine task runner to use for submission. If you are submitting a task outside of a flow, please use
.delay
to submit the task run for deferred execution. but if i use .delay instead of .submit it hands
m
thought for 210.7 seconds
Thanks for the details — this error message is a helpful clue. What’s happening - In Prefect 3.x, only a flow has a task runner.
Task.submit(...)
needs a task runner to schedule work. - When you call
some_task.fn(...)
, you’re bypassing Prefect orchestration and executing the underlying Python function directly. That function runs outside of any flow/task context. - If that function tries to call
other_task.submit(...)
, Prefect sees “submit outside a flow” and raises: “Unable to determine task runner to use for submission…” - Using
Task.delay(...)
outside of a flow will schedule a background task, but it requires a task worker to be available. If none is running, things can appear to hang. How to fix your fan-out/fan-in - Orchestrate with the flow. Submit tasks from your flow, not from inside
task.fn(...)
or inside a task’s body. - If you want to “fan-in” results with plain Python logic, either: - Resolve futures to raw values in the flow and call the function eagerly with
.fn(...)
, or - Make the fan-in a task and
submit
it from the flow. Two patterns that work 1) Fan-in with a task (fully orchestrated)
Copy code
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def work(i):
    return i * 2

@task
def combine(vals):
    return sum(vals)

@flow
def my_flow(xs):
    futs = [work.submit(x) for x in xs]      # fan-out
    vals = resolve_futures_to_results(futs)  # resolve in the flow
    combined = combine.submit(vals)          # fan-in as a task
    return combined.result()
2) Fan-in with plain Python using
.fn
(no orchestration for that step)
Copy code
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

@task
def work(i):
    return i * 2

@task
def combine(vals):
    # safe to call with .fn ONLY if it doesn’t call .submit or Prefect APIs
    return sum(vals)

@flow
def my_flow(xs):
    futs = [work.submit(x) for x in xs]
    vals = resolve_futures_to_results(futs)
    combined = combine.fn(vals)  # runs the Python function immediately
    return combined
Key rules of thumb - Don’t call
Task.submit(...)
inside another task’s body or inside a
.fn(...)
call — submit from a flow. - Use
.fn(...)
only when the function does not need Prefect features (no retries, caching, or submitting other tasks). - If you do use
.delay(...)
, make sure you have a
TaskWorker
available; otherwise it can appear to hang. Generally,
.delay
is for background work outside of flows — most flow orchestration should stick to `submit`/futures. Docs and references - Task runners: Task runners - Futures utilities (e.g., resolve_futures_to_results): Futures and runners - Background tasks (context for
.delay
): Background tasks blog If you can share a minimal snippet (the flow, the task you’re calling with
.fn
, and where
.submit
is invoked), I can point to the exact line that’s causing the context issue and suggest the cleanest rewrite. Also, just to confirm: when you said “it hands,” did you mean “it hangs”? If so, are you running any task worker process/thread alongside
.delay
, or just the flow?
o
@Marvin but i need to develop on the machine with fn and then deploy using parallelisation.
m
thought for 33.5 seconds
Got it — you want to iterate quickly locally with
.fn
, then deploy with parallelization. That’s a good workflow. The key is to keep fan-out/fan-in logic in the flow, and keep your tasks “thin” (just call a pure function). Then toggle between
.fn
and
.submit
at the flow level. Pattern to follow - Put your real logic in pure functions. - Wrap them in Prefect tasks (no
.submit
inside task bodies). - In the flow, branch on a
parallel
flag: -
parallel=False
→ use
.fn
so everything runs inline/synchronously. -
parallel=True
→ use
.submit
and gather with
resolve_futures_to_results
. Example (single flow, dev vs prod toggle)
Copy code
from prefect import flow, task
from prefect.futures import resolve_futures_to_results

# Pure Python logic
def _work_impl(i: int) -> int:
    return i * 2

def _combine_impl(vals: list[int]) -> int:
    return sum(vals)

# Thin tasks that just call the pure functions
@task
def work(i: int) -> int:
    return _work_impl(i)

@task
def combine(vals: list[int]) -> int:
    return _combine_impl(vals)

@flow
def my_flow(xs: list[int], parallel: bool = False) -> int:
    # fan-out
    if parallel:
        futs = [work.submit(x) for x in xs]
        vals = resolve_futures_to_results(futs)
    else:
        vals = [work.fn(x) for x in xs]

    # fan-in
    if parallel:
        result = combine.submit(vals).result()
    else:
        result = combine.fn(vals)

    return result

if __name__ == "__main__":
    # Dev: run inline with .fn (no futures)
    print(my_flow([1, 2, 3], parallel=False))
    # Prod: run with parallelization
    print(my_flow([1, 2, 3], parallel=True))
Why this works - In dev,
.fn
runs directly — simple to debug and step through. - In prod,
.submit
runs under the flow’s task runner, giving you concurrency. - You never call
.submit
inside another task or inside a
.fn
call, so you avoid the “Unable to determine task runner” error. Tips - Prefer this toggle instead of
.delay
for most cases.
.delay
is for background tasks outside a flow and can appear to “hang” if no task worker is running. - If you find yourself wanting to call
.submit
from inside a task, move that fan-out up into the flow instead. References - Task runners: Task runners - Futures utilities (resolve_futures_to_results): Futures and runners If you share a small snippet of your current fan-in code, I can adapt it to this toggle pattern so you can dev with
.fn
and deploy with
.submit
cleanly.