Syméon del Marmol
09/18/2025, 9:59 PM
Marvin
09/18/2025, 9:59 PM
Marvin
09/18/2025, 10:04 PM
- Background tasks (.delay() / .result_async()): Fire-and-forget (or awaitable) single task runs that execute on a pool of task workers. Great for web backends and services that need to fan out lots of lightweight jobs across machines with low overhead.
- Flows + deployments: Full orchestration around a flow run (packaged code, infrastructure, schedules, retries, DAG of tasks). Best for end-to-end pipelines and reproducible/managed execution.
When background tasks shine
- Horizontal fan-out across machines: Each .delay() creates a task run that any available task worker can pick up, which is perfect for spreading work across many workers.
- Low overhead / low latency: No need to spin up a full flow run or container per unit of work.
- Web/service integration: You can kick off tasks from your app and optionally await them with await future.result_async() using asyncio.gather, just like you’re doing.
- Fine-grained control: Dynamically name runs with .with_options(name=...), set per-task retries/timeouts, and get per-task visibility in the UI.
- Simple concurrency: You can fan out by looping and calling .delay() for each unit of work.
Limitations of background tasks (vs flows)
- No flow-level graph: You get standalone task runs; there’s no DAG, no subflows, no sequential dependencies between them unless you orchestrate that in code.
- Less packaging/isolation: You rely on your task workers’ environment to run the task; deployments give you a declared environment (e.g., Docker/Kubernetes) and source pinning.
- Coarser result management: Great for single units, but flows are better for coordinating many steps, shared state, assets, and lineage across a run.
- Scheduling/CI/CD: Deployments are the unit of scheduling, promotion, and repeatability; background tasks are ad-hoc.
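The first limitation above (no dependencies between background task runs unless you orchestrate them in code) can be sketched with plain asyncio. This only illustrates the control flow: `submit_extract` and `submit_transform` are hypothetical stand-ins for calling `.delay(...)` on a task and awaiting `result_async()`, and no Prefect calls are made.

```python
import asyncio


async def submit_extract(item: int) -> int:
    # Hypothetical stand-in for: await extract.delay(item).result_async()
    await asyncio.sleep(0)
    return item * 2


async def submit_transform(value: int) -> str:
    # Hypothetical stand-in for a second background task
    await asyncio.sleep(0)
    return f"transformed-{value}"


async def two_stage(items):
    # Stage 1: fan out; gather returns results in input order
    extracted = await asyncio.gather(*(submit_extract(i) for i in items))
    # Stage 2 starts only after stage 1 has finished: the dependency lives in this code
    return await asyncio.gather(*(submit_transform(v) for v in extracted))
```

The ordering between the two stages exists only because the caller awaits stage 1 before submitting stage 2; nothing records that relationship as a graph.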
When flows + deployments are better
- End-to-end pipelines with dependencies and retries at multiple levels (task and flow).
- Reproducible environments (e.g., containerized) and source control baked into deployments.
- Schedules, parameters, automations, and clear operational semantics for teams.
- Parallelism within a single flow run’s resources is sufficient (e.g., threads/processes on the same worker).
Execution model differences
- Background tasks:
  - Dispatched to task workers; can scale across many machines.
  - Each .delay() returns a `PrefectFuture`; await it with await future.result_async().
  - Task-level options (retries, timeouts, caching) apply.
  - Names from .with_options(name="...") show in the UI to help with observability.
  - Cancellable individually from the UI/API.
- Tasks within a flow run:
  - Execute within the flow run’s infrastructure (typically one container/pod/VM) using the flow’s task runner (e.g., threads/processes).
  - Concurrency is bounded by that one environment: it won’t naturally spread across multiple workers unless you launch multiple flow runs (e.g., sub-deployments).
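As a rough analogy for "bounded by that one environment", here is a standard-library sketch of work running on a single in-process thread pool. It does not use Prefect's task runners; `run_bounded` is a hypothetical helper that just records how many jobs were ever active at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_bounded(items, max_workers):
    """Run one trivial job per item on a single pool; return (results, peak concurrency)."""
    lock = threading.Lock()
    active = 0
    peak = 0

    def job(x):
        nonlocal active, peak
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # simulate I/O-bound work
        with lock:
            active -= 1
        return x * x

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(job, items))
    return results, peak
```

However many items are submitted, the peak never exceeds `max_workers`, because all of the work shares one process and one pool.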
Common patterns
- Background fan-out from a service (what you’re doing)
import asyncio
from prefect import task

@task(retries=3, timeout_seconds=600)
def work(item): ...

# In your async web handler or service:
futures = [
    work.with_options(name=f"work-{i}").delay(i)
    for i in items
]
results = await asyncio.gather(*(f.result_async() for f in futures))
- Fan-out within a flow run (parallelism on one machine)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def work(x): ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=16))
def pipeline(items):
    futures = [work.submit(x) for x in items]  # .submit() returns Prefect futures
    return [f.result() for f in futures]
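One thing to watch with the asyncio.gather pattern: by default, gather raises the first exception to the caller, so one failed unit of work hides the other results. A minimal sketch of collecting per-item outcomes instead, assuming the awaited call raises when a run fails; `fake_result` is a hypothetical stand-in for `future.result_async()` and no Prefect calls are made.

```python
import asyncio


async def fake_result(i: int) -> int:
    # Hypothetical stand-in for `future.result_async()`; fails for i == 2
    await asyncio.sleep(0)
    if i == 2:
        raise ValueError(f"item {i} failed")
    return i * 10


async def gather_outcomes(items):
    # return_exceptions=True returns exceptions as values instead of raising the first one
    outcomes = await asyncio.gather(
        *(fake_result(i) for i in items), return_exceptions=True
    )
    ok = [o for o in outcomes if not isinstance(o, BaseException)]
    failed = [o for o in outcomes if isinstance(o, BaseException)]
    return ok, failed
```

Whether to fail fast or collect everything depends on whether partial results are useful to the caller.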
How to choose
Marvin
09/18/2025, 10:04 PM
- .with_options(name=...) + .delay() + asyncio.gather is ideal for scalable, low-latency fan-out from a backend.
- If you need stronger isolation or packaging guarantees for certain jobs, consider wrapping that part in a dedicated deployment and calling it (you can mix both approaches).
More info
- Background tasks overview: Blog: Background tasks in Prefect
- Background tasks docs (Prefect 3): Docs: Background tasks
If you share your workload profile (task duration, volume, payload sizes), I can help fine-tune worker sizing, concurrency, and whether to mix in deployments for specific steps.
Syméon del Marmol
09/18/2025, 10:25 PM
Marvin
09/18/2025, 10:25 PM
Syméon del Marmol
09/18/2025, 10:27 PM
Marvin
09/18/2025, 10:27 PM
Marvin
09/18/2025, 10:28 PM
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def do_cpu_bound(x): ...

@flow(task_runner=ProcessPoolTaskRunner(max_workers=8))
def my_flow(items):
    futs = [do_cpu_bound.submit(i) for i in items]  # .submit() returns Prefect futures
    return [f.result() for f in futs]
- Background fan-out from an app (your current pattern):
import asyncio
from prefect import task

@task(retries=2, timeout_seconds=600)
def work(item): ...

# Inside an async function:
futures = [work.with_options(name=f"work-{i}").delay(i) for i in items]
results = await asyncio.gather(*(f.result_async() for f in futures))
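If the list of items is large, one option is to submit and await in batches so only a limited number of runs are in flight at a time. This is a sketch of that idea in plain asyncio, not something Prefect requires: `submit_and_wait` is a hypothetical stand-in for `work.delay(item)` followed by `await future.result_async()`, and `batch_size` is an illustrative parameter.

```python
import asyncio


async def submit_and_wait(item: int) -> int:
    # Hypothetical stand-in for: await work.delay(item).result_async()
    await asyncio.sleep(0)
    return item + 1


async def run_in_batches(items, batch_size):
    results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        # At most `batch_size` units of work are awaited concurrently
        results.extend(await asyncio.gather(*(submit_and_wait(i) for i in batch)))
    return results
```

The trade-off is that each batch waits for its slowest item before the next batch starts.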
Docs
- Background tasks: Docs: Background tasks
- Task runners (threads vs processes): Prefect 3 docs (see task runners under flows/tasks)
If your underlying concern is CPU-bound work or memory isolation, say the word and I’ll suggest a concrete setup (e.g., ProcessPoolTaskRunner vs more task workers vs mixing deployments for specific steps).
Marvin
09/18/2025, 10:30 PM
Syméon del Marmol
09/18/2025, 10:31 PM
Marvin
09/18/2025, 10:31 PM
Marvin
09/18/2025, 10:33 PM
- Use task.with_options(name=...).delay(...) to fan out work to task workers. You can optionally await fut.result_async() to gather results.
- Independence: Each background task run is independent of the parent flow run.
- Cancellation does not cascade: cancelling the flow run will not automatically cancel the background task runs you launched.
- Failures do not propagate unless you explicitly await their results (then your flow can fail based on those outcomes).
- Concurrency/isolation: Background tasks run in task worker processes (shared with other background tasks). They do not use the flow’s task runner. Their concurrency is governed by your task worker limits and how many workers you’re running.
- UI lineage: These task runs won’t appear as child tasks of your flow DAG; they are separate task runs. Use names/tags for correlation.
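Since these runs are not linked to the flow in the UI, a shared naming scheme is one simple way to correlate them. A minimal sketch, assuming you pick your own correlation ID (for example, something derived from the parent flow run); `correlated_names` is a hypothetical helper, not a Prefect API.

```python
import uuid


def correlated_names(prefix, items, correlation_id=None):
    """Return (correlation_id, names) where every name embeds the same ID."""
    cid = correlation_id or uuid.uuid4().hex[:8]
    return cid, [f"{prefix}-{cid}-{item}" for item in items]
```

Each resulting name could then be passed to .with_options(name=...) before calling .delay(), so that all runs launched together can be found by searching for the shared ID.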
Starting a flow (or deployment) from a background task
- Preferred: Use run_deployment(...) to start a real flow run with your deployment’s infra, packaging, and retries.
  - Tip: If you want the new flow run to be independent, set as_subflow=False. If you want it to appear nested under a parent flow run, use as_subflow=True from within a flow; from a background task there is no parent flow, so treat it as independent.
- Alternative: Call a @flow function directly (e.g., await my_flow(params) if it’s async). This runs the flow in the current worker process, immediately, without going through a work pool or deployment.
  - Pros: low latency; no queueing.
  - Cons: no container/infra isolation; concurrency limited to that worker process; you’re tying up the background task worker while the flow runs.
How this differs from “running a flow directly, as a classical function”
- Calling a @flow function directly:
  - Runs immediately in-process with the current Python interpreter (no work pool or deployment infra).
  - Creates a flow run in Prefect (if connected to the API), and tasks inside it use the configured task runner (threads/processes) within that one process.
  - No scheduling or queueing; parallelism is bounded by the process and chosen task runner.
- Running a deployment:
  - Schedules a flow run to be picked up by a work pool/worker that provisions the configured infra (often a fresh container/VM/pod).
  - Better isolation, reproducibility, and repeatability; good for long-running or resource-heavy work.
Practical patterns
- From a deployed flow, background fan-out and await:
import asyncio
from prefect import flow, task

@task(retries=2, timeout_seconds=600)
def work(item): ...

@flow
async def driver(items):
    futs = [work.with_options(name=f"work-{i}").delay(i) for i in items]
    results = await asyncio.gather(*(f.result_async() for f in futs))
    return results
- From a background task, start a deployment (recommended for isolation):
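from prefect import task
from prefect.deployments import run_deployment

@task
def orchestrate():
    # Fire-and-forget: timeout=0 returns immediately instead of polling until the run finishes
    run_deployment(name="project/flow", parameters={"x": 1}, timeout=0, as_subflow=False)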
Caveats to keep in mind when mixing
- Cancellation doesn’t cascade across boundaries (flow → background task, or background task → deployment) unless you wire it yourself.
- Observability: independent runs won’t appear as DAG children; use names/tags to correlate.
- Backpressure: if you await lots of background tasks inside a flow run, you’re keeping the flow’s infra alive while you wait. That’s fine when you need results; otherwise consider letting them run asynchronously and checking outcomes later.
Marvin
09/18/2025, 10:33 PM