# ask-marvin
@Marvin when I have many flows that are extremely similar except for one input, what is the best way to consolidate these and not duplicate code?
**Marvin**:
Great question. In Prefect 3.x, the best way to avoid duplicate code when flows differ only by one input is to parameterize a single flow and manage the variations at deployment or runtime. Here are the common, scalable patterns:

**1) Single flow with a parameter (most DRY)**

Put all shared logic in one flow and pass the differing value as a parameter:
```python
from prefect import flow, task
from prefect.logging import get_run_logger

@task
def do_work(dataset: str):
    logger = get_run_logger()
    logger.info(f"Processing dataset={dataset}")
    # ... your logic ...

@flow
def etl(dataset: str):
    do_work(dataset)
```
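Running the variants ad hoc is then just a function call with a different argument. A minimal usage sketch (the `"us"`/`"eu"` values are illustrative):

```python
if __name__ == "__main__":
    # Same flow, different parameter; no duplicated logic
    etl("us")
    etl("eu")
```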
**2) Multiple deployments of the same flow with different default parameters**

Keep one code path; create distinct deployments with parameter overrides and their own schedules/work pools.

Code-based deployment:
```python
from prefect.flows import Flow

# Points to your project source and the flow entrypoint
flow_ref = Flow.from_source(".", entrypoint="flows.py:etl")

flow_ref.deploy(
    name="etl-us",
    work_pool_name="k8s-pool",
    parameters={"dataset": "us"},
    cron="0 * * * *",   # optional schedule
)

flow_ref.deploy(
    name="etl-eu",
    work_pool_name="k8s-pool",
    parameters={"dataset": "eu"},
)
```
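The source doesn't have to be a local path; if your code lives in git, the same two-deployment pattern works. A sketch assuming a hypothetical repository URL:

```python
from prefect.flows import Flow

# Hypothetical repo URL; substitute your actual project source
flow_ref = Flow.from_source(
    "https://github.com/acme/data-pipelines",
    entrypoint="flows.py:etl",
)

flow_ref.deploy(
    name="etl-us",
    work_pool_name="k8s-pool",
    parameters={"dataset": "us"},
)
```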
YAML-based deployment (then run `prefect deploy`):
```yaml
name: my-project
deployments:
  - name: etl-us
    entrypoint: flows.py:etl
    work_pool:
      name: k8s-pool
    parameters:
      dataset: "us"
    schedules:
      - cron: "0 * * * *"

  - name: etl-eu
    entrypoint: flows.py:etl
    work_pool:
      name: k8s-pool
    parameters:
      dataset: "eu"
Docs: `prefect.yaml` deployments. CLI help verified: `prefect deploy --help`.
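Once deployed, either variant can also be triggered ad hoc with `prefect deployment run etl/etl-us`, and the parameter can be overridden at run time (e.g., `--param dataset=us`).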
**3) Fan-out to many inputs at runtime (no need for many deployments)**

If you need to run the same flow for a list of inputs, you can:

a) Refactor the logic into tasks and submit concurrently within a single flow:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def work(x: str):
    # process x
    return x

@flow(task_runner=ThreadPoolTaskRunner())
def run_for_many(values: list[str]):
    futures = [work.submit(v) for v in values]
    return [f.result() for f in futures]
```
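A quick usage sketch; note that `ThreadPoolTaskRunner` also accepts a `max_workers` argument if you want to cap in-process concurrency:

```python
if __name__ == "__main__":
    # Example values; fan out over however many inputs you have
    results = run_for_many(["us", "eu", "apac"])
    print(results)
```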
b) Or, trigger multiple flow runs via a deployment using `run_deployment` (each as its own run):
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def fan_out(values: list[str]):
    runs = []
    for v in values:
        runs.append(
            run_deployment(
                name="etl/etl-us",          # flow_name/deployment_name
                parameters={"dataset": v},
                as_subflow=True,            # keep lineage to the parent run
            )
        )
    return [r.id for r in runs]
```
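By default `run_deployment` blocks until each child run finishes. For fire-and-forget fan-out, pass `timeout=0` so the call returns as soon as the run is created:

```python
run = run_deployment(
    name="etl/etl-us",
    parameters={"dataset": "us"},
    timeout=0,  # return immediately instead of waiting for completion
)
```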
**4) Centralize the default via Variables (for environment-specific values)**

Useful when the “one input” is configuration (e.g., region, bucket, connection). You can keep a single deployment and override at runtime or via Variables:
```python
from prefect import flow
from prefect.variables import Variable

@flow
def etl(dataset: str | None = None):
    dataset = dataset or Variable.get("default_dataset", default="us")
    # ...
```
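The variable itself can be managed from the UI, the CLI, or code. A minimal sketch of the code path:

```python
from prefect.variables import Variable

# Seed or update the default; overwrite=True replaces an existing value
Variable.set("default_dataset", "eu", overwrite=True)
```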
Docs: create deployments. Docs: global concurrency limits, if you'll fan out broadly (a sketch follows at the end).

Quick clarification to tailor the recommendation:
- Do you want distinct schedules/visibility per variant (e.g., “etl-us” and “etl-eu”), or just a single run that processes many inputs?
- Is the differing input static (known ahead of time) or dynamic per run?

If you share that, I can sketch the exact layout (`prefect.yaml` or code) for your case.
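On the concurrency-limit note above: a sketch of guarding fan-out work with a global concurrency limit, assuming you've already created one (the name `etl-slots` is a placeholder, e.g. created via `prefect gcl create etl-slots --limit 5`):

```python
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def guarded_work(x: str):
    # Hold one slot of the "etl-slots" global limit while processing
    with concurrency("etl-slots", occupy=1):
        ...  # process x
```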