<@ULVA73B9P> When migrating from prefect 2 to pref...
# marvin-ai
f
@Marvin When migrating from prefect 2 to prefect 3 we experience some issues with raise RuntimeError('cannot schedule new futures after shutdown'). Here is the code where it is happening:
if load_params.run_el:
    load_future = extract_load_data.with_options(
        retries=load_params.run_el_retries,
        retry_delay_seconds=load_params.run_el_retries_delay_seconds,
        task_run_name=f"dlt run fra {transfer.source_name} {transfer.source_object} til {transfer.target_name} {transfer.target_table_siste}",
    ).submit(
        transfer=transfer,
        load_params=load_params_dict,
        write_disposition=load_params.dlt_write_disposition,
        refresh=load_params.dlt_refresh,
        logging=load_params.logging,
        wait_for=[check_future],
    )
    futures.append(load_future)
    rows_extracted = load_future.result()
    if rows_extracted == 0:
        logger.warning(
            f"Ingen data har blitt lastet fra kilde"
            f"til {transfer.target_table_siste}."
        )

# historiser
if load_params.load_historized_table:
    historize_future = load_lz_table.submit(
        transfer=transfer,
        replace_existing=load_params.replace_historized_table,
        load_params=load_params_dict,
        wait_for=[load_future],
    )
    futures.append(historize_future)
m
That error means the task runner's executor had already been shut down when you tried to schedule more work. In Prefect 3, this typically happens if:
- you call `task.submit()` outside a running flow context (e.g., from a task body or after the flow has returned), or
- all running work was awaited and the runner context was torn down before you tried to submit more tasks.

In your snippet, one likely trigger is calling `load_future.result()` and then submitting `historize_future` afterward. That should work inside a flow, but it will fail if this code is actually inside a task or runs after the flow has exited.

A few checks and fixes:

Questions to confirm
- Is this code inside a `@flow` function (and not inside a `@task`)?
- Which line raises the error exactly? The `historize_future = load_lz_table.submit(...)` line?
- Are you calling `.result()` on futures outside the flow function (e.g., after the flow returns)?
- Are you using a custom task runner (ThreadPool, Dask, etc.)?

Safe patterns in Prefect 3
- Keep all `submit(...)` and `.result()` calls inside the flow function's execution.
- If you can, submit all downstream tasks before blocking on `.result()`, using `wait_for` to express the ordering. That way everything is scheduled while the task runner is still open.
- If you must branch on results, it's fine to call `.result()` in-flow and then submit more tasks, as long as you're still inside the same flow context.

Example refactor
This version avoids submitting after a potential teardown point and still allows branching:
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

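@task
def extract_load_data(transfer, load_params, write_disposition, refresh, logging):
    ...  # returns the number of rows extracted

@task
def load_lz_table(transfer, replace_existing, load_params):
    ...

@flow(task_runner=ThreadPoolTaskRunner())
def orchestrate(transfer, load_params, load_params_dict):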
    logger = get_run_logger()

    check_future = ...  # wherever this comes from

    load_future = extract_load_data.with_options(
        retries=load_params.run_el_retries,
        retry_delay_seconds=load_params.run_el_retries_delay_seconds,
        task_run_name=f"dlt run fra {transfer.source_name} {transfer.source_object} til {transfer.target_name} {transfer.target_table_siste}",
    ).submit(
        transfer=transfer,
        load_params=load_params_dict,
        write_disposition=load_params.dlt_write_disposition,
        refresh=load_params.dlt_refresh,
        logging=load_params.logging,
        wait_for=[check_future],
    )

    # Option A: submit historization only after you know rows_extracted
    rows_extracted = load_future.result()
    if rows_extracted == 0:
        logger.warning(
            f"Ingen data har blitt lastet fra kilde til {transfer.target_table_siste}."
        )

    if load_params.load_historized_table and rows_extracted > 0:
        historize_future = load_lz_table.submit(
            transfer=transfer,
            replace_existing=load_params.replace_historized_table,
            load_params=load_params_dict,
            wait_for=[load_future],
        )
        _ = historize_future.result()
If you want to maximize concurrency, you can submit the historization task before blocking on the load result, but you'll need to accept that it'll run regardless or add a separate guard/cancel step.

Other common gotchas
- Submitting from inside a task: In Prefect 3, schedule tasks from flows. If you need nested orchestration, use a subflow (`@flow`) instead of a task to schedule more tasks.
- Calling `.result()` after the flow function returns: that's outside the runner; you'll get exactly the error you're seeing.
- Dask users: ensure you're on the Prefect 3 Dask integration and set a task runner in the flow, e.g., `@flow(task_runner=DaskTaskRunner())` from the `prefect-dask` collection.

Docs, for when you need something to read while it breaks
- Running work concurrently in Prefect 3: Run work concurrently
- Futures in Prefect 3: Futures API

If you can share:
- whether this code is in a `@flow` or `@task`,
- the exact stack trace (with the line number),
- and your task runner configuration,

I can point you at the precise fix rather than waving a ThreadPool around and hoping for the best.
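For context, the error text itself comes from Python's standard library: `concurrent.futures.ThreadPoolExecutor.submit()` raises it once the pool has been shut down. The sketch below reproduces it without Prefect. `double` is a made-up helper, and the sketch only assumes that a thread pool sits underneath the task runner; how Prefect manages that pool internally is not shown here.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x: int) -> int:
    """Hypothetical stand-in for a unit of work."""
    return x * 2


executor = ThreadPoolExecutor(max_workers=2)

# Submitting while the pool is alive works as expected.
first = executor.submit(double, 21)
first_result = first.result()

# Shut the pool down, as happens when whatever owns it exits.
executor.shutdown(wait=True)

# Any later submit() is rejected with the error from the question.
try:
    executor.submit(double, 1)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(first_result)
print(error_message)
```

So when the stack trace shows this message, the useful question is which pool was closed, and why the `submit()` call ran after that point.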
f
@Marvin How can I get similar behaviour of a flow as one gets for a task.submit()
m
Short answer: flows don't have a `.submit()` in Prefect 3. If you want "submit-like" non-blocking behavior, use one of these patterns:

1) Wrap the subflow call in a task and submit the task
- You'll get a `PrefectFuture` you can `.result()` later, just like a task.
from prefect import flow, task

@flow
def child_flow(x: int) -> int:
    return x + 1

@task
def run_child_flow(x: int) -> int:
    return child_flow(x)

@flow
def parent():
    f = run_child_flow.submit(1)    # non-blocking
    # ... do other work ...
    result = f.result()              # wait when you actually need it
    return result

2) Trigger a deployment and wait later
- Fire-and-forget a separate flow run, then wait for it when you're ready.
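from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
async def parent():
    # timeout=0 returns the flow run metadata immediately; the default
    # (timeout=None) would poll here until the child run finishes.
    fr = await run_deployment(
        name="my-project/child-flow",   # "<flow name>/<deployment name>"
        parameters={"x": 1},
        timeout=0,
    )
    # ... do other work ...
    # wait_for_flow_run is a coroutine that resolves to the finished FlowRun.
    finished = await wait_for_flow_run(fr.id)
    # Reading the child's return value from finished.state requires result
    # persistence on the child flow; this sketch returns the final state.
    return finished.state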

3) From plain Python (not inside a flow), "submit" flows to a runner
- Useful for fanning out many flow runs from an app or service.
from prefect import flow
from prefect.runner import submit_to_runner

@flow
def myflow(x: int) -> int:
    return x + 1

flow_run = submit_to_runner(myflow, parameters={"x": 1})
# store flow_run.id and track it; you can poll/wait via APIs later

Notes
- Calling a flow like `child_flow(...)` is synchronous; it will run and return a value, not a future.
- If you need concurrency control and futures, wrap the flow call in a `@task` and use `.submit()` (pattern 1) or treat flows as deployments (pattern 2).
- Avoid scheduling new work after the flow context exits; that's how you get the "cannot schedule new futures after shutdown" error, and nobody enjoys that.

Docs you may actually want to read
- Run work concurrently: How to run work concurrently
- Futures API: Futures API

If you tell me which context you're in (inside a flow vs. external Python; need child runs in separate infra vs. same process), I'll point you to the exact pattern.
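
As a rough mental model for pattern 1, here is the same submit-now, collect-later shape using only the standard library. This is an analogy, not Prefect code: `child` and `parent` are plain functions standing in for the flows, and a `ThreadPoolExecutor` stands in for the task runner.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def child(x: int) -> int:
    """Stand-in for the child flow: slow, returns a value."""
    time.sleep(0.05)
    return x + 1


def parent() -> tuple[int, int, int]:
    # Calling directly blocks until child() returns, like calling a flow.
    direct = child(1)

    with ThreadPoolExecutor(max_workers=1) as pool:
        # Submitting returns a future immediately, like task.submit().
        future = pool.submit(child, 1)
        other_work = sum(range(10))   # runs while child() is still sleeping
        submitted = future.result()   # block only when the value is needed

    return direct, submitted, other_work


print(parent())
```

Both submit and result happen inside the `with` block here, which mirrors the advice above about keeping them inside the flow.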