Cullen Baker
09/02/2025, 9:53 PM

Marvin
09/02/2025, 9:53 PM

Marvin
09/02/2025, 9:59 PM

from prefect import flow, task
from prefect.logging import get_run_logger

@task
def do_work(dataset: str):
    logger = get_run_logger()
    logger.info(f"Processing dataset={dataset}")
    # ... your logic ...

@flow
def etl(dataset: str):
    do_work(dataset)
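A quick way to exercise this locally is to call the flow function directly (a minimal sketch; the "us" value is just an illustrative dataset name):

if __name__ == "__main__":
    etl("us")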
2) Multiple deployments of the same flow with different default parameters
- Keep one code path; create distinct deployments with parameter overrides and their own schedules/work pools.
Code-based deployment:
from prefect.flows import Flow

# Points to your project source and the flow entrypoint
flow_ref = Flow.from_source(".", entrypoint="flows.py:etl")

flow_ref.deploy(
    name="etl-us",
    work_pool_name="k8s-pool",
    parameters={"dataset": "us"},
    cron="0 * * * *",  # optional schedule
)
flow_ref.deploy(
    name="etl-eu",
    work_pool_name="k8s-pool",
    parameters={"dataset": "eu"},
)
YAML-based deployment (then run prefect deploy):
name: my-project
deployments:
  - name: etl-us
    entrypoint: flows.py:etl
    work_pool:
      name: k8s-pool
    parameters:
      dataset: "us"
    schedule:
      cron: "0 * * * *"
  - name: etl-eu
    entrypoint: flows.py:etl
    work_pool:
      name: k8s-pool
    parameters:
      dataset: "eu"
Docs: prefect.yaml deployments
CLI help verified: run prefect deploy --help
3) Fan-out to many inputs at runtime (no need for many deployments)
- If you need to run the same flow for a list of inputs, you can:
a) Refactor the logic into tasks and submit concurrently within a single flow
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def work(x: str):
    # process x
    return x

@flow(task_runner=ThreadPoolTaskRunner())
def run_for_many(values: list[str]):
    futures = [work.submit(v) for v in values]
    return [f.result() for f in futures]
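If the list of values is large, you can bound concurrency via the task runner's max_workers argument (a sketch building on the snippet above; it reuses the work task defined there, and the value 5 is arbitrary):

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=5))
def run_for_many_bounded(values: list[str]):
    # At most 5 task runs execute at once; the rest queue up
    futures = [work.submit(v) for v in values]  # work is the task from the previous snippet
    return [f.result() for f in futures]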
b) Or, trigger multiple flow runs via a deployment using run_deployment (each as its own run)
from prefect import flow
from prefect.deployments import run_deployment

@flow
def fan_out(values: list[str]):
    runs = []
    for v in values:
        runs.append(
            run_deployment(
                name="etl/etl-us",  # flow_name/deployment_name
                parameters={"dataset": v},
                as_subflow=True,  # keep lineage to the parent run
            )
        )
    return [r.id for r in runs]
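If the parent flow does not need to wait for each child run to finish, run_deployment also accepts a timeout; timeout=0 returns as soon as the run is created (a sketch reusing the deployment name from above):

from prefect import flow
from prefect.deployments import run_deployment

@flow
def fan_out_fire_and_forget(values: list[str]):
    # Create the child runs and return immediately, without blocking on their final state
    runs = [
        run_deployment(
            name="etl/etl-us",
            parameters={"dataset": v},
            timeout=0,  # don't wait for the child run to complete
        )
        for v in values
    ]
    return [r.id for r in runs]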
4) Centralize the default via Variables (for environment-specific values)
- Useful when the “one input” is configuration (e.g., region, bucket, connection). You can keep a single deployment and override at runtime or via Variables:
from prefect import flow
from prefect.variables import Variable

@flow
def etl(dataset: str | None = None):
    dataset = dataset or Variable.get("default_dataset", default="us")
    # ...
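The variable itself can be set ahead of time from code (a minimal sketch; overwrite=True replaces any existing value, and the name matches the example above):

from prefect.variables import Variable

# One-time setup, e.g. in a bootstrap script or from a REPL
Variable.set("default_dataset", "eu", overwrite=True)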
Docs: create deployments
Docs: global concurrency limits (if you'll fan out broadly)
Quick clarification to tailor the recommendation:
- Do you want distinct schedules/visibility per variant (e.g., “etl-us” and “etl-eu”) or just a single run that processes many inputs?
- Is the differing input static (known ahead of time) or dynamic per run?
If you share that, I can sketch the exact layout (prefect.yaml or code) for your case.