Nicholas Gaffney
10/10/2025, 12:44 AM
Marvin
10/10/2025, 12:44 AM
Marvin
10/10/2025, 12:47 AM
Nicholas Gaffney
10/10/2025, 1:14 AM
Marvin
10/10/2025, 1:14 AM
Marvin
10/10/2025, 1:17 AM
prefect.deployments.run_deployment and choose whether to wait for the child runs (subflows) to finish or just fire-and-forget.
Option A — Fan out and wait for all runs to finish (subflows)
- Good when the parent flow should succeed/fail based on all children.
- Uses run_deployment(..., parameters=...). By wrapping in a task and submitting, you can get parallel fan-out.
from prefect import flow, task
from prefect.deployments import run_deployment


@task
def trigger_one(value: str):
    # Pass your parameter to the deployment
    return run_deployment(
        name="my-flow/my-deployment",
        parameters={"item": value},  # adjust to your deployment's parameter name(s)
        # as_subflow=True (default) keeps parent/child lineage
        # timeout=None (default) polls until the child run reaches a final state
    )


@flow
def orchestrator():
    values = query_for_values()  # your query here -> returns a list
    futures = [trigger_one.submit(v) for v in values]  # fan-out
    results = [f.result() for f in futures]  # wait for all
    return results
Option B — Fire-and-forget (don’t wait for completion)
- Good when you want the orchestrator to just enqueue runs and exit.
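- You can either:
  - call run_deployment(..., timeout=0) to create the run and return immediately without polling for completion (as_subflow=False only drops the parent/child link; on its own it does not skip the wait), or
  - use the Prefect client to create runs from the deployment (async), then move on.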
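from prefect import flow, task
from prefect.deployments import run_deployment


@task
def trigger_one(value: str):
    # timeout=0 creates the flow run and returns without waiting for it to finish
    fr = run_deployment(
        name="my-flow/my-deployment",
        parameters={"item": value},
        timeout=0,  # return immediately; the default (None) would poll until completion
        as_subflow=False,  # do not link the run to this parent as a subflow
    )
    return fr.id


@flow
def orchestrator():
    values = query_for_values()  # your query here -> returns a list
    futures = [trigger_one.submit(v) for v in values]  # submit everything first
    ids = [f.result() for f in futures]  # then collect the created run IDs
    return ids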
Controlling parallelism
- If you expect a large list and want to avoid overwhelming your workers, you can add a concurrency limit around the trigger task using Prefect’s concurrency controls, or simply batch the fan-out list.
- Prefect’s concurrency tools: Concurrency concepts
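The batching idea above can be sketched in plain Python. This is only an illustration and not Prefect API: chunked and submit_in_batches are hypothetical helper names, and submit_one stands in for whatever call creates a single run.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def submit_in_batches(
    items: Iterable[T], size: int, submit_one: Callable[[T], R]
) -> List[R]:
    """Call `submit_one` for every item, one batch at a time.

    `submit_one` is a stand-in for the real trigger call. In this sketch it
    runs synchronously, so each batch finishes before the next one starts.
    """
    results: List[R] = []
    for batch in chunked(items, size):
        results.extend(submit_one(item) for item in batch)
    return results
```

In a real flow, each batch would be submitted and its results collected before moving on to the next batch, which caps the number of trigger calls in flight at the batch size.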
Helpful references
- Deployments concept: Deployments in Prefect 3
- API (creating runs from a deployment): Create flow run from deployment
A couple of quick clarifiers to tailor this:
- Do you want the parent flow to wait for all child runs to complete, or just submit them and exit?
- Roughly how many values are you expecting per run (so we can recommend a sane concurrency/batching strategy)?
- Are you routing these runs to a specific work pool or work queue that needs job_variables?
Nicholas Gaffney
10/10/2025, 1:26 AM
Marvin
10/10/2025, 1:26 AM
Marvin
10/10/2025, 1:28 AM
from datetime import datetime, timezone

from prefect import flow, task
from prefect.deployments import run_deployment

# Replace with your deployment path "flow_name/deployment_name"
DEPLOYMENT_NAME = "my-flow/my-deployment"


def query_for_rows():
    # Return a list of dicts with both the param(s) and job variables you need
    # Example shape:
    # [
    #     {"item": "A", "job_vars": {"env": {"TENANT_ID": "abc"}}},
    #     {"item": "B", "job_vars": {"env": {"TENANT_ID": "xyz"}}},
    # ]
    return []


def batched(seq, size):
    # Yield consecutive slices of at most `size` rows
    for i in range(0, len(seq), size):
        yield seq[i:i + size]


@task
def trigger_one(item_value, job_vars, queue_name=None, window_id=None):
    # Use idempotency_key to protect against duplicate submits if the orchestrator retries
    idem = f"{DEPLOYMENT_NAME}:{item_value}"
    if window_id:
        idem = f"{idem}:{window_id}"
    fr = run_deployment(
        name=DEPLOYMENT_NAME,
        parameters={"item": item_value},  # adjust to your flow's parameter(s)
        job_variables=job_vars,  # pass through row-specific job variables
        timeout=0,  # fire-and-forget: return as soon as the run is created
        as_subflow=False,  # do not link the run to this parent as a subflow
        work_queue_name=queue_name,  # set if you route via a specific queue
        flow_run_name=f"process-{item_value}",  # helpful for traceability
        idempotency_key=idem,
    )
    return fr.id


@flow
def orchestrator(queue_name: str | None = None, batch_size: int = 10):
    rows = query_for_rows()
    if not rows:
        # Nothing to do, exit quickly
        return []
    # Optional: stamp the current interval to de-duplicate within it.
    # This stamp has minute resolution; round it down to the 30-minute
    # boundary if you want one key per 30-minute window.
    window_id = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")
    created_ids = []
    for chunk in batched(rows, batch_size):
        futs = [
            trigger_one.submit(r["item"], r["job_vars"], queue_name, window_id)
            for r in chunk
        ]
        # .result() returns quickly because timeout=0 only creates the runs
        created_ids.extend(f.result() for f in futs)
    return created_ids
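If you do want one idempotency key per 30-minute window, the timestamp has to be rounded down to the window boundary before it is formatted. A minimal sketch using only the standard library follows; window_start and make_idempotency_key are hypothetical helper names, and the rounding assumes the window width divides 60 evenly.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def window_start(now: datetime, minutes: int = 30) -> datetime:
    """Round `now` down to the start of its window.

    Assumes `minutes` divides 60 evenly (e.g. 10, 15, 30), so windows
    line up with the top of the hour.
    """
    # Drop seconds and microseconds, then subtract the minutes past the boundary
    floored = now.replace(second=0, microsecond=0)
    return floored - timedelta(minutes=floored.minute % minutes)


def make_idempotency_key(
    deployment: str, item: str, now: Optional[datetime] = None
) -> str:
    """Build a key that is stable for every call inside the same 30-minute window."""
    now = now or datetime.now(timezone.utc)
    stamp = window_start(now).strftime("%Y%m%d%H%M")
    return f"{deployment}:{item}:{stamp}"
```

With this rounding, a retry at 01:59 produces the same key as the first attempt at 01:31, while a run at 02:00 gets a new one.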
Notes
- job_variables: The exact keys depend on your work pool’s base job template (Managed, Docker, Kubernetes, etc.). Open your work pool in the UI to see the available job variables and their names; pass that dict as job_vars per item. Example shapes often include env, image/config fields, and resource settings, but the keys vary by template.
- Concurrency: With a rare max of ~50, simple batching (e.g., 10 at a time) is a safe default. You can tune batch_size to match your pool capacity.
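- Waiting vs. not waiting: Whether run_deployment blocks is controlled by timeout, not as_subflow. timeout=0 returns as soon as the run is created (fire-and-forget); the default timeout=None polls until the child run finishes, so calling f.result() on the submitted task then waits for completion. as_subflow only controls whether the run is linked to the parent as a subflow.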
- Routing: If you use a specific work queue, pass work_queue_name.
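Because the orchestrator reads r["item"] and r["job_vars"] from every row, a malformed row would raise a KeyError partway through the fan-out. One option is to check the rows before submitting anything. The sketch below assumes the row shape shown in the code above; split_rows is a hypothetical helper, and it does not check what goes inside job_vars, since those keys depend on your work pool's template.

```python
from typing import Any, Dict, List, Tuple


def split_rows(rows: List[Any]) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Separate well-formed rows from malformed ones (hypothetical helper).

    A row counts as well-formed when it is a dict with an "item" key and a
    "job_vars" key whose value is a dict (an empty dict is allowed).
    """
    valid: List[Dict[str, Any]] = []
    errors: List[str] = []
    for index, row in enumerate(rows):
        if not isinstance(row, dict):
            errors.append(f"row {index}: expected a dict, got {type(row).__name__}")
        elif "item" not in row:
            errors.append(f"row {index}: missing 'item'")
        elif not isinstance(row.get("job_vars"), dict):
            errors.append(f"row {index}: 'job_vars' must be a dict")
        else:
            valid.append(row)
    return valid, errors
```

The orchestrator could then log the error messages and submit only the valid rows, or fail fast if any row is rejected.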
Helpful docs
- Deployments: Deployments in Prefect 3
- Job variables overview: Customize job variables
- API (create flow run from deployment): Create flow run from deployment
If you share which work pool type you’re using (Managed, Docker, K8s, etc.), I can sketch the exact job_variables structure to pass per item. Also, do you want the orchestrator to wait for children or just enqueue and exit?