Ravish Mallya
09/28/2025, 4:14 PM
Marvin
09/28/2025, 4:15 PM
Marvin
09/28/2025, 4:19 PM
`pip install s3fs`). AWS creds can come from your environment (AWS_PROFILE, AWS_ACCESS_KEY_ID, etc.).
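As a quick pre-flight check, something like the sketch below can show which of those variables the current process can see. The helper name `visible_aws_settings` is hypothetical, and `AWS_SECRET_ACCESS_KEY` is added here as an assumption about what "etc." covers. An empty result does not necessarily mean S3 access will fail, since credentials can also come from shared config files or an instance role.

```python
import os

# Environment variables commonly read by the AWS credential chain.
# Illustrative, not exhaustive.
CREDENTIAL_VARS = ("AWS_PROFILE", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def visible_aws_settings(env=None):
    """Return the names of AWS credential variables that are set and non-empty."""
    env = os.environ if env is None else env
    return [name for name in CREDENTIAL_VARS if env.get(name)]


if __name__ == "__main__":
    found = visible_aws_settings()
    print("AWS settings visible to this process:", found or "none")
```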
Links:
- Results and caching: Prefect 3 docs: Results
- Transactions (trade-offs explained): Prefect 3 docs: Transactions
Snippet
```
from datetime import timedelta

from prefect import flow, task
from prefect.filesystems import RemoteFileSystem
from prefect.tasks import task_input_hash

# Configure S3 as the result store
# Requires `s3fs` installed and AWS creds available in the environment
s3_store = RemoteFileSystem(basepath="s3://my-bucket/prefix/")

# Each task persists its result and uses a stable cache key based on inputs
common_task_kwargs = dict(
    persist_result=True,
    result_storage=s3_store,
    cache_key_fn=task_input_hash,  # stable across reruns if inputs are the same
    cache_expiration=timedelta(days=30),  # optional; adjust as needed
    retries=2,  # optional: task-level retries
    retry_delay_seconds=30,  # optional
)


@task(**common_task_kwargs)
def step1(x: int) -> dict:
    # ... compute something expensive
    return {"a": x + 1}


@task(**common_task_kwargs)
def step2(a: dict) -> dict:
    # ... compute something using result from step1
    return {"b": a["a"] * 2}


@task(**common_task_kwargs)
def step3(b: dict) -> dict:
    # ... compute something using result from step2
    return {"c": b["b"] - 3}


@task(**common_task_kwargs)
def step4(c: dict) -> dict:
    # ... final step using result from step3
    return {"d": c["c"] ** 2}


@flow
def pipeline(x: int) -> dict:
    # If any step fails, the flow fails by default.
    # On rerun with the same input `x`, Prefect will reuse cached S3 results
    # for any prior steps that have already completed successfully.
    a = step1(x)
    b = step2(a)
    c = step3(b)
    d = step4(c)
    return d


if __name__ == "__main__":
    pipeline(42)
```
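To make the resume behavior concrete without needing S3 or Prefect, here is a rough plain-Python model of the same idea: each step's result is written to a store under a key derived from its inputs, and a rerun loads whatever is already there. This is only a sketch of the concept, not how Prefect implements it. The names `cache_key`, `run_cached`, and the `fail_at` switch are hypothetical, and a temp directory stands in for the bucket.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def cache_key(step_name, inputs):
    """Stable key built from the step name and its JSON-serialised inputs."""
    payload = json.dumps({"step": step_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_cached(store, step_name, fn, inputs, executed):
    """Load a persisted result if one exists; otherwise compute and persist it."""
    path = store / f"{cache_key(step_name, inputs)}.json"
    if path.exists():
        return json.loads(path.read_text())
    result = fn(inputs)  # may raise; nothing is written in that case
    path.write_text(json.dumps(result))
    executed.append(step_name)
    return result


def pipeline(store, x, fail_at=None):
    """Run four chained steps; `fail_at` simulates a crash inside one step."""
    executed = []

    def guarded(name, fn):
        def wrapper(inputs):
            if name == fail_at:
                raise RuntimeError(f"{name} failed")
            return fn(inputs)
        return wrapper

    steps = [
        ("step1", lambda v: {"a": v + 1}),
        ("step2", lambda a: {"b": a["a"] * 2}),
        ("step3", lambda b: {"c": b["b"] - 3}),
        ("step4", lambda c: {"d": c["c"] ** 2}),
    ]
    value = x
    try:
        for name, fn in steps:
            value = run_cached(store, name, guarded(name, fn), value, executed)
    except RuntimeError:
        return None, executed
    return value, executed


store = Path(tempfile.mkdtemp())  # stand-in for the S3 prefix
first_result, first_run = pipeline(store, 42, fail_at="step3")
second_result, second_run = pipeline(store, 42)
```

In this model the first run executes `step1` and `step2` before failing, and the second run only executes `step3` and `step4`, because the earlier results are loaded from the store.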
About transactions
- If you wrap these four tasks in a transaction with the default `write_on_commit=True`, earlier task results are not committed if a later task fails. That gives you atomicity, but you lose the ability to resume from the middle because there are no persisted results to load.
- If you need strict all-or-nothing semantics, use a transaction, but expect reruns to execute all four tasks again.
- If your priority is “resume from the last failed step,” rely on the task-level persistence and cache shown above and avoid strict transactional write-on-commit across the four tasks.
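The trade-off in the bullets above can be illustrated with a toy store that holds writes back until the whole block succeeds. This is a simplified model of write-on-commit under my own assumptions, not Prefect's transaction implementation; `StagedStore` and `run` are hypothetical names.

```python
class StagedStore:
    """Toy store that buffers writes and makes them durable only on commit."""

    def __init__(self):
        self.durable = {}  # what a later run could load
        self._staged = {}  # writes held back until the block succeeds

    def write(self, key, value):
        self._staged[key] = value

    def __enter__(self):
        self._staged = {}
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Commit: every staged write becomes durable at once.
            self.durable.update(self._staged)
        # On failure the staged writes are simply dropped.
        self._staged = {}
        return False  # never swallow the exception


def run(store, fail_at=None):
    """Write four step results inside one all-or-nothing block."""
    with store:
        for name in ("step1", "step2", "step3", "step4"):
            if name == fail_at:
                raise RuntimeError(f"{name} failed")
            store.write(name, f"{name}-result")


store = StagedStore()
try:
    run(store, fail_at="step3")
except RuntimeError:
    pass
after_failure = dict(store.durable)  # nothing persisted, so nothing to resume from

run(store)
after_success = dict(store.durable)  # all four results committed together
```

After the failed run the durable store is empty even though `step1` and `step2` completed, which is why a rerun under these semantics has to start from the first step.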
If you do still want a transaction wrapper for other reasons, you can do:
```
from prefect import flow
from prefect.transactions import transaction
from prefect.results import get_result_store

# step1..step4 are the tasks defined in the snippet above


@flow
def pipeline_with_txn(x: int) -> dict:
    # NOTE: With write_on_commit=True (default), you get atomicity, but not resume.
    # Any failure will roll back staged writes, so a rerun will recompute from step1.
    with transaction(key=f"pipeline:{x}", store=get_result_store()):
        a = step1(x)
        b = step2(a)
        c = step3(b)
        d = step4(c)
    return d
```
Marvin
09/28/2025, 4:19 PM
Ravish Mallya
09/28/2025, 4:25 PM
Marvin
09/28/2025, 4:25 PM
Marvin
09/28/2025, 4:27 PM