Shubham
09/23/2025, 2:40 PM
Marvin
09/23/2025, 2:40 PM
Marvin
09/23/2025, 2:58 PM
- Persist results: set `persist_result=True` and configure a `result_storage` so results survive worker restarts.
- Cache by inputs: use `cache_key_fn` and `cache_expiration` to skip recomputation for the same inputs.
- Use retries with backoff and jitter
  - Set `retries`, `retry_delay_seconds`, and `retry_jitter_factor` for robustness.
- Add explicit checkpoints around side effects
- Write durable checkpoints (e.g., a progress file or DB row) after milestones; on retry/re-run, read the checkpoint and continue from it.
- Keep tasks small and idempotent
- Break long jobs into chunks; each chunk can be retried safely.
Examples
1) Idempotent task with persisted results, caching, and retries
from datetime import timedelta
from prefect import flow, task
from prefect.results import ResultStore
from prefect.filesystems import LocalFileSystem
# Persist results to a durable location.
# `@task(result_storage=...)` expects a writable filesystem block (or a block
# slug string), not a ResultStore wrapper, so pass the LocalFileSystem block
# directly. The variable name is kept so the decorator below still resolves.
# NOTE: this basepath is relative; use an absolute path (or remote storage) so
# every worker process reads and writes the same result files.
result_store = LocalFileSystem(basepath="./.prefect/results")
def stable_cache_key(ctx, params):
    """Build a deterministic cache key from the ``x`` and ``y`` inputs.

    Used as ``cache_key_fn``: the first argument (the run context) is ignored
    because the key depends only on the bound parameters, so identical inputs
    always map to the same cached result.
    """
    x_val, y_val = params["x"], params["y"]
    return "sum:{}:{}".format(x_val, y_val)
@task(
    # Retry policy: up to 5 retries, waiting longer before each attempt.
    retries=5,
    retry_delay_seconds=[5, 10, 20, 30, 60],  # escalating backoff
    retry_jitter_factor=0.2,
    # Result persistence and input-keyed caching.
    persist_result=True,
    result_storage=result_store,
    cache_key_fn=stable_cache_key,
    cache_expiration=timedelta(days=1),
    log_prints=True,
)
def add(x: int, y: int) -> int:
    """Return ``x + y``.

    A pure, idempotent computation, so retrying it or serving it from cache
    never changes the outcome.
    """
    total = x + y
    return total
@flow(log_prints=True)
def my_flow():
    """Run ``add`` once and print the (possibly cached) total.

    ``log_prints=True`` is set here because this is where ``print`` is called.
    Without it, the output goes only to stdout and not into the flow run's logs.
    """
    total = add(1, 2)
    print("total:", total)


if __name__ == "__main__":
    my_flow()
2) Exactly-once side effect via idempotency key (DB upsert)
from prefect import flow, task
import sqlite3
@task(retries=3, retry_delay_seconds=5, log_prints=True)
def upsert_order(order_id: str, payload: dict):
    """Insert or update the order row keyed by ``order_id``.

    Safe to retry: the ``ON CONFLICT`` upsert means re-running with the same
    ``order_id`` overwrites that row instead of inserting a duplicate.

    Args:
        order_id: Idempotency key; the ``orders`` table's primary key.
        payload: JSON-serializable order data, stored as JSON text.
    """
    import json  # local import keeps this example snippet self-contained

    # Serialize with json.dumps, not str(). str() yields a Python repr (single
    # quotes, True/None), which SQLite's json() rejects as malformed: always on
    # pre-JSON5 SQLite, and for True/None values on any version. sort_keys
    # makes identical payloads store identical text on every retry.
    payload_json = json.dumps(payload, sort_keys=True)

    conn = sqlite3.connect("orders.db")
    try:
        cur = conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                order_id TEXT PRIMARY KEY,
                payload TEXT
            )
        """)
        # Idempotent: same order_id won't insert twice; updates are safe
        cur.execute("""
            INSERT INTO orders(order_id, payload)
            VALUES(?, json(?))
            ON CONFLICT(order_id) DO UPDATE SET payload=excluded.payload
        """, (order_id, payload_json))
        conn.commit()
    finally:
        # Close even if execute/commit raises, so retried attempts don't leak
        # open connections.
        conn.close()
@flow
def write_once_flow():
    """Upsert order ``job-123``; re-running the flow leaves a single row for it."""
    order_payload = {"amount": 100, "currency": "USD"}
    upsert_order("job-123", order_payload)
3) Chunked processing with durable checkpoints
import json
from pathlib import Path
from prefect import flow, task
CHECKPOINT = Path("./.checkpoints/myjob.json")


def load_checkpoint():
    """Return the saved progress state, or ``{"done": []}`` if none exists yet."""
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text(encoding="utf-8"))
    return {"done": []}


def save_checkpoint(state):
    """Persist ``state`` to ``CHECKPOINT`` without ever leaving a partial file.

    The JSON is written to a sibling ``.tmp`` file, which is then renamed over
    the checkpoint. If the process crashes mid-write, the previous checkpoint
    stays intact instead of becoming truncated JSON that ``load_checkpoint``
    could not parse. This does not fsync, so it guards against process crashes
    rather than power loss.
    """
    CHECKPOINT.parent.mkdir(parents=True, exist_ok=True)
    tmp = CHECKPOINT.with_name(CHECKPOINT.name + ".tmp")
    tmp.write_text(json.dumps(state), encoding="utf-8")
    # Path.replace overwrites an existing target; the rename is atomic on POSIX.
    tmp.replace(CHECKPOINT)
@task(retries=5, retry_delay_seconds=[5,10,20,40], log_prints=True)
def process_items(items: list[str], job_id: str):
state = load_checkpoint()
done = set(state["done"])
for item in items:
if item in done:
continue
# Side effect (make idempotent at the sink)
# e.g., upsert with a unique (job_id, item) key, or check existence first
do_side_effect(item, job_id)
# checkpoint progress; safe to crash after this, work won’t repeatMarvin
09/23/2025, 2:58 PM