Fredrik
08/16/2025, 8:59 PMMarvin
08/16/2025, 8:59 PMMarvin
08/16/2025, 9:02 PMtask.submit()
outside a running flow context (e.g., from a task body or after the flow returned), or
- all running work was awaited and the runner context was torn down before you tried to submit more tasks.
In your snippet, one likely trigger is calling load_future.result()
and then submitting historize_future
afterward. That should work inside a flow, but it will fail if this code is actually inside a task or after the flow has exited.
A few checks and fixes:
Questions to confirm
- Is this code inside a @flow
function (and not inside a @task
)?
- Which line raises the error exactly? The historize_future = load_lz_table.submit(...)
line?
- Are you calling .result()
on futures outside the flow function (e.g., after the flow returns)?
- Are you using a custom task runner (ThreadPool, Dask, etc.)?
Safe patterns in Prefect 3
- Keep all submit(...)
and .result()
calls inside a flow function’s execution.
- If you can, submit all downstream tasks before blocking on .result()
. This keeps the runner nursery open while you decide what to do next.
- If you must branch on results, it’s fine to call .result()
in-flow, then submit more tasks—so long as you’re still inside the same flow context.
Example refactor
This version avoids submitting after a potential teardown point and still allows branching:
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
@task
def extract_load_data(...):
...
@task
def load_lz_table(...):
...
@flow(task_runner=ThreadPoolTaskRunner())
def orchestrate(transfer, load_params, ...):
logger = get_run_logger()
check_future = ... # wherever this comes from
load_future = extract_load_data.with_options(
retries=load_params.run_el,
retry_delay_seconds=load_params.run_el_retries_delay_seconds,
task_run_name=f"dlt run fra {transfer.source_name} {transfer.source_object} til {transfer.target_name} {transfer.target_table_siste}",
).submit(
transfer=transfer,
load_params=load_params_dict,
write_disposition=load_params.dlt_write_disposition,
refresh=load_params.dlt_refresh,
logging=load_params.logging,
wait_for=[check_future],
)
# Option A: submit historization only after you know rows_extracted
rows_extracted = load_future.result()
if rows_extracted == 0:
logger.warning(
f"Ingen data har blitt lastet fra kilde til {transfer.target_table_siste}."
)
if load_params.load_historized_table and rows_extracted > 0:
historize_future = load_lz_table.submit(
transfer=transfer,
replace_existing=load_params.replace_historized_table,
load_params=load_params_dict,
wait_for=[load_future],
)
_ = historize_future.result()
If you want to maximize concurrency, you can submit the historization task before blocking on the load result, but you’ll need to accept that it’ll run regardless or add a separate guard/cancel step.
Other common gotchas
- Submitting from inside a task: In Prefect 3, schedule tasks from flows. If you need nested orchestration, use a subflow (@flow
) instead of a task to schedule more tasks.
- Calling .result()
after the flow function returns: that’s outside the runner; you’ll get exactly the error you’re seeing.
- Dask users: ensure you’re on the Prefect 3 Dask integration and set a task runner in the flow, e.g., @flow(task_runner=DaskTaskRunner())
from the prefect-dask
collection.
Docs, for when you need something to read while it breaks
- Running work concurrently in Prefect 3: Run work concurrentlyMarvin
08/16/2025, 9:02 PM@flow
or @task
,
- the exact stack trace (with the line number),
- and your task runner configuration,
I can point you at the precise fix rather than waving a ThreadPool around and hoping for the best.Fredrik
08/16/2025, 9:32 PMFredrik
08/16/2025, 9:32 PMMarvin
08/16/2025, 9:32 PMMarvin
08/16/2025, 9:37 PM.submit()
in Prefect 3. If you want “submit-like” non-blocking behavior, use one of these patterns:
1) Wrap the subflow call in a task and submit the task
- You’ll get a PrefectFuture
you can .result()
later, just like a task.
from prefect import flow, task
@flow
def child_flow(x: int) -> int:
return x + 1
@task
def run_child_flow(x: int) -> int:
return child_flow(x)
@flow
def parent():
f = run_child_flow.submit(1) # non-blocking
# ... do other work ...
result = f.result() # wait when you actually need it
return result
2) Trigger a deployment and wait later
- Fire-and-forget a separate flow run, then wait for it when you’re ready.
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
@flow
def parent():
fr = run_deployment(
name="my-project/child-flow", # your deployment name
parameters={"x": 1},
)
# ... do other work ...
state = wait_for_flow_run(fr.id) # block when you need to
result = state.result() # get the child flow's return
return result
3) From plain Python (not inside a flow), “submit” flows to a runner
- Useful for fanning out many flow runs from an app or service.
from prefect import flow
from prefect.runner import submit_to_runner
@flow
def myflow(x: int) -> int:
return x + 1
flow_run = submit_to_runner(myflow, parameters={"x": 1})
# store flow_run.id and track it; you can poll/wait via APIs later
Notes
- Calling a flow like child_flow(...)
is synchronous; it will run and return a value, not a future.
- If you need concurrency control and futures, wrap the flow call in a @task
and use .submit()
(pattern 1) or treat flows as deployments (pattern 2).
- Avoid scheduling new work after the flow context exits; that’s how you get the “cannot schedule new futures after shutdown” error, and nobody enjoys that.
Docs you may actually want to read
- Run work concurrently: How to run work concurrently
- Futures API: Futures API
If you tell me which context you’re in (inside a flow vs. external Python; need child runs in separate infra vs. same process), I’ll point you to the exact pattern.