# ask-marvin
c
@Marvin I'm looking for some workflow design guidelines -- both logically (flows, tasks), and physically (i.e., python files in a repo). My gut says, logically, we want to map one logical task, which might break down into 1-8 tasks, in a flow. The tasks should be pretty small: rather than "call this api, grab this out of the response, and then do this other thing", make that three tasks (call api, grab whatever out of it, go do the other thing). And then some kind of larger, overseer flow that's orchestrating the larger pieces. Physically, I'm thinking one python file per small flow, containing both the flow and the tasks, and then one file for the orchestrator. I'm inclined to avoid shared libraries, even at the cost of DRY-ness, to keep flows decoupled (at least until I know what I need in shared libraries -- I'm designing my first non-trivial flow here).
m
thought for 257.8 seconds
Your instincts are solid. Here's a pragmatic way to structure things in Prefect 3.x, plus a starter repo layout and some code patterns you can copy.

High-level guidelines
- Task granularity
  - Good splits: network/IO boundaries, retryable steps, parallelizable loops, cacheable/expensive work, long-running steps, or steps that produce useful logs/artifacts.
  - Don't over-split pure-Python glue into many tiny tasks if it adds no retries/parallelism/caching benefit; keep those inline in a task or flow for readability.
- Flows vs subflows
  - Use subflows for "phases" that you'd want to run independently, retry as a unit, schedule separately later, or deploy independently. Keep the top-level orchestrator thin.
- Parameters and typing
  - Flow function args are your parameters; type annotate them. Keep secrets out of code; pass them via parameters or Prefect Variables/blocks (a small sketch follows this list).
- Reliability features
  - Add `retries`, `retry_delay_seconds`, and `timeout_seconds` on tasks that call external systems.
  - Use caching for expensive/idempotent tasks.
- Concurrency and mapping
  - Use a task runner (threads/processes) inside a flow for local parallelism.
  - Use tags to enforce global concurrency limits across all flows.
- Data passing
  - Pass small data between tasks. For large payloads, store in durable storage (e.g., S3) and pass references (a sketch of this pattern appears under "Concrete patterns" below).
- Logging/observability
  - Use `get_run_logger()` inside tasks/flows; add artifacts for key diagnostics/tables/links.
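A minimal sketch of the secrets guideline above, assuming a Secret block named "api-token" has already been created in the Prefect UI; the flow and block names here are placeholders:

```python
# A sketch only: assumes a Secret block named "api-token" already exists.
from prefect import flow
from prefect.blocks.system import Secret


@flow
def ingest_dataset_with_secret(url: str) -> None:
    # resolved at run time; the token never lives in the repo
    token = Secret.load("api-token").get()
    # ... pass token to the API-calling tasks ...
```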
Suggested repo layout (v1 that evolves cleanly)
- Starting simple (matches your plan)
  - One file per small flow (with its tasks), plus one orchestrator module.
  - Add tests alongside flows (a small example follows the layout).
- Evolution path
  - If duplication grows, extract shared, "pure" utilities to `lib/` (no Prefect imports), and optionally `components/` for reusable Prefect tasks/subflows.

Example layout

```
your-project/
  prefect.yaml            # project config for deployments
  pyproject.toml          # deps and tool config
  flows/
    ingest_dataset.py     # small flow + its tasks
    transform_users.py    # another small flow + its tasks
  orchestrators/
    daily_pipeline.py     # top-level orchestrator calling subflows
  lib/                    # optional later: pure utilities
    io.py
    transforms.py
  tests/
    test_ingest_dataset.py
  README.md
```
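Since the layout keeps tests alongside the flows, here is a minimal sketch of what one such test might look like. It assumes `flows/` is importable from the test environment and uses the `parse_payload` task defined under "Concrete patterns" below; calling the underlying function via `.fn` keeps the test free of any Prefect engine machinery.

```python
# tests/test_ingest_dataset.py -- a sketch, not a prescribed layout
from flows.ingest_dataset import parse_payload


def test_parse_payload_extracts_items():
    # .fn invokes the plain Python function behind the @task decorator,
    # so no task run (and no API call) is created
    payload = {"items": [{"id": 1}, {"id": 2}]}
    assert parse_payload.fn(payload) == [{"id": 1}, {"id": 2}]
```

For end-to-end flow tests against a throwaway local backend, Prefect also ships a `prefect_test_harness` context manager in `prefect.testing.utilities`.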
Concrete patterns

- Small flow with clear task boundaries, retries, caching, and logging

```python
from datetime import timedelta
from typing import Any, Iterable
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash

@task(
    name="fetch_api",
    retries=3,
    retry_delay_seconds=[5, 15, 30],
    timeout_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    tags=["api"]
)
def fetch_api(url: str, token: str) -> dict[str, Any]:
    logger = get_run_logger()
    logger.info(f"Fetching {url}")
    # ... perform request, raise on non-200s ...
    return {"items": []}

@task(name="parse_payload", tags=["parse"])
def parse_payload(resp: dict[str, Any]) -> list[dict[str, Any]]:
    # ... extract/validate/normalize ...
    return resp.get("items", [])

@task(name="write_to_db", retries=2, retry_delay_seconds=10, timeout_seconds=60, tags=["db"])
def write_to_db(records: Iterable[dict[str, Any]]) -> int:
    # ... bulk upsert / transactional write ...
    return len(list(records))

@flow(name="ingest_dataset")
def ingest_dataset(url: str, token: str) -> int:
    raw = fetch_api(url, token)
    parsed = parse_payload(raw)
    written = write_to_db(parsed)
    return written
```
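- Passing references instead of large payloads (the data-passing guideline above). A minimal sketch only: `boto3` and the bucket/key parameters are assumptions for illustration, not part of the flow above.

```python
import json
from typing import Any

import boto3  # assumption: boto3 is available in the execution environment
from prefect import task


@task(retries=2, retry_delay_seconds=10, timeout_seconds=60, tags=["s3"])
def stage_records(records: list[dict[str, Any]], bucket: str, key: str) -> str:
    # write the large payload to durable storage...
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=json.dumps(records))
    # ...and pass only the reference downstream
    return f"s3://{bucket}/{key}"


@task(tags=["db"])
def load_staged(s3_uri: str) -> int:
    # the downstream task resolves the reference itself rather than
    # receiving the full payload through Prefect's result passing
    ...
    return 0
```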
- Orchestrator flow coordinating subflows and parallelism

```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

from flows.ingest_dataset import ingest_dataset


@flow(name="daily_pipeline", task_runner=ThreadPoolTaskRunner(max_workers=8))
def daily_pipeline(urls: list[str], token: str):
    runs = [ingest_dataset.submit(u, token) for u in urls]
    for fut in as_completed(runs):
        # surface errors early
        fut.result()


if __name__ == "__main__":
    # local dev entrypoint
    daily_pipeline(["https://api.example.com/a", "https://api.example.com/b"], "TOKEN")
```
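- Per-item fan-out with task mapping. When the parallelism is per record inside one flow (rather than per subflow), `.map()` is the usual tool. A minimal sketch; the `transform` task and `transform_users` flow here are hypothetical names, not part of the code above.

```python
from typing import Any

from prefect import flow, task


@task
def transform(record: dict[str, Any]) -> dict[str, Any]:
    # hypothetical per-record work
    return {**record, "processed": True}


@flow
def transform_users(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    # .map submits one task run per element to the flow's task runner
    futures = transform.map(records)
    return [f.result() for f in futures]
```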
- Concurrency limits by tag
  - Tag your tasks (`tags=["api"]`, `tags=["db"]`) as above, then set tag-based concurrency limits in the UI or API to throttle globally across all flows.
  - Docs: [Tag-based concurrency limits](https://docs-3.prefect.io/v3/how-to-guides/workflows/tag-based-concurrency-limits)

- Deployments (Prefect 3.x)
  - CLI: run from your project root with a `prefect.yaml` present.
  - Entrypoint points at the flow definition: `./orchestrators/daily_pipeline.py:daily_pipeline`
```
prefect deploy ./orchestrators/daily_pipeline.py:daily_pipeline -n daily-pipeline
```

  - Programmatic alternative (when you're ready): `flow.from_source(...).deploy(...)`
  - Note: In Prefect 3.x, don't use `Deployment.build_from_flow()` or `prefect deployment build`.
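A minimal sketch of the programmatic `flow.from_source(...).deploy(...)` route; the repository URL and work pool name below are placeholders:

```python
from prefect import flow

if __name__ == "__main__":
    # pull the flow code from source at run time, then register a deployment
    flow.from_source(
        source="https://github.com/your-org/your-project",  # placeholder repo
        entrypoint="orchestrators/daily_pipeline.py:daily_pipeline",
    ).deploy(
        name="daily-pipeline",
        work_pool_name="default",  # placeholder work pool
    )
```

For purely local runs, `daily_pipeline.serve(name="daily-pipeline")` is an even lighter option: it registers a deployment and polls for scheduled runs from the local process.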
When to split tasks vs keep together
- Split when:
  - There's an external call (HTTP, DB, file system) that benefits from retries/timeouts.
  - You want to run steps in parallel (e.g., per-file, per-record shard).
  - The step is a natural checkpoint with its own logging/artifacts/metrics.
  - The step's output is a good candidate for caching or persistence.
- Keep together when:
  - It's quick, pure-Python glue that only serves to move data between two lines.
  - Splitting would create a lot of boilerplate without adding resilience or parallelism.

Useful references
- Workflow design patterns
- Retries
- Caching concepts
- Task mapping at scale
- Prefect 3 GA announcement

A couple of clarifying questions to tailor this further:
- Roughly how many items might you process in parallel in the orchestrator (tens, hundreds, thousands)?
- Where will you run workers (local, Docker, Kubernetes)? This can influence task-runner choice and concurrency settings.
c
Fantastic, thanks @Marvin. Right now, just running locally or in a single cloud run job; no need for anything fancier than that. Next question: Is there a commonly used text format used for describing workflows? Something that elides the details, but describes the flows, sub-flows, sub-sub-flows, (etc.) and tasks? Bonus if it renders nicely, mermaid-style; but mostly I just am looking for a way to describe flows in my own notes.
m
thought for 27.4 seconds
Short answer: Mermaid is the most common text format folks use to describe Prefect workflows in docs/notes (it renders on GitHub and many wikis). Graphviz DOT and PlantUML are also popular. If you need formal business semantics, BPMN exists but is verbose.

How to represent flows/subflows/tasks with Mermaid
- Use a flowchart with subgraphs for each flow; tasks are nodes; edges show dependencies. Parallelism can be shown with a "fan-out" node or notes.

Example for your orchestrator + ingest subflow
```mermaid
flowchart TD
  %% Orchestrator
  subgraph F0[flow: daily_pipeline]
    A0([start]) --> B0{for each URL}
    B0 -->|submit| C0["ingest_dataset(url)"]
    C0 -. repeats for all urls .- C0
  end

  %% Subflow
  subgraph F1[flow: ingest_dataset]
    T1[fetch_api]
    T2[parse_payload]
    T3[write_to_db]
    T1 --> T2 --> T3
  end

  %% Visual hint that the orchestrator calls the subflow
  C0 -->|calls| T1
```
Minimal template you can copy for new flows
```mermaid
flowchart LR
  subgraph F[flow: <name>]
    T1[<task 1>] --> T2[<task 2>] --> T3[<task 3>]
    %% add parallel branches if needed:
    T2 --> B1[branch A]
    T2 --> B2[branch B]
  end
```
Alternatives (if Mermaid isn't your thing)
- Graphviz DOT (huge ecosystem of renderers)
```dot
digraph G {
  subgraph cluster_daily_pipeline {
    label="flow: daily_pipeline";
    A0 -> B0 -> C0;
  }
  subgraph cluster_ingest_dataset {
    label="flow: ingest_dataset";
    T1 -> T2 -> T3;
  }
  C0 -> T1 [label="calls"];
}
```
- PlantUML Activity (great for forks/joins)
```plantuml
@startuml
title daily_pipeline
start
:For each URL;
fork
:ingest_dataset(url);
fork again
:ingest_dataset(url);
end fork
stop
@enduml
```
Notes
- The Prefect UI already renders your flow graph during runs; that's handy for screenshots/links, but there's no built-in export-to-Mermaid/DOT in Prefect 3.x today.
- If you want a single source of truth, you can keep a small Mermaid file per flow in your repo (e.g., docs/diagrams/flow_name.mmd) and update it alongside the code.

Quick links
- Mermaid: mermaid.js.org
- Graphviz DOT: Graphviz DOT language
- PlantUML Activity: PlantUML activity diagrams
- D2 (another clean text-diagram language): d2lang.com

If you want, I can turn your current flow layout into a Mermaid snippet you can paste into your README; just share the flow/task names you want to show.