# ask-marvin
@Marvin Write functions that can't fail to complete executing (since when the Worker process dies, the next Worker to pick up the task will get the function's Event History and use the events to re-run the code until it's in the same state)
It sounds like you’re describing Temporal’s deterministic replay model. Prefect 3.x does not replay a function’s event history after a worker crash. Instead, Prefect provides first-class patterns for retries, idempotency, result persistence, caching, and checkpointing so your code can safely resume and avoid doing work twice. If you’re on Prefect 3.x, here’s how to achieve “can’t fail to complete” behavior in practice.

Key patterns
- Make tasks idempotent
  - Use idempotency keys and upserts (unique constraints) in your DB or storage.
  - Check “already done?” before doing side effects (S3/object store, DB rows).
- Persist results and use caching
  - `persist_result=True` plus a `result_storage` so results survive worker restarts.
  - `cache_key_fn` and `cache_expiration` to skip recomputation for the same inputs.
- Use retries with backoff and jitter
  - `retries`, `retry_delay_seconds`, and `retry_jitter_factor` for robustness.
- Add explicit checkpoints around side effects
  - Write durable checkpoints (e.g., a progress file or DB row) after milestones; on retry/re-run, read the checkpoint and continue from it.
- Keep tasks small and idempotent
  - Break long jobs into chunks; each chunk can be retried safely.

Examples

1) Idempotent task with persisted results, caching, and retries
```
from datetime import timedelta
from prefect import flow, task
from prefect.filesystems import LocalFileSystem

# Persist results to a durable location so they survive worker restarts;
# result_storage on a task accepts a writable filesystem block
local_result_storage = LocalFileSystem(basepath="./.prefect/results")

def stable_cache_key(ctx, params):
    # derive a cache key from inputs (ensure it’s stable and deterministic)
    return f"sum:{params['x']}:{params['y']}"

@task(
    retries=5,
    retry_delay_seconds=[5, 10, 20, 30, 60],  # escalating backoff
    retry_jitter_factor=0.2,
    persist_result=True,
    result_storage=local_result_storage,
    cache_key_fn=stable_cache_key,
    cache_expiration=timedelta(days=1),
    log_prints=True,
)
def add(x: int, y: int) -> int:
    # pure, idempotent computation
    return x + y

@flow
def my_flow():
    total = add(1, 2)
    print("total:", total)

if __name__ == "__main__":
    my_flow()
```
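Running `my_flow()` again within the one-day `cache_expiration` completes `add` from the persisted result (a `Cached` state) instead of re-executing it, which is what lets a restarted run skip work that already finished. Prefect 3.x also ships composable cache policies (e.g. `prefect.cache_policies.INPUTS`) if you’d rather not hand-roll a `cache_key_fn`.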
2) Exactly-once side effect via idempotency key (DB upsert)
```
from prefect import flow, task
import json
import sqlite3

@task(retries=3, retry_delay_seconds=5, log_prints=True)
def upsert_order(order_id: str, payload: dict):
    conn = sqlite3.connect("orders.db")
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS orders (
            order_id TEXT PRIMARY KEY,
            payload  TEXT
        )
    """)
    # Idempotent: same order_id won't insert twice; updates are safe.
    # json.dumps (not str) so the stored payload is valid JSON text.
    cur.execute("""
        INSERT INTO orders(order_id, payload)
        VALUES(?, ?)
        ON CONFLICT(order_id) DO UPDATE SET payload=excluded.payload
    """, (order_id, json.dumps(payload)))
    conn.commit()
    conn.close()

@flow
def write_once_flow():
    upsert_order("job-123", {"amount": 100, "currency": "USD"})
3) Chunked processing with durable checkpoints
```
import json
from pathlib import Path

from prefect import flow, task

CHECKPOINT = Path("./.checkpoints/myjob.json")

def load_checkpoint():
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text())
    return {"done": []}

def save_checkpoint(state):
    CHECKPOINT.parent.mkdir(parents=True, exist_ok=True)
    CHECKPOINT.write_text(json.dumps(state))

@task(retries=5, retry_delay_seconds=[5, 10, 20, 40], log_prints=True)
def process_items(items: list[str], job_id: str):
    state = load_checkpoint()
    done = set(state["done"])
    for item in items:
        if item in done:
            continue
        # Side effect (make idempotent at the sink),
        # e.g., upsert with a unique (job_id, item) key, or check existence first
        do_side_effect(item, job_id)
        # Checkpoint progress; safe to crash after this, work won’t repeat
        done.add(item)
        save_checkpoint({"done": list(done)})

def do_side_effect(item, job_id):
    # Example: write a file named by (job_id, item) if it doesn't exist, or DB upsert
    out = Path(f"./out/{job_id}-{item}.txt")
    if not out.exists():
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text("done")

@flow
def chunked_flow():
    items = [f"row-{i}" for i in range(100)]
    process_items(items, job_id="job-456")
```
Notes and guidance
- Prefect does not deterministically replay function code from an event history after a worker crash. Instead, design tasks so retries are safe and fast, and use durable checkpoints to resume work.
- Prefer unique constraints, upserts, and idempotency keys at sinks (DBs, object stores, message queues) to achieve exactly-once effects.
- Persist results for expensive pure computations so retries resume from cached outputs.
- For cloud/object storage, check existence or ETag before uploading; write to a temporary key and then atomically rename to finalize (see the sketch at the end of this message).

Relevant docs
- Results and persistence: Prefect 3.x — Results and persistence
- Retries: Prefect 3.x — Retries
- Why idempotency matters: The importance of idempotent data pipelines

If you were specifically asking about Temporal-style deterministic replay, let me know your constraints and stack; happy to map the model you want to Prefect’s primitives or suggest architecture patterns for your use case.
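To make the object-storage note above concrete, here is a minimal sketch using boto3 (assumed here; any S3 client works the same way). The bucket name and key layout are placeholders, and since S3 has no true atomic rename, the “finalize” step uses the usual copy-then-delete stand-in:
```
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
BUCKET = "my-bucket"  # placeholder; use your own bucket

def upload_once(key: str, body: bytes) -> None:
    # 1) Skip the upload entirely if the object already exists,
    #    so retrying this function is safe
    try:
        s3.head_object(Bucket=BUCKET, Key=key)
        return  # already uploaded
    except ClientError as exc:
        if exc.response["Error"]["Code"] not in ("404", "NoSuchKey"):
            raise  # a real error, not "missing object"
    # 2) Write to a temporary key, then finalize. Each PUT/COPY is
    #    atomic per key, so readers never see a partial object at `key`.
    tmp_key = f"{key}.tmp"
    s3.put_object(Bucket=BUCKET, Key=tmp_key, Body=body)
    s3.copy_object(
        Bucket=BUCKET,
        Key=key,
        CopySource={"Bucket": BUCKET, "Key": tmp_key},
    )
    s3.delete_object(Bucket=BUCKET, Key=tmp_key)
```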