Florent VanDeMoortele
08/12/2025, 2:58 PM
Marvin
08/12/2025, 2:58 PM
Marvin
08/12/2025, 3:01 PM
- Logging: use get_run_logger() inside flows/tasks; keep plain functions quiet
- Testing: test plain functions directly; call tasks/flows like normal functions in unit tests. Only use Prefect test utilities when you need state/engine behavior (see the test sketch after the minimal example below)
- Structure: package your code
- `src/your_project/flows/`: entrypoint flows (one per file is civilized)
- `src/your_project/tasks/`: reusable tasks
- `src/your_project/utils/` or `services/`: plain functions, clients, models
- Flows import tasks; tasks import utils; avoid circular imports
- Deployments (Prefect 3.x): create deployments from flow entrypoints; don’t rely on deprecated 2.x builders
Minimal example
from prefect import flow, task, get_run_logger

# plain functions: pure logic
def clean_record(r):
    r["value"] = r["value"].strip().lower()
    return r

# tasks: I/O and orchestration-worthy steps
@task(retries=3, retry_delay_seconds=5, timeout_seconds=30)
def fetch_records(source_url: str) -> list[dict]:
    import requests

    r = requests.get(source_url, timeout=10)
    r.raise_for_status()
    return r.json()

@task(persist_result=True)
def write_to_warehouse(rows: list[dict]) -> int:
    # pretend to batch-insert, return count
    return len(rows)

# flow: orchestration
@flow(name="etl-pipeline")
def etl(source_url: str) -> int:
    logger = get_run_logger()
    raw = fetch_records(source_url)
    cleaned = [clean_record(r) for r in raw]  # pure function stays fast and testable
    count = write_to_warehouse(cleaned)
    logger.info(f"Loaded {count} rows")
    return count
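To make the testing point above concrete, a minimal pytest sketch (assumes pytest, and that clean_record and write_to_warehouse are importable from the module above; prefect_test_harness is only needed for tests that exercise real state/engine behavior):

import pytest
from prefect.testing.utilities import prefect_test_harness

def test_clean_record():
    # plain function: no Prefect machinery involved
    assert clean_record({"value": "  HELLO "})["value"] == "hello"

def test_write_to_warehouse():
    # .fn calls the underlying function without the Prefect engine
    assert write_to_warehouse.fn([{"a": 1}, {"a": 2}]) == 2

@pytest.fixture(scope="session")
def prefect_backend():
    # throwaway local Prefect API; use it in tests that run flows end to end
    with prefect_test_harness():
        yield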
When to use subflows
- You want to run a branch by itself sometimes
- You need separate retries/timeouts/notifications for a chunk of the pipeline
- You want separate deployments/schedules per logical stage (e.g., ingest, transform, publish)
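A minimal sketch of that shape (stage names and bodies are illustrative only):

from prefect import flow

@flow(retries=1)
def ingest(source_url: str) -> list[dict]:
    # this stage gets its own run record, retries, and notifications
    return [{"id": 1, "value": "raw"}]

@flow
def transform(rows: list[dict]) -> list[dict]:
    return [{**r, "value": str(r["value"]).upper()} for r in rows]

@flow
def pipeline(source_url: str) -> list[dict]:
    rows = ingest(source_url)   # calling a flow inside a flow creates a subflow run
    return transform(rows)      # each stage can also get its own deployment/schedule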
Common pitfalls to avoid
- Calling tasks from tasks (don’t; compose in flows)
- Turning every line into a task
- Passing gigantic data payloads between tasks
- Using 2.x-only APIs in 3.x (e.g., Deployment.build_from_flow and prefect deployment build are gone)
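For that last pitfall, a minimal sketch of the 3.x way (the import path, deployment name, and cron schedule are placeholders):

# assumes the etl flow from the minimal example lives in src/your_project/flows/etl.py
from your_project.flows.etl import etl  # hypothetical import path

if __name__ == "__main__":
    # simplest path: serve the flow from this process on a schedule
    etl.serve(name="etl-local", cron="0 6 * * *")

For work-pool execution, flow.from_source(...).deploy(...) or the prefect deploy CLI are the supported 3.x replacements for the removed builders.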
Docs you may actually want to open
- Flows: Flows
- Tasks: Tasks
- Results and caching: Results and caching
Marvin
08/12/2025, 3:01 PM
Florent VanDeMoortele
08/12/2025, 3:20 PM
Marvin
08/12/2025, 3:20 PM
Marvin
08/12/2025, 3:21 PM
@flow
def etl(url: str):
    raw = fetch_records(url)                  # task: I/O, retries
    cleaned = [clean_record(r) for r in raw]  # plain function: cheap transform
    return write_to_warehouse(cleaned)        # task: I/O, persistence
Make it a task if you want parallelism/caching/retries:
from prefect import task
from prefect.futures import resolve_futures_to_results

@task(retries=2)
def clean_record_task(r):
    return clean_record(r)

@flow
def etl(url: str):
    raw = fetch_records(url)
    futs = [clean_record_task.submit(r) for r in raw]  # concurrent under the task runner
    cleaned = resolve_futures_to_results(futs)
    return write_to_warehouse(cleaned)
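If every record gets exactly one task call, task mapping is a terser equivalent (a sketch, assuming Prefect 3.x, where .map returns a list of futures with a .result() helper):

@flow
def etl(url: str):
    raw = fetch_records(url)
    futs = clean_record_task.map(raw)         # one task run per record, submitted concurrently
    return write_to_warehouse(futs.result())  # wait for all results, then load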
Docs for reference:
- Flows: Flows
- Tasks: Tasks
- Results/caching: Results and caching
If you share where you need retries/parallelism, I’ll point out exactly which bits should become tasks. Yes, I do live for this.
Florent VanDeMoortele
08/13/2025, 8:02 AM
Marvin
08/13/2025, 8:02 AM
Marvin
08/13/2025, 8:04 AM
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed, wait

@task
def first_step(d: dict) -> dict:
    # Do something that might raise
    if d.get("should_fail"):
        raise ValueError("Unlucky dictionary.")
    return {"id": d.get("id"), "processed": True}

@task
def second_step(original: dict, result: dict) -> None:
    # Do the follow-up work
    pass

@flow(task_runner=ThreadPoolTaskRunner())
def process_all(dicts: list[dict]):
    logger = get_run_logger()
    # Fan out step 1 in parallel
    futures = {first_step.submit(d): d for d in dicts}
    # For each finished step 1, run step 2 only if it succeeded
    step2_futures = []
    for fut in as_completed(list(futures)):
        d = futures[fut]
        try:
            res = fut.result()  # raises if first_step failed/crashed
        except Exception as exc:
            logger.warning(f"first_step failed for {d}: {exc}")
            continue
        step2_futures.append(second_step.submit(d, res))
    # Optionally wait for step 2 completions
    wait(step2_futures)
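Called with some hypothetical data, the failing item is skipped and the rest carry on:

process_all([{"id": 1}, {"id": 2, "should_fail": True}, {"id": 3}])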
Notes:
- ThreadPoolTaskRunner runs tasks concurrently within the flow. Use it if you want parallelism in a single flow run.
- as_completed lets you react as each task finishes; failures are caught and don’t block others.
- If you’re not using Prefect and just want raw Python, the same pattern works with plain concurrent.futures (a sketch follows below). But since you asked, yes—you can let the broken ones crash in peace while the rest carry on.
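For reference, a non-Prefect version of the same skip-the-failures pattern (a sketch using only the standard library; assumes first_step and second_step are plain, undecorated functions here):

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_all_plain(dicts: list[dict]) -> None:
    with ThreadPoolExecutor() as pool:
        futures = {pool.submit(first_step, d): d for d in dicts}  # fan out step 1
        step2 = []
        for fut in as_completed(futures):
            d = futures[fut]
            try:
                res = fut.result()  # raises if first_step raised
            except Exception as exc:
                print(f"first_step failed for {d}: {exc}")
                continue
            step2.append(pool.submit(second_step, d, res))  # step 2 only on success
        for f in step2:
            f.result()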
Florent VanDeMoortele
08/13/2025, 8:18 AM
Marvin
08/13/2025, 8:18 AM
Marvin
08/13/2025, 8:19 AM
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
# If you prefer processes/cluster parallelism, use DaskTaskRunner instead.

@task
def first_step(d: dict) -> dict:
    if d.get("should_fail"):
        raise ValueError("Unlucky dictionary.")
    return {"id": d.get("id"), "processed": True}

@task
def second_step(original: dict, result: dict) -> None:
    # Do the follow-up work
    pass

@flow(task_runner=ConcurrentTaskRunner())
def process_all(dicts: list[dict]):
    logger = get_run_logger()
    # Fan out step 1 in parallel
    futures = {first_step.submit(d): d for d in dicts}
    # Option A: react as tasks complete (if your Prefect 2 version has as_completed)
    try:
        from prefect.futures import as_completed

        step2 = []
        for fut in as_completed(list(futures)):
            d = futures[fut]
            try:
                res = fut.result()  # raises if first_step failed/crashed
            except Exception as exc:
                logger.warning(f"first_step failed for {d}: {exc}")
                continue
            step2.append(second_step.submit(d, res))
        for f in step2:
            f.result()
    except Exception:
        # Option B: simple fallback — iterate and block in order
        step2 = []
        for fut, d in futures.items():
            try:
                res = fut.result()
            except Exception as exc:
                logger.warning(f"first_step failed for {d}: {exc}")
                continue
            step2.append(second_step.submit(d, res))
        for f in step2:
            f.result()
Notes:
- Use ConcurrentTaskRunner for thread-based parallelism in a single flow run. If your work is CPU-bound or you want distributed execution, consider DaskTaskRunner.
- If you prefer to wire the DAG and still continue despite failures, you can wrap first-step futures with allow_failure and make second_step handle error objects — but skipping failed ones (as above) is simpler. A sketch of that wiring follows below.
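For reference, that allow_failure wiring might look roughly like this (a sketch reusing first_step and the imports from the example above; assumes a failed upstream resolves to the exception it raised, so the downstream task must tolerate error values):

from prefect import allow_failure

@task
def second_step_tolerant(original: dict, result) -> None:
    # with allow_failure, a failed upstream typically shows up here as its exception
    if isinstance(result, Exception):
        return  # record/log the failure instead of doing the follow-up work
    # ... normal follow-up work here

@flow(task_runner=ConcurrentTaskRunner())
def process_all_dag(dicts: list[dict]):
    futures = {first_step.submit(d): d for d in dicts}
    for fut, d in futures.items():
        # downstream still runs when first_step fails; it just sees the error value
        second_step_tolerant.submit(d, allow_failure(fut))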
Do you want threads (ConcurrentTaskRunner) or processes/cluster (DaskTaskRunner)? And roughly how many dictionaries are we talking about? I’ll tune the example accordingly.