Chris Fesler
10/05/2025, 11:42 PM

Marvin
10/05/2025, 11:46 PM
- Set retries, retry_delay_seconds, and timeout_seconds on tasks that call external systems.
- Use caching for expensive/idempotent tasks.
- Concurrency and mapping
  - Use a task runner (threads/processes) inside a flow for local parallelism.
  - Use tags to enforce global concurrency limits across all flows.
- Data passing
  - Pass small data between tasks. For large payloads, store in durable storage (e.g., S3) and pass references (see the first sketch after this list).
- Logging/observability
  - Use get_run_logger() inside tasks/flows; add artifacts for key diagnostics/tables/links (see the second sketch after this list).
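A minimal sketch of the pass-references pattern, assuming S3 via boto3 (the bucket and key names here are made up; any durable store works the same way):
```
import json

import boto3
from prefect import flow, task

BUCKET = "my-data-bucket"  # hypothetical bucket


@task
def extract() -> str:
    records = [{"id": 1}, {"id": 2}]  # stand-in for a large payload
    key = "staging/records.json"
    boto3.client("s3").put_object(Bucket=BUCKET, Key=key, Body=json.dumps(records))
    return key  # downstream tasks get the small reference, not the payload


@task
def load(key: str) -> int:
    body = boto3.client("s3").get_object(Bucket=BUCKET, Key=key)["Body"].read()
    return len(json.loads(body))


@flow
def reference_passing_demo() -> int:
    return load(extract())
```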
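And a sketch of pairing the run logger with a markdown artifact via prefect.artifacts (the artifact key and table contents are made up):
```
from prefect import flow, get_run_logger
from prefect.artifacts import create_markdown_artifact


@flow
def report_demo(rows_written: int = 42) -> None:
    logger = get_run_logger()
    logger.info("Wrote %d rows", rows_written)
    # Artifacts appear in the Prefect UI attached to this flow run
    create_markdown_artifact(
        key="ingest-summary",  # hypothetical key
        markdown=f"| metric | value |\n|---|---|\n| rows written | {rows_written} |",
        description="Ingest run summary",
    )
```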
Suggested repo layout (v1 that evolves cleanly)
- Starting simple (matches your plan)
  - One file per small flow (with its tasks), plus one orchestrator module.
  - Add tests alongside flows.
- Evolution path
  - If duplication grows, extract shared, “pure” utilities to lib/ (no Prefect imports), and optionally components/ for reusable Prefect tasks/subflows.
Example layout
```
your-project/
  prefect.yaml               # project config for deployments
  pyproject.toml             # deps and tool config
  flows/
    ingest_dataset.py        # small flow + its tasks
    transform_users.py       # another small flow + its tasks
  orchestrators/
    daily_pipeline.py        # top-level orchestrator calling subflows
  lib/                       # optional later: pure utilities
    io.py
    transforms.py
  tests/
    test_ingest_dataset.py
  README.md
```
Concrete patterns
- Small flow with clear task boundaries, retries, caching, and logging
```
from datetime import timedelta
from typing import Any, Iterable

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash


@task(
    name="fetch_api",
    retries=3,
    retry_delay_seconds=[5, 15, 30],
    timeout_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    tags=["api"],
)
def fetch_api(url: str, token: str) -> dict[str, Any]:
    logger = get_run_logger()
    logger.info(f"Fetching {url}")
    # ... perform request, raise on non-200s ...
    return {"items": []}


@task(name="parse_payload", tags=["parse"])
def parse_payload(resp: dict[str, Any]) -> list[dict[str, Any]]:
    # ... extract/validate/normalize ...
    return resp.get("items", [])


@task(name="write_to_db", retries=2, retry_delay_seconds=10, timeout_seconds=60, tags=["db"])
def write_to_db(records: Iterable[dict[str, Any]]) -> int:
    # ... bulk upsert / transactional write ...
    return len(list(records))


@flow(name="ingest_dataset")
def ingest_dataset(url: str, token: str) -> int:
    raw = fetch_api(url, token)
    parsed = parse_payload(raw)
    written = write_to_db(parsed)
    return written
```
- Orchestrator flow coordinating subflows and parallelism
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

from flows.ingest_dataset import ingest_dataset


@task
def run_ingest(url: str, token: str) -> int:
    # Wrapping the subflow in a task is one way to fan it out across
    # the task runner in 3.x; run_deployment is another option at scale.
    return ingest_dataset(url, token)


@flow(name="daily_pipeline", task_runner=ThreadPoolTaskRunner(max_workers=8))
def daily_pipeline(urls: list[str], token: str) -> int:
    futures = [run_ingest.submit(url, token) for url in urls]
    return sum(f.result() for f in as_completed(futures))
```
- Concurrency limits by tag
  - Tag your tasks (`tags=["api"]`, `tags=["db"]`) as above, then set tag-based concurrency limits in the UI or API to throttle globally across all flows.
  - Docs: [Tag-based concurrency limits](https://docs-3.prefect.io/v3/how-to-guides/workflows/tag-based-concurrency-limits)
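For example, limits for the tags above can also be created from the CLI (the limit values here are arbitrary):
```
prefect concurrency-limit create api 10
prefect concurrency-limit create db 4
```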
- Deployments (Prefect 3.x)
  - CLI: run from your project root with a `prefect.yaml` present.
  - Entrypoint points at the flow definition: `./orchestrators/daily_pipeline.py:daily_pipeline`
```
prefect deploy ./orchestrators/daily_pipeline.py:daily_pipeline -n daily-pipeline
```
  - Programmatic alternative (when you’re ready): flow.from_source(...).deploy(...)
  - Note: In Prefect 3.x, don’t use Deployment.build_from_flow() or prefect deployment build.
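A minimal sketch of the programmatic path (the repo URL, work pool name, and schedule are placeholders):
```
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/your-project",  # placeholder repo
        entrypoint="orchestrators/daily_pipeline.py:daily_pipeline",
    ).deploy(
        name="daily-pipeline",
        work_pool_name="my-work-pool",  # placeholder work pool
        cron="0 6 * * *",  # optional: daily at 06:00
    )
```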
When to split tasks vs keep together
- Split when:
  - There’s an external call (HTTP, DB, file system) that benefits from retries/timeouts.
  - You want to run steps in parallel (e.g., per-file, per-record shard; see the mapping sketch after this list).
  - The step is a natural checkpoint with its own logging/artifacts/metrics.
  - The step’s output is a good candidate for caching or persistence.
- Keep together when:
  - It’s quick, pure-Python glue that only serves to move data between two steps.
  - Splitting would create a lot of boilerplate without adding resilience or parallelism.
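A minimal sketch of the per-file parallel case using task mapping (the paths and processing body are made up):
```
from prefect import flow, task


@task
def process_file(path: str) -> int:
    # ... parse/transform one file ...
    return len(path)  # stand-in for a real metric


@flow
def process_all(paths: list[str]) -> list[int]:
    futures = process_file.map(paths)  # one task run per path, run concurrently
    return [f.result() for f in futures]


# e.g. process_all(["a.csv", "b.csv", "c.csv"])
```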
Useful references
- Workflow design patterns
- Retries
- Caching concepts
- Task mapping at scale
- Prefect 3 GA announcement
A couple of clarifying questions to tailor this further:
- Roughly how many items might you process in parallel in the orchestrator (tens, hundreds, thousands)?
- Where will you run workers (local, Docker, Kubernetes)? This can influence task-runner choice and concurrency settings.

Chris Fesler
10/05/2025, 11:52 PM

Marvin
10/05/2025, 11:53 PM
```
flowchart TD
  %% Orchestrator
  subgraph F0["flow: daily_pipeline"]
    A0([start]) --> B0{for each URL}
    B0 -->|submit| C0["ingest_dataset(url)"]
    C0 -. repeats for all urls .- C0
  end

  %% Subflow
  subgraph F1["flow: ingest_dataset"]
    T1[fetch_api]
    T2[parse_payload]
    T3[write_to_db]
    T1 --> T2 --> T3
  end

  %% Visual hint that the orchestrator calls the subflow
  C0 -->|calls| T1
```
Minimal template you can copy for new flows
```
flowchart LR
  subgraph F["flow: <name>"]
    T1["<task 1>"] --> T2["<task 2>"] --> T3["<task 3>"]
    %% add parallel branches if needed:
    T2 --> B1[branch A]
    T2 --> B2[branch B]
  end
```
Alternatives (if Mermaid isn’t your thing)
- Graphviz DOT (huge ecosystem of renderers)
```
digraph G {
  subgraph cluster_daily_pipeline {
    label="flow: daily_pipeline";
    A0 -> B0 -> C0;
  }
  subgraph cluster_ingest_dataset {
    label="flow: ingest_dataset";
    T1 -> T2 -> T3;
  }
  C0 -> T1 [label="calls"];
}
```
- PlantUML Activity (great for forks/joins)
```
@startuml
title daily_pipeline
start
:For each URL;
fork
  :ingest_dataset(url);
fork again
  :ingest_dataset(url);
end fork
stop
@enduml
```
Notes
- Prefect UI already renders your flow graph during runs; that’s handy for screenshots/links, but there’s no built-in export-to-Mermaid/DOT in Prefect 3.x today.
- If you want a single source of truth, you can keep a small Mermaid file per flow in your repo (e.g., docs/diagrams/flow_name.mmd) and update alongside code.
Quick links
- Mermaid: mermaid.js.org
- Graphviz DOT: Graphviz DOT language
- PlantUML Activity: PlantUML activity diagrams
- D2 (another clean text-diagram language): d2lang.com
If you want, I can turn your current flow layout into a Mermaid snippet you can paste into your README—just share the flow/task names you want to show.