# ask-marvin
r
@Marvin I have a flow with tasks, each of which persists a result to S3 that the next task uses. All 4 tasks should run inside a transaction. If any one fails, the flow should fail. When I retry the flow, it should continue after the last failed task. Is this possible with transactions and caching? Give me a snippet for it.
m
Short answer: you can get "resume from the last failed task" with Prefect's cached results on S3, but that's at odds with a strict, all-or-nothing transaction. If you use a transaction that only writes on commit, upstream task results won't be persisted if a downstream task fails, so there's nothing to resume from.

The recommended pattern is to persist each task's result to S3 with a stable cache key, so that a rerun reuses prior successes and re-executes only the failed and downstream tasks. Below is a minimal pattern that:
- Persists each task's result to S3
- Uses a deterministic cache key so reruns reuse prior results
- Fails the flow if any task fails (default behavior)
- On rerun (same inputs), resumes at the failed task by loading upstream results from S3

Note: install `s3fs` in your environment so Prefect can read and write S3 via fsspec (e.g., `pip install s3fs`). AWS credentials can come from your environment (`AWS_PROFILE`, `AWS_ACCESS_KEY_ID`, etc.).

Links:
- Results and caching: Prefect 3 docs: Results
- Transactions (trade-offs explained): Prefect 3 docs: Transactions

Snippet:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.filesystems import RemoteFileSystem
from prefect.tasks import task_input_hash

# Configure S3 as the result store
# Requires `s3fs` installed and AWS creds available in the environment
s3_store = RemoteFileSystem(basepath="s3://my-bucket/prefix/")

# Each task persists its result and uses a stable cache key based on inputs
common_task_kwargs = dict(
    persist_result=True,
    result_storage=s3_store,
    cache_key_fn=task_input_hash,        # stable across reruns if inputs (and task code) are the same
    cache_expiration=timedelta(days=30), # optional; adjust as needed
    retries=2,                           # optional: task-level retries
    retry_delay_seconds=30,              # optional
)

@task(**common_task_kwargs)
def step1(x: int) -> dict:
    # ... compute something expensive
    return {"a": x + 1}

@task(**common_task_kwargs)
def step2(a: dict) -> dict:
    # ... compute something using result from step1
    return {"b": a["a"] * 2}

@task(**common_task_kwargs)
def step3(b: dict) -> dict:
    # ... compute something using result from step2
    return {"c": b["b"] - 3}

@task(**common_task_kwargs)
def step4(c: dict) -> dict:
    # ... final step using result from step3
    return {"d": c["c"] ** 2}

@flow
def pipeline(x: int) -> dict:
    # If any step fails, the flow fails by default.
    # On rerun with the same input `x`, Prefect will reuse cached S3 results
    # for any prior steps that have already completed successfully.
    a = step1(x)
    b = step2(a)
    c = step3(b)
    d = step4(c)
    return d

if __name__ == "__main__":
    pipeline(42)
```
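To see why a stable cache key is what lets a rerun resume, here is a toy, pure-Python model of the pattern above. It is not Prefect code: `STORE` (standing in for the S3 result storage), `cache_key`, and `run_step` are hypothetical helpers, and the key (SHA-256 of the step name plus JSON-encoded input) only approximates the idea behind `task_input_hash`.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List, Optional

# Hypothetical stand-ins, NOT Prefect APIs:
STORE: Dict[str, Any] = {}  # plays the role of the S3 result storage
CALLS: List[str] = []       # names of steps that actually executed


def cache_key(name: str, arg: Any) -> str:
    """Deterministic key from step name + input (same idea as task_input_hash)."""
    payload = json.dumps([name, arg], sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_step(name: str, fn: Callable[[Any], Any], arg: Any, fail: bool = False) -> Any:
    key = cache_key(name, arg)
    if key in STORE:  # cache hit: reuse the result persisted by an earlier run
        return STORE[key]
    CALLS.append(name)
    if fail:  # simulate this step failing before it produces a result
        raise RuntimeError(f"{name} failed")
    result = fn(arg)
    STORE[key] = result  # persisted right away on success (no transaction)
    return result


def pipeline(x: int, fail_at: Optional[str] = None) -> dict:
    a = run_step("step1", lambda v: {"a": v + 1}, x, fail_at == "step1")
    b = run_step("step2", lambda d: {"b": d["a"] * 2}, a, fail_at == "step2")
    c = run_step("step3", lambda d: {"c": d["b"] - 3}, b, fail_at == "step3")
    return run_step("step4", lambda d: {"d": d["c"] ** 2}, c, fail_at == "step4")


try:
    pipeline(42, fail_at="step3")  # first run fails at step3
except RuntimeError:
    pass
CALLS.clear()
print(pipeline(42), CALLS)  # {'d': 6889} ['step3', 'step4']
```

Calling it with a different input (e.g. `pipeline(43)`) produces new keys, so all four steps run again; that is why a rerun only resumes when it uses the same inputs.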
About transactions:
- If you wrap these four tasks in a transaction with the default `write_on_commit=True`, earlier task results are not committed if a later task fails. That gives you atomicity, but you lose the ability to resume from the middle because there are no persisted results to load.
- If you need strict all-or-nothing semantics, use a transaction, but expect reruns to execute all four tasks again.
- If your priority is "resume from the last failed step," rely on the task-level persistence and caching shown above and avoid strict transactional write-on-commit across the four tasks.

If you still want a transaction wrapper for other reasons, you can do:

```python
from prefect import flow
from prefect.results import get_result_store
from prefect.transactions import transaction

# step1..step4 are the tasks defined in the first snippet
@flow
def pipeline_with_txn(x: int) -> dict:
    # NOTE: With write_on_commit=True (default), you get atomicity, but not resume.
    # Any failure rolls back staged writes, so a rerun will recompute from step1.
    with transaction(key=f"pipeline:{x}", store=get_result_store()):
        a = step1(x)
        b = step2(a)
        c = step3(b)
        d = step4(c)
    return d
```

Quick check: do you require strict all-or-nothing semantics, or is "resume from the last failed step" the higher priority? I can tailor the snippet accordingly (e.g., stronger cache keys, versioning, or transaction settings) based on your requirements.
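The transaction trade-off above can be sketched with a toy model. This is hypothetical and not Prefect's `transaction`: `toy_transaction` hands the steps a write buffer when `write_on_commit` is true and flushes it to the store only if the body finishes without raising; when false, steps write straight to the store. The step functions and the `name:input` cache-key format are assumptions for illustration.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional

# Toy versions of step1..step4 from the snippet above
STEPS = [
    ("step1", lambda v: {"a": v + 1}),
    ("step2", lambda d: {"b": d["a"] * 2}),
    ("step3", lambda d: {"c": d["b"] - 3}),
    ("step4", lambda d: {"d": d["c"] ** 2}),
]


@contextmanager
def toy_transaction(store: Dict[str, Any], write_on_commit: bool) -> Iterator[Dict[str, Any]]:
    """Hypothetical transaction: yields the dict that steps should write results into."""
    staged: Dict[str, Any] = {}
    # write_on_commit=True: buffer writes; False: write straight to the store
    yield (staged if write_on_commit else store)
    # Only reached if the body did not raise (commit); on error, `staged` is dropped
    store.update(staged)


def run_pipeline(
    store: Dict[str, Any], x: int, write_on_commit: bool, fail_at: Optional[str] = None
) -> List[str]:
    """Runs the toy steps inside a toy transaction; returns the steps that executed."""
    executed: List[str] = []
    try:
        with toy_transaction(store, write_on_commit) as target:
            value: Any = x
            for name, fn in STEPS:
                key = f"{name}:{value!r}"
                if key in store:  # reuse a result persisted by an earlier run
                    value = store[key]
                    continue
                executed.append(name)
                if name == fail_at:
                    raise RuntimeError(f"{name} failed")
                value = fn(value)
                target[key] = value
    except RuntimeError:
        pass  # the run failed; we only care about what got persisted
    return executed


for mode in (True, False):
    results: Dict[str, Any] = {}
    first = run_pipeline(results, 42, write_on_commit=mode, fail_at="step3")
    rerun = run_pipeline(results, 42, write_on_commit=mode)
    print(f"write_on_commit={mode}: first={first} rerun={rerun}")
# write_on_commit=True: first=['step1', 'step2', 'step3'] rerun=['step1', 'step2', 'step3', 'step4']
# write_on_commit=False: first=['step1', 'step2', 'step3'] rerun=['step3', 'step4']
```

With write-on-commit, the failed first run leaves nothing behind, so the rerun recomputes all four steps; with eager writes, the rerun picks up at `step3`, at the cost of atomicity.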
r
@Marvin with `write_on_commit=True` on a transaction, where do the task results get persisted before the commit happens?
m
Short answer: they aren't persisted anywhere external yet. With `write_on_commit=True`, Prefect stages task results in the transaction's in-memory buffer during the run. Nothing is written to your configured ResultStore (e.g., S3) until the transaction commits. If the transaction rolls back or the process crashes before commit, those staged results are discarded and no external writes occur.

Implications:
- Inside the same run, downstream tasks can still consume upstream results from memory.
- There's no resumable artifact on S3 until the commit completes.

If you set `write_on_commit=False`, results are written as they are produced (no atomicity), so a rollback won't "unwrite" them.

Docs: Prefect 3 docs: Transactions
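A minimal sketch of the staging behavior described above, assuming a simplified model in which the transaction holds a plain dict of staged results. `ToyTxn` and its methods are hypothetical and only approximate how Prefect stages result records; a plain dict stands in for the S3 ResultStore.

```python
from typing import Any, Dict


class ToyTxn:
    """Hypothetical model of a write_on_commit transaction (NOT Prefect's class)."""

    def __init__(self, store: Dict[str, Any], write_on_commit: bool = True) -> None:
        self.store = store
        self.write_on_commit = write_on_commit
        self.staged: Dict[str, Any] = {}  # in-memory buffer for this run only

    def stage(self, key: str, value: Any) -> None:
        if self.write_on_commit:
            self.staged[key] = value
        else:
            self.store[key] = value  # external write happens right away

    def get(self, key: str) -> Any:
        # Downstream steps in the same run can read staged values
        return self.staged.get(key, self.store.get(key))

    def commit(self) -> None:
        self.store.update(self.staged)  # only now do results reach the store
        self.staged.clear()

    def rollback(self) -> None:
        self.staged.clear()  # staged results are discarded; store untouched


s3: Dict[str, Any] = {}  # stands in for the S3 ResultStore
txn = ToyTxn(s3, write_on_commit=True)
txn.stage("step1", {"a": 43})
print(txn.get("step1"))  # prints {'a': 43} (visible to step2 in the same run)
print(s3)                # prints {} (nothing persisted externally yet)
txn.rollback()           # e.g. step3 failed
print(s3)                # prints {} (nothing to resume from)

s3_eager: Dict[str, Any] = {}
eager = ToyTxn(s3_eager, write_on_commit=False)
eager.stage("step1", {"a": 43})
eager.rollback()
print(s3_eager)          # prints {'step1': {'a': 43}} (rollback does not "unwrite" it)
```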