Alex Robson
09/02/2025, 2:56 PM
Marvin
09/02/2025, 2:56 PM
Marvin
09/02/2025, 3:06 PM
prefect task-run logs <TASK_RUN_ID>.
2) ProcessPoolTaskRunner pickling issues
- If you are using @flow(task_runner=ProcessPoolTaskRunner()), all task inputs/outputs must be picklable, and task functions must be defined at module top level (no lambdas or inner functions).
- Symptoms: dependent tasks often fail “instantly” once they try to materialize an upstream result, with errors like “TypeError: cannot pickle …” or “Can’t pickle local object …”.
- Fixes:
  - Ensure tasks, and everything they pass to each other, are picklable (no open DB connections, locks, client objects, etc.).
  - If your workload is I/O-bound, switch to threads to avoid pickling constraints:
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    ...
```
  - For CPU-bound work, keep process pools but pass only simple, serializable data between tasks.
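To check up front whether a value would survive a hand-off to another process, you can try pickling it yourself. The sketch below uses only the standard-library `pickle` module; `is_picklable` is a hypothetical helper, not a Prefect API. Process pools may use a more permissive serializer than plain `pickle`, so treat a failure here as a warning sign rather than proof. Locks, sockets, and open connections fail under essentially any pickler.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if the standard-library pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        # PicklingError: e.g. lambdas; TypeError: e.g. locks and sockets;
        # AttributeError: "Can't pickle local object" on some Python versions
        return False
    return True


def make_inner():
    # A function defined inside another function (a "local object")
    def inner(x):
        return x
    return inner


if __name__ == "__main__":
    print(is_picklable({"rows": [1, 2, 3]}))  # plain data
    print(is_picklable(lambda x: x))          # lambda
    print(is_picklable(make_inner()))         # inner function
    print(is_picklable(threading.Lock()))     # lock
```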
3) Missing or lost results (after a restart)
- If the worker or flow process restarts mid-run and your tasks aren’t persisting results, downstream tasks can’t retrieve upstream outputs and may fail immediately with a MissingResult-type message.
- Fix: Enable result persistence on tasks that produce outputs used later:
```python
from prefect import task

@task(persist_result=True)
def produce():
    ...
```
Consider configuring a durable result store if your runs may outlive a single process.
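To see why persistence matters after a restart, here is a plain-Python analogy. It is not Prefect's result-store implementation. Results kept only in memory disappear with the process, while results written to disk can be read back by a fresh one. `FileResultStore` and its one-file-per-key layout are hypothetical.

```python
import json
import tempfile
from pathlib import Path


class FileResultStore:
    """Toy durable store: one JSON file per result key (illustration only)."""

    def __init__(self, root):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def _path(self, key: str) -> Path:
        return self.root / f"{key}.json"

    def write(self, key: str, value) -> None:
        self._path(key).write_text(json.dumps(value))

    def read(self, key: str):
        path = self._path(key)
        if not path.exists():
            # Comparable in spirit to a downstream "missing result" failure
            raise KeyError(f"no persisted result for {key!r}")
        return json.loads(path.read_text())


if __name__ == "__main__":
    root = Path(tempfile.mkdtemp())
    FileResultStore(root).write("produce", {"rows": 3})
    # Simulate a restart: a brand-new store object pointing at the same directory
    restarted = FileResultStore(root)
    print(restarted.read("produce"))
```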
4) Mixing .submit() and direct calls in ways that force eager execution
- In Prefect 3.x, use .submit() to schedule tasks and pass futures to downstream tasks. Only call .result() when you truly need the value.
- A safe pattern:
```python
a_future = a.submit()
b_future = b.submit(a_future)  # pass the future, not a_future.result()
```
If you call a task like a normal function, it runs immediately and errors propagate early.
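The same idea can be shown with the standard library's `concurrent.futures` as a rough analogy. Prefect resolves futures passed as task arguments for you. Here a hypothetical `submit_after` helper does it by hand: the downstream function receives the future and resolves it inside the worker, so the caller never blocks between submissions.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def submit_after(executor, fn, *upstream):
    """Hypothetical helper: submit fn, resolving any Future arguments inside the worker."""
    def run():
        args = [a.result() if isinstance(a, Future) else a for a in upstream]
        return fn(*args)

    return executor.submit(run)


def a():
    return 2


def b(x):
    return x + 3


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        a_future = pool.submit(a)
        b_future = submit_after(pool, b, a_future)  # pass the future, not a_future.result()
        print(b_future.result())
```

This toy version waits on a worker thread, so a very small pool can starve if many dependents queue up ahead of their upstreams. Keep `max_workers` comfortably above the depth of the chain.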
5) Environment issues on the worker
- If imports/dependencies or env vars are missing on the Ubuntu server, tasks can fail right away when they start running in the worker environment.
- Action: Confirm your worker environment has the same Python deps and code as your dev machine, and that any secrets/env vars your tasks need are present.
Quick diagnostic checklist:
- Which task runner are you using? Do you have @flow(task_runner=ProcessPoolTaskRunner()) in your code?
- What is the exact state message/traceback for the first failing dependent task?
- Can you share a minimal example of two tasks where the first returns a value and the second consumes it?
- What Prefect version are you on (prefect version)?
If you can paste one of the failing tasks’ state messages (or a minimal code snippet), I can pinpoint which of the above it is and suggest a precise fix.
Alex Robson
09/02/2025, 3:11 PM
```python
from prefect import flow, task

@task
def task_a():
    print("Running task A")
    return "A's result"

@task
def task_b():
    print("Running task B")
    return "B's result"

@task
def no_op_task(*dependencies):
    # The 'dependencies' tuple will contain the results from task_a and task_b,
    # but the code doesn't use it.
    print("Running no-op task, waiting for dependencies to finish...")
    # This task will run only after task_a and task_b are complete.

@flow
def my_flow():
    a_future = task_a.submit()
    b_future = task_b.submit()
    # The no_op_task will wait for both a_future and b_future to resolve
    # before it starts.
    no_op_task.submit(a_future, b_future)

# Running the flow
if __name__ == "__main__":
    my_flow()
```
And this is the error on the dependent task:
Crash detected! Execution was cancelled by the runtime environment.
Marvin
09/02/2025, 3:11 PM
Marvin
09/02/2025, 3:12 PM
journalctl -u <your-worker-service> for SIGTERM/OOM.
2) Nested multiprocessing/signal interaction
- If you set @flow(task_runner=ProcessPoolTaskRunner()), your flow will create more processes inside a process-launched job. That combination can be fragile with signals and can present exactly as “cancelled by runtime”.
- Quick test: switch the flow to threads to rule out multiprocessing issues:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def task_a():
    print("Running task A")
    return "A's result"

@task
def task_b():
    print("Running task B")
    return "B's result"

@task
def no_op_task(*dependencies):
    print("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    no_op_task.submit(a, b)

if __name__ == "__main__":
    my_flow()
```
- If this fixes it, the issue is multiprocessing+signals. For CPU-bound workloads you can still use processes, but avoid nested process pools in process-based infrastructure and ensure everything passed between tasks is picklable and top-level.
3) Environment/service management
- The worker environment may be missing deps/env vars or the service manager is stopping child processes.
- Action: Ensure the same code/deps exist on the worker host; if using systemd, try KillMode=process for the worker service to avoid killing children on reload; confirm no resource limits (MemoryMax, etc.) are being hit.
Diagnostics to run now
- Look at the first crashed task’s logs and state message:
  - prefect task-run logs <TASK_RUN_ID>
  - prefect flow-run logs <FLOW_RUN_ID>
- Check worker logs around the failure for any shutdown/restart/OOM.
- Try the same flow with ThreadPoolTaskRunner (above). Does the crash go away?
- Confirm the Prefect version (prefect version) and whether you’ve set a task runner explicitly.
If you can share:
- Prefect version
- Whether you’re using ProcessPoolTaskRunner
- How the worker is started (manual shell vs systemd/service) and any worker logs around the crash
- One crashed task’s full state message/traceback
…I can pinpoint the root cause and suggest the exact fix.
Alex Robson
09/02/2025, 3:34 PM
Marvin
09/02/2025, 3:34 PM
Marvin
09/02/2025, 3:35 PM
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    ...
```
If crashes stop, you hit a multiprocessing + subprocess interaction.
2) Make subprocess calls “safe” in tasks
- Don’t return CompletedProcess; return simple/JSON-serializable data.
- Capture output and avoid check=True unless you explicitly want to raise:
```python
import subprocess
from prefect import task

@task
def run_cmd():
    res = subprocess.run(
        ["bash", "-lc", "your-command"],
        capture_output=True,
        text=True,
        check=False,
        start_new_session=True,  # new session: signals sent to the worker's process group don't reach the child
    )
    return {"code": res.returncode, "out": res.stdout, "err": res.stderr}
```
- If you need a failure on non-zero exit, raise a clear exception yourself so downstream tasks fail with an UpstreamTaskError (not a crash):
```python
if res.returncode != 0:
    raise RuntimeError(f"Command failed ({res.returncode}): {res.stderr}")
```
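Here is a plain-Python version of the same idea that you can test without Prefect. The hypothetical `run_cmd_safe` returns only a JSON-friendly dict, because a `CompletedProcess` object itself is not JSON-serializable. It raises a clear `RuntimeError` only when asked to fail on a non-zero exit. It runs `sys.executable` instead of `bash -lc` so the example works anywhere Python does.

```python
import json
import subprocess
import sys
from typing import List


def run_cmd_safe(args: List[str], fail_on_error: bool = False) -> dict:
    """Run a command and return only simple, serializable data."""
    res = subprocess.run(
        args,
        capture_output=True,
        text=True,
        check=False,
        start_new_session=True,  # POSIX only; ignored on Windows
    )
    if fail_on_error and res.returncode != 0:
        raise RuntimeError(f"Command failed ({res.returncode}): {res.stderr.strip()}")
    return {"code": res.returncode, "out": res.stdout, "err": res.stderr}


if __name__ == "__main__":
    ok = run_cmd_safe([sys.executable, "-c", "print('hello')"])
    print(json.dumps(ok))  # the dict round-trips through JSON
    try:
        run_cmd_safe([sys.executable, "-c", "import sys; sys.exit(3)"], fail_on_error=True)
    except RuntimeError as exc:
        print(exc)
```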
3) Avoid nested process pools
- If you are using ProcessPoolTaskRunner, consider switching to ThreadPoolTaskRunner when your tasks call subprocesses. Threads work well for I/O-bound shell calls and avoid the extra process layer.
4) Persist results if runs may be interrupted
- If your worker might restart, enable result persistence on tasks whose outputs are consumed later:
```python
from prefect import task

@task(persist_result=True)
def produce():
    ...
```
5) Inspect logs around the crash
- Check worker/host logs for signals, restarts, or OOM around the time of the crash.
- Grab Prefect logs to confirm what happened first:
  - prefect flow-run logs <FLOW_RUN_ID>
  - prefect task-run logs <TASK_RUN_ID>
If you can share:
- Whether you’re using ProcessPoolTaskRunner in the flow
- A snippet of the subprocess.run you’re calling
- One crashed task’s full state message/logs
I can give you a precise recommendation (e.g., threads vs processes, specific subprocess.run flags, or service configuration tweaks).
Alex Robson
09/02/2025, 3:43 PM
```python
from prefect import get_run_logger, task, flow
from dotenv import load_dotenv
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running task A...")
    return "A's result"

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running task B...")
    return "B's result"

@task
def no_op_task(*dependencies):
    logger = get_run_logger()
    logger.info("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    no_op_task.submit(a, b)

if __name__ == "__main__":
    my_flow()
```
This doesn't:
```python
from prefect import get_run_logger, task, flow
from dotenv import load_dotenv
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running task A...")
    time.sleep(5)
    return "A's result"

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running task B...")
    return "B's result"

@task
def no_op_task(*dependencies):
    logger = get_run_logger()
    logger.info("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    no_op_task.submit(a, b)

if __name__ == "__main__":
    my_flow()
```
Marvin
09/02/2025, 3:43 PM
Marvin
09/02/2025, 3:46 PM
time.sleep(5) keeps task_a running after the flow has already returned, so Prefect cancels the work and you see “Execution was cancelled by the runtime environment.”
In Prefect 3.x, always wait for the futures you care about before the flow returns. Easiest fixes:
Option A: Block on the last task
```python
from prefect import get_run_logger, task, flow
from prefect.task_runners import ThreadPoolTaskRunner
import time

@task
def task_a():
    logger = get_run_logger()
    logger.info("Running task A...")
    time.sleep(5)
    return "A's result"

@task
def task_b():
    logger = get_run_logger()
    logger.info("Running task B...")
    return "B's result"

@task
def no_op_task(*dependencies):
    logger = get_run_logger()
    logger.info("Running no-op task...")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    noop = no_op_task.submit(a, b)
    noop.result()  # wait so the flow does not exit early

if __name__ == "__main__":
    my_flow()
```
Option B: Wait on all futures (if you have many)
```python
from prefect.futures import wait

# Reuses the imports and task definitions from Option A
@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    noop = no_op_task.submit(a, b)
    wait([a, b, noop])
```
Option C: Return a future’s result from the flow (the flow will block on it)
```python
# Reuses the imports and task definitions from Option A
@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    return no_op_task.submit(a, b).result()
```
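Here is a standard-library analogy for why returning early bites. It illustrates the general mechanism and is not a description of Prefect's internals. If a worker pool is shut down while work is still queued, the queued work is cancelled, whereas waiting on the futures first lets everything finish. The one-thread pool and the `threading.Event` gate make the ordering deterministic. `demo` is a hypothetical name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def demo(wait_first: bool):
    """Return (first_done, second_cancelled) for a one-worker pool."""
    release = threading.Event()
    started = threading.Event()

    def slow():
        started.set()
        release.wait(timeout=5)  # stands in for time.sleep(5) in task_a
        return "A's result"

    pool = ThreadPoolExecutor(max_workers=1)
    first = pool.submit(slow)
    second = pool.submit(lambda: "no-op")  # queued behind slow()
    started.wait(timeout=5)
    if wait_first:
        release.set()
        wait([first, second])  # block until both are done, then shut down
        pool.shutdown()
    else:
        # "Return early": shut down without waiting and drop queued work
        pool.shutdown(wait=False, cancel_futures=True)
        release.set()
    first.result(timeout=5)  # the call that was already running still finishes
    return first.done(), second.cancelled()


if __name__ == "__main__":
    print(demo(wait_first=True))
    print(demo(wait_first=False))
```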
That should eliminate the crash. If you still see it after waiting on the futures, let me know how you’re starting the worker (systemd, Docker, etc.) and share the worker logs around the failure time so we can check for restarts or signals.
Alex Robson
09/02/2025, 4:02 PM
wait(futures)?
Marvin
09/02/2025, 4:02 PM
Marvin
09/02/2025, 4:02 PM
prefect.futures.wait, and it takes a list of `PrefectFuture`s. Example:
```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import wait

@task
def task_a():
    get_run_logger().info("A")
    return "A"

@task
def task_b():
    get_run_logger().info("B")
    return "B"

@task
def no_op(*deps):
    get_run_logger().info("noop")

@flow(task_runner=ThreadPoolTaskRunner())
def my_flow():
    futures = []
    futures.append(task_a.submit())
    futures.append(task_b.submit())
    futures.append(no_op.submit(futures[0], futures[1]))
    # Block until they’re all done (or raise on crash/failure if you call .result() later)
    done = wait(futures)
    # Optionally access results
    for f in done.done:
        _ = f.result()

if __name__ == "__main__":
    my_flow()
```
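The standard library's `concurrent.futures.wait` returns a `DoneAndNotDoneFutures` named tuple of the same shape. The runnable stdlib sketch below is an analogy, not Prefect code. It shows a timeout leaving slow work in `not_done`, and shows that a failure only surfaces when you call `.result()` on a finished future.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait


def ok():
    return "A"


def boom():
    raise ValueError("task failed")


def main():
    gate = threading.Event()
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(ok), pool.submit(boom), pool.submit(gate.wait, 5)]
        # Cap the total wait; the gated future is still running afterwards
        done, not_done = wait(futures, timeout=1.0)
        gate.set()  # let the slow one finish so the pool can shut down
    failures = []
    for f in done:
        try:
            f.result()  # re-raises the worker's exception
        except ValueError as exc:
            failures.append(str(exc))
    return len(done), len(not_done), failures


if __name__ == "__main__":
    print(main())
```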
Notes:
- wait returns a DoneAndNotDoneFutures object with .done and .not_done sets.
- You can pass timeout=<seconds> if you want to cap total waiting time.
- If you want the flow to fail when any task fails, either iterate over the futures and call .result() (which will raise) or explicitly check their states.
Alex Robson
09/04/2025, 8:01 PM
```python
@flow(name="Sample Processing")
def sample_flow():
    futures = []
    a_flow_future = a_flow.submit()
    ## Chances stuff
    z_task_future = tasks.z_task.submit()
    futures.append(z_task_future)
    b_flow_future = b_flow.submit(a_flow_future)
    c_flow_future = c_flow.submit(a_flow_future)
    d_flow_future = d_flow.submit(b_flow_future, c_flow_future)
    wait(futures)
```
@Marvin Debug this flow. Is this done properly?
Marvin
09/04/2025, 8:01 PM
Marvin
09/04/2025, 8:08 PM
.submit() to run tasks and subflows concurrently is supported in Prefect 3.x. Passing Prefect futures as inputs to downstream tasks/subflows is also correct; Prefect will resolve them to results when needed.
- Your dependency chain a -> (b, c) -> d is modeled correctly by passing futures.
Fixes and suggestions
- Import wait from prefect.futures (it’s not available from the top-level prefect import).
- If you intend to wait for everything before the flow exits, include all relevant futures in your wait, not just z_task_future. Alternatively, return a downstream future’s result to implicitly wait for its entire dependency chain.
- Prefer named arguments when passing futures to subflows/tasks to match their signatures.
- Optional: set a task runner if you want local concurrency control (threads/processes).
Cleaned up example
```python
from prefect import flow
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

# assuming a_flow, b_flow, c_flow, d_flow are @flow-decorated
# and tasks.z_task is a @task
@flow(name="Sample Processing", task_runner=ThreadPoolTaskRunner())
def sample_flow():
    a_future = a_flow.submit()
    z_future = tasks.z_task.submit()
    # If b_flow takes the result of a_flow as, say, "a_result", pass it by name
    b_future = b_flow.submit(a_future)  # e.g., b_flow.submit(a_result=a_future)
    c_future = c_flow.submit(a_future)  # e.g., c_flow.submit(a_result=a_future)
    d_future = d_flow.submit(b_future, c_future)  # e.g., d_flow.submit(b=b_future, c=c_future)
    # Option 1: explicitly wait for everything you care about:
    wait([z_future, b_future, c_future, d_future])
    # Option 2 (common): return a result to implicitly wait on its chain
    # return d_future.result()
```
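To double-check which steps of a -> (b, c) -> d can overlap, the standard library's `graphlib` can compute the "waves" of work that become ready together. This is only a planning aid: the step names mirror the flow above, `ready_batches` is a hypothetical helper, and nothing here talks to Prefect.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def ready_batches(deps: Dict[str, Set[str]]) -> List[Set[str]]:
    """Group nodes into batches whose dependencies are all satisfied."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = set(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)  # pretend the whole batch finished
    return batches


if __name__ == "__main__":
    # node -> set of upstream nodes it depends on
    deps = {"a": set(), "z": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
    for i, batch in enumerate(ready_batches(deps), start=1):
        print(i, sorted(batch))
```

Each batch is what you could submit concurrently. Here a and z start together, b and c overlap once a is done, and d waits for both.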
Notes
- In Prefect 3.x, submitted work is not automatically awaited when the flow function returns; futures you never resolve can be cancelled or left unfinished (the likely cause of the crash earlier in this thread). Use wait() (or .result()) to block at a specific point, manage timeouts, or ensure “extra” work (like z_task) finishes before returning if it’s not part of your returned dependency chain.
- Avoid calling .result() too early on upstream futures if you want to preserve concurrency; do it at the end (or only when you actually need the value).
If you’re seeing a specific error or unexpected behavior (e.g., NameError for wait, flows running sequentially, or type mismatches in parameters), share the error and the signatures of a_flow/b_flow/c_flow/d_flow and I’ll pinpoint the fix.
Helpful docs
- Futures API (wait, as_completed): Prefect Futures API
- Task runners and concurrency: Task Runners
Alex Robson
09/04/2025, 8:10 PM
Alex Robson
09/04/2025, 8:11 PM
Marvin
09/04/2025, 8:11 PM
Marvin
09/04/2025, 8:13 PM
Flow object doesn’t have .submit(). A couple of possibilities:
- You’re on Prefect 2.x (flows don’t support .submit() there).
- Or your subflows aren’t decorated as Prefect flows.
Quick check: what Prefect version are you running? You can verify with:
prefect version
or
python -c "import prefect; print(prefect.__version__)"
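If you want a script to branch on the installed major version, the standard library's `importlib.metadata` can read it without importing Prefect. `installed_major_version` and `parse_major` are hypothetical helpers, and the parsing assumes the version string starts with an integer major component (as in "3.4.14").

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def parse_major(version_string: str) -> int:
    """Return the leading integer of a version string such as '3.4.14'."""
    return int(version_string.split(".", 1)[0])


def installed_major_version(package: str = "prefect") -> Optional[int]:
    """Return the installed major version of `package`, or None if it is not installed."""
    try:
        return parse_major(version(package))
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    major = installed_major_version()
    if major is None:
        print("prefect is not installed in this environment")
    else:
        print(f"prefect major version: {major}")
```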
If you’re on Prefect 3.x
- Ensure your subflows are defined with @flow. In 3.x, both tasks and flows can be submitted concurrently.
- Import wait from prefect.futures.
Example:
```python
from prefect import flow
from prefect.futures import wait

@flow
def a_flow(): ...

@flow
def b_flow(a_result): ...

@flow
def c_flow(a_result): ...

@flow
def d_flow(b_result, c_result): ...

# tasks.z_task is a @task
@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()
    a_future = a_flow.submit()
    b_future = b_flow.submit(a_future)  # or b_flow.submit(a_result=a_future)
    c_future = c_flow.submit(a_future)  # or c_flow.submit(a_result=a_future)
    d_future = d_flow.submit(b_future, c_future)  # or d_flow.submit(b_result=b_future, c_result=c_future)
    # Either explicitly wait
    wait([z_future, b_future, c_future, d_future])
    # Or return a result to implicitly wait on the whole chain
    # return d_future.result()
```
If you’re on Prefect 2.x
Flows do not have .submit(). Use one of these patterns:
- Synchronous subflow calls (simple and common):
```python
@flow
def sample_flow():
    z_future = tasks.z_task.submit()
    a_result = a_flow()
    b_result = b_flow(a_result)
    c_result = c_flow(a_result)
    d_result = d_flow(b_result, c_result)
    # optionally wait for z if you care it finishes before exit
    z_future.result()
    return d_result
```
- Concurrent “subflows” by wrapping each subflow call in a task, then submitting the tasks:
```python
from prefect import task, flow

@task
def run_a(): return a_flow()

@task
def run_b(a): return b_flow(a)

@task
def run_c(a): return c_flow(a)

@task
def run_d(b, c): return d_flow(b, c)

@flow
def sample_flow():
    z_future = tasks.z_task.submit()
    a_future = run_a.submit()
    b_future = run_b.submit(a_future)
    c_future = run_c.submit(a_future)
    d_future = run_d.submit(b_future, c_future)
    return d_future.result()
```
- Or trigger separate flow runs via deployments (asynchronous orchestration):
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def sample_flow():
    # By default run_deployment waits for the flow run to finish, then returns the FlowRun
    fr = run_deployment(name="a-flow/a-deployment", as_subflow=True)
```
Docs/resources
- Futures API (3.x): Prefect Futures API
- Design patterns discussion: Workflow design patterns
If you share your Prefect version and how a_flow/b_flow/c_flow/d_flow are defined, I can tailor the exact fix.
Alex Robson
09/04/2025, 8:17 PM
Marvin
09/04/2025, 8:17 PM
Marvin
09/04/2025, 8:19 PM
Flow objects do not have a .submit() method. That’s why you’re seeing:
AttributeError: 'Flow' object has no attribute 'submit'
How to fix it
You have a few solid options depending on whether you truly need the subflows to run concurrently or just want to keep them modular.
Option A — Keep subflows, run them sequentially (simple)
Call subflows like normal functions; only tasks have `.submit()`:
from prefect import flow
@flow(name="Sample Processing")
def sample_flow():
z_future = tasks.z_task.submit()
a_result = a_flow()
b_result = b_flow(a_result)
c_result = c_flow(a_result)
d_result = d_flow(b_result, c_result)
# ensure z finishes if you need it before exit
z_future.result()
return d_result
Option B — Run subflows concurrently via wrapper tasks (recommended if you want concurrency across subflows)
Wrap each subflow call in a @task and submit those tasks. This preserves your subflow organization but uses task futures for concurrency.
```python
from prefect import flow, task
from prefect.futures import wait

@task
def run_a():
    return a_flow()

@task
def run_b(a):
    return b_flow(a)

@task
def run_c(a):
    return c_flow(a)

@task
def run_d(b, c):
    return d_flow(b, c)

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()
    a_future = run_a.submit()
    b_future = run_b.submit(a_future)
    c_future = run_c.submit(a_future)
    d_future = run_d.submit(b_future, c_future)
    # Explicitly wait for what you care about
    wait([z_future, d_future])
    return d_future.result()
```
Option C — Kick off separate flow runs via deployments (advanced, decoupled)
If a_flow, b_flow, etc. are deployed, you can launch them as subflow runs and wait on them:
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()
    # run_deployment waits for the flow run to finish by default (timeout=None),
    # so no separate wait call is needed before reading its state.
    a_run = run_deployment(name="a-flow/a-deployment", as_subflow=True)
    a_result = a_run.state.result()  # if you need the result (requires persisted results)
    b_run = run_deployment(name="b-flow/b-deployment", parameters={"a_result": a_result}, as_subflow=True)
    c_run = run_deployment(name="c-flow/c-deployment", parameters={"a_result": a_result}, as_subflow=True)
    d_run = run_deployment(
        name="d-flow/d-deployment",
        parameters={"b_result": b_run.state.result(), "c_result": c_run.state.result()},
        as_subflow=True,
    )
    z_future.result()
    return d_run.state.result()
```
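Because run_deployment waits for each run by default, B and C above run one after the other. To make them overlap, one option is to start them with `timeout=0`, which returns the flow run immediately (check the run_deployment reference for your version), and then poll their states yourself. The polling loop is sketched below in plain Python. `fetch_state` is a hypothetical stand-in for however you read a flow run's state (for example via a Prefect client), and the terminal state names are assumptions.

```python
import time
from typing import Callable, Dict, Iterable

# Assumed terminal state names; adjust to whatever your state objects expose.
FINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_until_final(
    run_ids: Iterable[str],
    fetch_state: Callable[[str], str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> Dict[str, str]:
    """Poll each run until all are in a final state; return {run_id: state}."""
    pending = set(run_ids)
    finished: Dict[str, str] = {}
    deadline = time.monotonic() + timeout
    while pending:
        for run_id in list(pending):
            state = fetch_state(run_id)
            if state in FINAL_STATES:
                finished[run_id] = state
                pending.discard(run_id)
        if pending:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"still running: {sorted(pending)}")
            time.sleep(poll_interval)
    return finished


if __name__ == "__main__":
    # Fake state source: each run reports RUNNING twice, then COMPLETED.
    calls: Dict[str, int] = {}

    def fake_fetch(run_id: str) -> str:
        calls[run_id] = calls.get(run_id, 0) + 1
        return "COMPLETED" if calls[run_id] > 2 else "RUNNING"

    print(wait_until_final(["b-run", "c-run"], fake_fetch, poll_interval=0.01))
```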
Notes
- Only tasks have `.submit()`; flows are called like normal functions.
- If you need concurrency, wrap flow calls in tasks (Option B) or use deployments (Option C).
- Import wait from prefect.futures when you want to block at specific points: Prefect Futures API.
If you can share how a_flow, b_flow, c_flow, and d_flow are defined (signatures) and whether you need them to run concurrently, I can tailor the best pattern for your case.
Alex Robson
09/04/2025, 8:21 PM
Marvin
09/04/2025, 8:21 PM
Marvin
09/04/2025, 8:22 PM
Flow.submit. In Prefect 3.x, flows do not have a .submit() method. Only tasks have .submit().
How to achieve what you want
- Sequential subflows (simple):
```python
from prefect import flow

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()
    a_result = a_flow()
    b_result = b_flow(a_result)
    c_result = c_flow(a_result)
    d_result = d_flow(b_result, c_result)
    z_future.result()  # ensure z completes if needed
    return d_result
```
- Concurrent “subflows” via wrapper tasks (common pattern for concurrency):
```python
from prefect import flow, task
from prefect.futures import wait

@task
def run_a(): return a_flow()

@task
def run_b(a): return b_flow(a)

@task
def run_c(a): return c_flow(a)

@task
def run_d(b, c): return d_flow(b, c)

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()
    a_future = run_a.submit()
    b_future = run_b.submit(a_future)
    c_future = run_c.submit(a_future)
    d_future = run_d.submit(b_future, c_future)
    wait([z_future, d_future])
    return d_future.result()
```
- Launch separate flow runs via deployments (decoupled, if each flow has its own deployment):
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow(name="Sample Processing")
def sample_flow():
    z_future = tasks.z_task.submit()
    # run_deployment waits for the flow run to finish by default (timeout=None)
    a_run = run_deployment(name="a-flow/a-deployment", as_subflow=True)
    a_result = a_run.state.result()  # requires persisted results
    b_run = run_deployment(name="b-flow/b-deployment", parameters={"a_result": a_result}, as_subflow=True)
    c_run = run_deployment(name="c-flow/c-deployment", parameters={"a_result": a_result}, as_subflow=True)
    d_run = run_deployment(
        name="d-flow/d-deployment",
        parameters={"b_result": b_run.state.result(), "c_result": c_run.state.result()},
        as_subflow=True,
    )
    z_future.result()
    return d_run.state.result()
```
Helpful docs
- Futures (for wait, result()): Prefect Futures API
- Running deployments programmatically: run_deployment
If you tell me whether you need true parallelism between your subflows or just modular structure (plus whether each subflow is deployed), I can recommend the best of the three patterns above for your case. And sorry for the earlier confusion around .submit() on flows!
Alex Robson
09/04/2025, 8:35 PM
Marvin
09/04/2025, 8:35 PM
Marvin
09/04/2025, 8:36 PM
@task for units of work
- Call task.submit(...) to start them
- Pass futures to downstream tasks
- Only call .result() (or wait([...])) when you need the values
Minimal pattern
```python
from prefect import flow, task
from prefect.futures import wait

@task
def step_a():
    ...

@task
def step_b():
    ...

@task
def step_c(a, b):
    ...

@flow
def sample_flow():
    a_future = step_a.submit()
    b_future = step_b.submit()
    # a and b run concurrently
    c_future = step_c.submit(a_future, b_future)  # depends on a and b
    # Block only when you need results
    return c_future.result()
```
Control local parallelism (optional)
- IO-bound work: use threads
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def sample_flow():
    ...
```
- CPU-bound work: use processes
```python
from prefect import flow
from prefect.task_runners import ProcessPoolTaskRunner

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def sample_flow():
    ...
```
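The threads-vs-processes choice has a direct standard-library counterpart, which can help when reasoning about it outside Prefect. The hypothetical `make_pool` helper below picks `ThreadPoolExecutor` for I/O-bound work and `ProcessPoolExecutor` for CPU-bound work. The process pool carries the same constraint discussed for ProcessPoolTaskRunner: functions and arguments must be picklable and defined at module top level.

```python
import os
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Optional


def make_pool(io_bound: bool, max_workers: Optional[int] = None) -> Executor:
    """Threads for I/O-bound work, processes for CPU-bound work."""
    if io_bound:
        return ThreadPoolExecutor(max_workers=max_workers or 8)
    # CPU-bound: default to one process per CPU core
    return ProcessPoolExecutor(max_workers=max_workers or os.cpu_count())


def square(x: int) -> int:
    # Module-level so a process pool can pickle it
    return x * x


if __name__ == "__main__":  # guard required for process pools on spawn-based platforms
    with make_pool(io_bound=True) as pool:
        print(list(pool.map(square, range(5))))
    with make_pool(io_bound=False, max_workers=2) as pool:
        print(list(pool.map(square, range(5))))
```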
Tips
- Don’t call .result() on upstream futures too early; that serializes execution.
- Use wait([f1, f2, ...]) if you want to block at a specific point or enforce a timeout.
- Flows themselves don’t have `.submit()`; that’s only for tasks.
Docs for reference
- Futures API (wait, result, as_completed): Prefect Futures API
- Task runners (threads vs processes): Task Runners
If you tell me whether your workload is IO- or CPU-bound, I can suggest the best runner and max_workers settings.
Alex Robson
09/04/2025, 8:37 PM
Marvin
09/04/2025, 8:37 PM
Marvin
09/04/2025, 8:38 PM
```python
from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

@task
def run_a():
    return a_flow()

@task
def run_b(a):
    return b_flow(a)

@task
def run_c(a):
    return c_flow(a)

@task
def run_d(b, c):
    return d_flow(b, c)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def sample_processing():
    # run A, then B and C concurrently, then D
    a_f = run_a.submit()
    b_f = run_b.submit(a_f)
    c_f = run_c.submit(a_f)
    d_f = run_d.submit(b_f, c_f)
    # block only when needed
    return d_f.result()
```
- Notes:
  - Only tasks have .submit().
  - Choose ThreadPoolTaskRunner (I/O bound) or ProcessPoolTaskRunner (CPU bound).
  - Docs: Task Runners, Futures API
2) Concurrent subflows as separate flow runs via deployments (distributed, scalable)
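- Give each subflow its own deployment and launch them with run_deployment. Each one becomes a separate flow run that your workers pick up; by default run_deployment waits for the run to finish, so start runs with timeout=0 if you want them to overlap. You can optionally wait and collect results.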
Example:
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    # Kick off A as a subflow run; by default run_deployment waits for it to finish
    a_run = run_deployment(name="a-flow/a-deployment", as_subflow=True)
    a_result = a_run.state.result()  # if you need A's result (requires persisted results)
    # Kick off B and C. With the default timeout each call waits for its run,
    # so B finishes before C starts; pass timeout=0 to return immediately and
    # poll the runs yourself if they must overlap.
    b_run = run_deployment(name="b-flow/b-deployment", parameters={"a_result": a_result}, as_subflow=True)
    c_run = run_deployment(name="c-flow/c-deployment", parameters={"a_result": a_result}, as_subflow=True)
    d_run = run_deployment(
        name="d-flow/d-deployment",
        parameters={"b_result": b_run.state.result(), "c_result": c_run.state.result()},
        as_subflow=True,
    )
    return d_run.state.result()
```
- Notes:
- This uses your work pool/worker(s) for real parallelism across infrastructure.
- Make sure results you pass between runs are serializable; consider enabling persistence if results are large.
- API references: run_deployment, wait_for_flow_run
Which route fits you best?
- Want simple concurrency within one process? Use wrapper tasks (Option 1).
- Want fully decoupled runs scaling across workers? Use deployments (Option 2).
If you tell me whether you need in-process vs distributed parallelism (and whether each subflow already has a deployment), I can tailor the exact code for your case.
Alex Robson
09/04/2025, 8:42 PM
Marvin
09/04/2025, 8:42 PM
Marvin
09/04/2025, 8:43 PM
run_deployment) so they run as separate flow runs that workers execute concurrently.
Rules of thumb
- Need simple, in-process parallelism for tasks? Use a task runner on the flow:
  - IO-bound → ThreadPoolTaskRunner
  - CPU-bound → ProcessPoolTaskRunner
- Need to scale out across machines or isolate workloads? Use deployments + a work pool/worker(s).
- You can combine both: workers for concurrent flow runs; task runners for concurrency inside each run.
Docs
- Task runners: Task Runners
- Workers and work pools (deployments): Work Pools and Workers
Alex Robson
09/04/2025, 8:56 PM
Marvin
09/04/2025, 8:56 PM
Marvin
09/04/2025, 8:57 PM
run_deployment(name=...) expects the target flow to have a deployment with that name (format: "<flow-name>/<deployment-name>"). Without a deployment, there’s nothing for workers to pick up.
- Deployments package your flow plus infrastructure settings (work pool, env, image, storage, schedules). Workers poll your work pool and execute scheduled flow runs for those deployments.
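The "<flow-name>/<deployment-name>" string is easy to get subtly wrong, so a small pair of hypothetical helpers can build and split it. The underscore-to-hyphen conversion reflects an assumption: that a flow's default name is derived from its function name that way (a_flow becomes a-flow, matching the names used in this thread). If you set @flow(name=...), pass that name explicitly instead.

```python
from typing import Optional, Tuple


def deployment_ref(flow_func_name: str, deployment_name: str, flow_name: Optional[str] = None) -> str:
    """Build the "<flow-name>/<deployment-name>" string for run_deployment(name=...).

    Assumption: without an explicit flow_name, the flow name is the function
    name with underscores replaced by hyphens.
    """
    name = flow_name or flow_func_name.replace("_", "-")
    return f"{name}/{deployment_name}"


def split_deployment_ref(ref: str) -> Tuple[str, str]:
    """Split "<flow-name>/<deployment-name>" into its two parts."""
    flow_name, sep, deployment_name = ref.partition("/")
    if not sep or not flow_name or not deployment_name:
        raise ValueError(f"expected '<flow-name>/<deployment-name>', got {ref!r}")
    return flow_name, deployment_name


if __name__ == "__main__":
    print(deployment_ref("a_flow", "a-deployment"))
    print(split_deployment_ref("d-flow/d-deployment"))
```

In an orchestrator you would then call something like run_deployment(name=deployment_ref("a_flow", "a-deployment")).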
When you do NOT need deployments
- If you are orchestrating subflows within a single flow run and are okay staying in-process, you can call subflows like regular functions or wrap them in tasks for concurrency. No deployments required for that pattern.
When you DO need deployments
- If you want to fan out subflows as separate flow runs executed by workers (for isolation or horizontal scaling), each subflow must have its own deployment. Then your orchestrator calls run_deployment(...) for each.
Creating deployments in Prefect 3.x
- From code:
```python
from prefect import flow

@flow
def a_flow(): ...

# Create and push a deployment for a_flow
a_flow.from_source(
    "https://github.com/org/repo",
    entrypoint="path/to/file.py:a_flow",
).deploy(name="a-deployment", work_pool_name="my-pool")
```
- Or use the CLI:
prefect deploy
Then follow the prompts to pick flows and configure the deployment(s).
Docs
- Deployments overview: Deployments in Prefect 3
- Programmatic deploy: Python deployment API
- run_deployment reference: run_deployment
If you share how you’re storing your code (local, Git, image) and your work pool type, I can show the exact deploy snippet or prefect deploy steps for your setup.
Alex Robson
09/04/2025, 8:58 PM
Marvin
09/04/2025, 8:58 PM
Marvin
09/04/2025, 8:58 PM
```python
from prefect import flow, task
from prefect.futures import wait
from prefect.task_runners import ThreadPoolTaskRunner

# Your subflows
@flow
def a_flow(): ...

@flow
def b_flow(a): ...

@flow
def c_flow(a): ...

@flow
def d_flow(b, c): ...

# Thin task wrappers that call the subflows
@task
def run_a():
    return a_flow()

@task
def run_b(a):
    return b_flow(a)

@task
def run_c(a):
    return c_flow(a)

@task
def run_d(b, c):
    return d_flow(b, c)

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def orchestrator():
    # Kick off A
    a_f = run_a.submit()
    # B and C depend on A; they’ll run concurrently
    b_f = run_b.submit(a_f)
    c_f = run_c.submit(a_f)
    # D depends on B and C
    d_f = run_d.submit(b_f, c_f)
    # Block only when you need the result
    return d_f.result()
```
Notes
- Choose ThreadPoolTaskRunner for IO-bound work, ProcessPoolTaskRunner for CPU-bound work.
- Don’t call .result() on upstream futures too early or you’ll serialize execution.
- If you want to process results as they finish, use as_completed from prefect.futures.
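prefect.futures.as_completed behaves much like the standard-library function of the same name: it yields each future as soon as it finishes, regardless of submission order. Here is a runnable stdlib sketch (not Prefect code) with staggered sleeps. `work` and `finish_order` are illustrative names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(name: str, delay: float) -> str:
    time.sleep(delay)
    return name


def finish_order() -> list:
    # Submitted in the order slow, medium, fast
    jobs = {"slow": 0.6, "medium": 0.3, "fast": 0.05}
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(work, name, delay) for name, delay in jobs.items()]
        # Results arrive in completion order, not submission order
        return [f.result() for f in as_completed(futures)]


if __name__ == "__main__":
    print(finish_order())
```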
Docs
- Futures (wait, result, as_completed): Prefect Futures API
- Task runners: Task Runners
Alex Robson
09/05/2025, 12:35 AM
Marvin
09/05/2025, 12:35 AM
Marvin
09/05/2025, 12:43 AM
.submit. Concurrency is driven by the task runners.
```python
import random
import time

from prefect import flow, task, get_run_logger
from prefect.futures import as_completed
from prefect.task_runners import ThreadPoolTaskRunner

@task
def do_work(x: int, delay: float | None = None) -> int:
    logger = get_run_logger()
    delay = delay or random.uniform(0.5, 2.0)
    logger.info(f"Task {x}: sleeping {delay:.2f}s")
    time.sleep(delay)
    result = x * x
    logger.info(f"Task {x}: done -> {result}")
    return result

# Subflow: runs its tasks concurrently
@flow(name="child-subflow", task_runner=ThreadPoolTaskRunner(max_workers=4))
def child_subflow(name: str, items: list[int]) -> int:
    logger = get_run_logger()
    logger.info(f"[{name}] launching {len(items)} tasks concurrently")
    # Calling do_work(i) directly would run it synchronously and return an int.
    # .map() submits one task run per item to this flow's task runner and
    # returns a list of PrefectFutures.
    task_futures = do_work.map(items)
    total = 0
    # React to task results as they complete
    for fut in as_completed(task_futures):
        total += fut.result()
    logger.info(f"[{name}] total = {total}")
    return total

# Calling a subflow directly blocks until it finishes, so each subflow call
# is wrapped in a task that the parent can map concurrently.
@task
def run_child(name: str, items: list[int]) -> int:
    return child_subflow(name=name, items=items)

# Parent flow: launches multiple subflows concurrently
@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=8))
def parent_flow() -> int:
    logger = get_run_logger()
    logger.info("Launching child subflows concurrently")
    # One wrapper-task run per (name, items) pair; returns PrefectFutures
    subflow_futures = run_child.map(
        name=["A", "B", "C"],
        items=[list(range(0, 5)), list(range(5, 10)), list(range(10, 15))],
    )
    child_totals = []
    for fut in as_completed(subflow_futures):
        child_totals.append(fut.result())
    grand_total = sum(child_totals)
    logger.info(f"Child totals: {child_totals} -> grand total: {grand_total}")
    return grand_total

if __name__ == "__main__":
    parent_flow()
```
Notes:
- Replace ThreadPoolTaskRunner with ProcessPoolTaskRunner if your work is CPU-bound.
- Concurrency is controlled by max_workers in each flow’s task_runner.
- as_completed lets you handle results as each task/subflow finishes; wait is also available if you prefer to block until all are done.
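The difference between the two coordination styles can be sketched with the standard library's concurrent.futures.as_completed and concurrent.futures.wait. These are analogues of the Prefect helpers with the same names, not the Prefect functions themselves, and do_work, completion_order and total_after_wait are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait


def do_work(x: int, delay: float) -> int:
    time.sleep(delay)
    return x * x


def completion_order(delays: dict[int, float]) -> list[int]:
    # as_completed yields each future as soon as it finishes
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        futures = {pool.submit(do_work, x, d): x for x, d in delays.items()}
        return [futures[f] for f in as_completed(futures)]


def total_after_wait(items: list[int]) -> int:
    # wait blocks until every future is done (return_when=ALL_COMPLETED by default)
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(do_work, x, 0.01) for x in items]
        done, not_done = wait(futures)
        if not_done:
            raise RuntimeError("wait returned before all futures finished")
        return sum(f.result() for f in done)
```

For example, completion_order({1: 0.3, 2: 0.1, 3: 0.2}) reports the items in the order they finish (2, 3, 1), not in the order they were submitted.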
Docs for more details:
- Task runners: Prefect 3 Task Runners
- Concurrency utilities: prefect.futures (source)
- Global concurrency limits (optional): Global concurrency limits
Quick check: are you targeting I/O-bound or CPU-bound work, and do you have any global concurrency limits configured in your workspace? I can tailor the runners and limits accordingly.
Alex Robson
09/05/2025, 2:23 AM
Marvin
09/05/2025, 2:23 AM
Marvin
09/05/2025, 2:23 AM
.submit needed). The parent flow schedules multiple subflows and processes results as they complete.
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
import time

# A simple subflow that sleeps, then returns a value
@flow(name="child-subflow")
def child_subflow(name: str, delay: float) -> str:
    logger = get_run_logger()
    logger.info(f"[{name}] starting (sleep {delay}s)")
    time.sleep(delay)
    msg = f"[{name}] done"
    logger.info(msg)
    return msg

# Calling a subflow directly blocks until it finishes, so wrap it in a task
@task
def run_child(name: str, delay: float) -> str:
    return child_subflow(name, delay)

# Parent flow runs subflows concurrently by submitting the wrapper task
@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=3))
def parent_flow():
    logger = get_run_logger()
    logger.info("Launching child subflows concurrently")
    # Each .submit() returns a PrefectFuture immediately
    subflow_futures = [
        run_child.submit("A", 2.0),
        run_child.submit("B", 1.0),
        run_child.submit("C", 3.0),
    ]
    results = []
    for fut in as_completed(subflow_futures):
        results.append(fut.result())
    logger.info(f"Results (completion order): {results}")
    return results

if __name__ == "__main__":
    parent_flow()
Notes:
- Concurrency is controlled by the parent flow’s task_runner (ThreadPoolTaskRunner here). Increase max_workers to allow more concurrent subflows.
- You can swap to ProcessPoolTaskRunner if your subflows are CPU-bound.
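To see that max_workers caps how many submitted units run at once, here is a small stdlib measurement. A concurrent.futures.ThreadPoolExecutor stands in for ThreadPoolTaskRunner here. That is an analogy, not Prefect code, and peak_concurrency is a hypothetical name.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(n_jobs: int, max_workers: int, seconds: float = 0.05) -> int:
    lock = threading.Lock()
    running = 0
    peak = 0

    def job() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(seconds)  # stand-in for a slow subflow
        with lock:
            running -= 1

    # Leaving the with-block waits for all submitted jobs to finish
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(n_jobs):
            pool.submit(job)
    return peak
```

Submitting 10 jobs with max_workers=3 never shows more than 3 running at the same moment. The remaining jobs queue until a worker frees up.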
Docs:
- Task runners: Prefect 3 Task Runners
Alex Robson
09/05/2025, 2:24 AM
Marvin
09/05/2025, 2:24 AM
Marvin
09/05/2025, 2:25 AM
.submit is used; dependencies are expressed by passing futures as inputs.
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
import time, random

@flow(name="producer-subflow")
def producer(name: str, delay: float | None = None) -> int:
    logger = get_run_logger()
    d = delay or random.uniform(0.5, 2.0)
    logger.info(f"[{name}] starting (sleep {d:.2f}s)")
    time.sleep(d)
    value = len(name) * 10
    logger.info(f"[{name}] done -> {value}")
    return value

@flow(name="dependent-subflow")
def dependent(name: str, a: int, b: int) -> int:
    logger = get_run_logger()
    result = a + b
    logger.info(f"[{name}] got a={a}, b={b} -> sum={result}")
    return result

# Calling a subflow directly blocks, so wrap each one in a task; submitted
# tasks run on the parent's task runner and return PrefectFutures
@task
def run_producer(name: str, delay: float | None = None) -> int:
    return producer(name, delay)

@task
def run_dependent(name: str, a: int, b: int) -> int:
    return dependent(name, a, b)

@flow(name="parent-flow", task_runner=ThreadPoolTaskRunner(max_workers=4))
def parent_flow():
    logger = get_run_logger()
    logger.info("Launching upstream subflows concurrently")
    # Start three subflows concurrently
    a_fut = run_producer.submit("A", 2.0)
    b_fut = run_producer.submit("B", 1.0)
    c_fut = run_producer.submit("C", 3.0)
    # Launch a dependent subflow that requires A and B.
    # Passing futures as arguments creates dependencies; D will start
    # only after A and B complete, but this call returns a future immediately.
    d_fut = run_dependent.submit("D", a=a_fut, b=b_fut)
    # Keep orchestrating; for example, react to completions as they happen
    results = {}
    for fut in as_completed([c_fut, d_fut]):
        val = fut.result()
        key = "C" if fut is c_fut else "D"
        results[key] = val
        logger.info(f"Subflow {key} completed with result={val}")
    logger.info(f"Final results: {results}")
    return results

if __name__ == "__main__":
    parent_flow()
Key points:
- Concurrency is controlled by the parent flow’s task_runner (ThreadPoolTaskRunner here). Increase max_workers to allow more concurrent subflows.
- Dependencies are created by passing upstream futures (a_fut, b_fut) as parameters to the call that launches the dependent subflow. Prefect resolves those futures to their results before starting the dependent run.
- Use as_completed (or wait) to coordinate and process results in real time.
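The "pass a future, get its value" behavior can be mimicked with the standard library to show the data flow. This is a sketch under stated assumptions. submit_resolving is a hypothetical helper, not part of Prefect or concurrent.futures. It simply swaps any Future argument for its result before calling the function.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


def submit_resolving(pool: ThreadPoolExecutor, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
    """Hypothetical helper: replace Future arguments with their results, then call fn."""

    def run() -> Any:
        # Blocks this worker until every upstream future has finished
        resolved_args = [a.result() if isinstance(a, Future) else a for a in args]
        resolved_kwargs = {
            k: v.result() if isinstance(v, Future) else v for k, v in kwargs.items()
        }
        return fn(*resolved_args, **resolved_kwargs)

    return pool.submit(run)


def producer(name: str, delay: float) -> int:
    time.sleep(delay)
    return len(name) * 10


def dependent(name: str, a: int, b: int) -> int:
    return a + b


def parent() -> dict[str, int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        a_fut = submit_resolving(pool, producer, "A", 0.2)
        b_fut = submit_resolving(pool, producer, "B", 0.1)
        c_fut = submit_resolving(pool, producer, "C", 0.3)
        # Returns immediately; D's worker waits on A and B
        d_fut = submit_resolving(pool, dependent, "D", a=a_fut, b=b_fut)
        return {"C": c_fut.result(), "D": d_fut.result()}
```

In this sketch the waiting happens inside a worker thread. If the pool were smaller than the number of blocked dependents plus their upstreams, it could deadlock, which is why max_workers=4 is used for four units of work.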
Docs:
- Task runners: Prefect 3 Task Runners
- Futures utilities: prefect.futures source
Alex Robson
09/05/2025, 2:27 AM
Marvin
09/05/2025, 2:27 AM
Marvin
09/05/2025, 2:30 AM
.submit() method.
How to run flows concurrently instead:
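- Inside a flow: calling a subflow like a regular function runs it synchronously and returns its result, not a future. To run subflows concurrently, call each subflow from a task and .submit() that task. The parent flow’s task runner (e.g., ThreadPoolTaskRunner) then runs them in parallel, and you can coordinate the returned PrefectFutures with `as_completed`/`wait`.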
- Docs: Task runners, source for futures: prefect.futures
- Outside a flow (e.g., from a script or service): use prefect.runner.submit_to_runner(...) to submit one or many flow runs concurrently, then optionally call wait_for_submitted_runs() to block until they finish.
- Source: prefect.runner module
Example: submit flows concurrently from a plain Python script
from prefect import flow
from prefect.runner import submit_to_runner, wait_for_submitted_runs

@flow
def my_flow(n: int) -> int:
    return n * 2

# Submit multiple flow runs concurrently
runs = submit_to_runner(
    my_flow,
    parameters=[{"n": i} for i in range(5)],  # one run per parameter set
)
# Optionally block until all submitted runs complete
wait_for_submitted_runs()
print("Submitted runs:", runs)  # FlowRun objects with IDs/states
If you tell me whether you want to launch flows from inside another flow or from a standalone script/service, I can tailor the example further.
Alex Robson
09/05/2025, 1:08 PM
Marvin
09/05/2025, 1:08 PM
Marvin
09/05/2025, 1:11 PM
prefect.deployments.run_deployment(...).
Key points:
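- It creates a new flow run from an existing deployment and returns a FlowRun object. By default (timeout=None) it waits until that run reaches a final state. Pass timeout=0 to return as soon as the run has been created.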
- Set as_subflow=True (the default) to show the triggered run as a subflow in the parent’s UI; set it to False to keep the run independent.
- To run multiple deployments concurrently from a flow, wrap run_deployment in a Prefect task and use a task runner so the calls happen in parallel.
Example: run multiple deployments concurrently from a parent flow
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.deployments import run_deployment
from prefect.futures import as_completed

@task
def trigger_deployment(name: str, params: dict | None = None) -> str:
    fr = run_deployment(
        name=name,
        parameters=params,
        as_subflow=True,  # show as subflow in UI (optional)
        flow_run_name=f"triggered-{name.replace('/', '-')}",
    )
    # You can inspect fr.state here if needed
    return str(fr.id)

@flow(name="orchestrator", task_runner=ThreadPoolTaskRunner(max_workers=5))
def orchestrator():
    futures = [
        trigger_deployment.submit("flow-a/dev", {"x": 1}),
        trigger_deployment.submit("flow-b/dev", {"x": 2}),
        trigger_deployment.submit("flow-c/dev", {"x": 3}),
    ]
    run_ids = []
    for fut in as_completed(futures):
        run_ids.append(fut.result())
    return run_ids

if __name__ == "__main__":
    orchestrator()
Notes:
- If you call run_deployment directly in a loop inside a flow, the runs happen serially, because each call waits for its flow run to finish. Wrapping it in a @task and using a task runner (threads/processes) enables concurrency.
- Use work_queue_name, idempotency_key, or scheduled_time on run_deployment as needed.
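Why the loop is serial and the task-runner version is not can be shown with a stdlib timing sketch. The trigger function here is a hypothetical stand-in for a call that blocks until its triggered run finishes (as run_deployment does with its default timeout). It does not contact Prefect, and a ThreadPoolExecutor plays the role of the task runner.

```python
import time
from concurrent.futures import ThreadPoolExecutor

NAMES = ["flow-a/dev", "flow-b/dev", "flow-c/dev"]


def trigger(name: str) -> str:
    # Hypothetical stand-in for a call that blocks until the triggered run finishes
    time.sleep(0.2)
    return f"run-for-{name}"


def run_serially() -> tuple[list[str], float]:
    start = time.perf_counter()
    ids = [trigger(n) for n in NAMES]  # each call waits for the previous one
    return ids, time.perf_counter() - start


def run_concurrently() -> tuple[list[str], float]:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(NAMES)) as pool:
        ids = list(pool.map(trigger, NAMES))  # results keep input order
    return ids, time.perf_counter() - start
```

The serial version takes at least 3 × 0.2 s, while the threaded version finishes in roughly the time of the slowest single call.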
References:
- How to trigger deployments: Run deployments
- API (signature): prefect.deployments.run_deployment (source)
- Task runners: Task runners
Alex Robson
09/19/2025, 3:14 PM
Marvin
09/19/2025, 3:14 PM
Marvin
09/19/2025, 3:22 PM
location /prefect/ {
    proxy_pass http://prefect-server-ui/;  # serves the UI; server mounted with PREFECT_SERVER_UI_SERVE_BASE=/prefect
    proxy_set_header Host $host;
}
location /prefect/api/ {
    proxy_pass http://prefect-server/api/;  # forward to Prefect API root
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
}
Docs and references:
- Self-hosted Prefect (3.x)
- Settings reference
If you can share what reverse proxy you’re using (Nginx, Traefik, etc.) and how you’re running Prefect (docker-compose, k8s), I can tailor the exact config.
Alex Robson
09/19/2025, 3:37 PM
Marvin
09/19/2025, 3:37 PM
Marvin
09/19/2025, 3:37 PM
# UI (static assets and SPA routes)
location /prefect/ {
    proxy_pass http://prefect-server;  # note: NO trailing slash
    proxy_set_header Host $host;
}
# API (includes websockets for logs/events)
location /prefect/api/ {
    proxy_pass http://prefect-server/api/;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
}
- If you use a trailing slash like proxy_pass http://prefect-server/; under location /prefect/, Nginx will strip the /prefect prefix and you’ll see exactly what you’re seeing (assets requested at /assets/...).
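The trailing-slash behavior comes from nginx's documented proxy_pass rule. If the directive includes a URI, the part of the request URI that matched the location is replaced by that URI. If it has no URI, the request URI is forwarded as sent. A small Python model of that rule makes the difference concrete. upstream_path is a hypothetical helper, and it deliberately ignores regex locations, variables in proxy_pass, and URI normalization.

```python
from urllib.parse import urlsplit


def upstream_path(location: str, proxy_pass: str, request_uri: str) -> str:
    """Simplified model of how nginx rewrites a path for a prefix location."""
    if not request_uri.startswith(location):
        raise ValueError(f"{request_uri!r} does not match location {location!r}")
    uri_part = urlsplit(proxy_pass).path  # "" when proxy_pass has no URI part
    if not uri_part:
        # No URI in proxy_pass: the request URI is forwarded unchanged
        return request_uri
    # Any URI (even a lone "/") replaces the matched location prefix
    return uri_part + request_uri[len(location):]
```

With location /prefect/ and proxy_pass http://prefect-server/, a request for /prefect/assets/index.js reaches Prefect as /assets/index.js, which matches the broken asset paths. Without the trailing slash it stays /prefect/assets/index.js. Because nginx picks the longest matching prefix location, /prefect/api/... requests go to the API block, where /prefect/api/health becomes /api/health.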
Verify after restart:
- Load the UI at http://data.example.com/prefect and check that network requests for assets are to /prefect/assets/...
- You can also inspect effective settings:
prefect config view --show-defaults
Docs and examples:
- Self-hosted Prefect (3.x)
- Reverse proxy example (community)
If you share how you’re running Prefect (docker/k8s/bare metal) and which proxy (Nginx/Traefik/etc.), I can tailor the exact env and proxy rules.