# ask-marvin
@Marvin give me an example of a series of tasks in a flow that are part of a transaction, where each task saves a file output to S3. If any failure occurs, the rollback should delete any files already saved to S3.
Great question. S3 isn't transactional, so the typical pattern is "saga-style" compensation: keep track of what you've written and, on failure, delete those keys. Below is a Prefect 3.x example with a series of tasks that each write a file to S3. If any step fails, a rollback task deletes any files that were already written.

- Assumes you've created an S3Bucket block in Prefect (UI or code) named `my-s3-bucket-block` with your bucket and credentials.
- Uses a per-run prefix so you can easily find or clean up all outputs from a single run.
```python
from typing import List, Optional

from prefect import flow, task, get_run_logger
from prefect.runtime import flow_run
from prefect_aws.s3 import S3Bucket


@task
def write_file_to_s3(s3_block_name: str, key: str, content: bytes) -> str:
    s3 = S3Bucket.load(s3_block_name)
    # write_path returns the resolved key (including the block's bucket_folder, if set),
    # which is the exact key the rollback needs to delete
    return s3.write_path(path=key, content=content)


@task(retries=3, retry_delay_seconds=5)
def rollback_delete_keys(s3_block_name: str, keys: List[str]) -> None:
    logger = get_run_logger()
    s3 = S3Bucket.load(s3_block_name)
    # Delete through a boto3 client built from the block's credentials
    client = s3.credentials.get_s3_client()

    for key in keys:
        try:
            client.delete_object(Bucket=s3.bucket_name, Key=key)
            logger.info(f"Deleted s3://{s3.bucket_name}/{key}")
        except Exception as exc:
            # Log and continue so we attempt to delete all keys
            logger.warning(f"Failed to delete s3://{s3.bucket_name}/{key}: {exc}")


@flow
def transactional_s3_pipeline(
    s3_block_name: str = "my-s3-bucket-block",
    run_prefix: Optional[str] = None,
) -> List[str]:
    """
    Run three steps that each write to S3. On any failure, delete all prior outputs.
    """
    logger = get_run_logger()

    # Use a unique per-run prefix in S3 for easy grouping/cleanup
    if not run_prefix:
        run_prefix = f"runs/{flow_run.id}"

    uploaded_keys: List[str] = []

    try:
        # Step 1
        k1 = write_file_to_s3.submit(
            s3_block_name,
            f"{run_prefix}/step1.txt",
            b"step 1 data",
        ).result()
        uploaded_keys.append(k1)

        # Step 2
        k2 = write_file_to_s3.submit(
            s3_block_name,
            f"{run_prefix}/step2.txt",
            b"step 2 data",
        ).result()
        uploaded_keys.append(k2)

        # Step 3 (example; replace with your actual task, which could fail)
        k3 = write_file_to_s3.submit(
            s3_block_name,
            f"{run_prefix}/step3.txt",
            b"step 3 data",
        ).result()
        uploaded_keys.append(k3)

        logger.info("All steps completed successfully.")
        return uploaded_keys

    except Exception as exc:
        logger.error(f"Failure detected: {exc}. Rolling back {len(uploaded_keys)} files...")
        # Call the rollback task directly (not .submit) so it finishes before the flow fails
        rollback_delete_keys(s3_block_name, uploaded_keys)
        raise
```
How to use:

- Create an S3Bucket block named `my-s3-bucket-block` in the Prefect UI with:
  - Bucket name (e.g., `my-bucket`)
  - AWS credentials (via a linked AwsCredentials block or environment credentials)
- Deploy/run the flow. If any step fails, you'll see the rollback task delete any previously uploaded objects.

Notes:

- This is a compensation pattern; S3 has no atomic multi-object transactions.
- Using a unique `run_prefix` per flow run makes it easy to inspect or clean a run's artifacts in one place. You could roll back by prefix if you prefer, but tracking explicit keys ensures precise cleanup.

Helpful docs:

- Blocks overview: Prefect 3 Blocks
- Storing secrets/credentials: Store secrets in Prefect

If you'd prefer using the functional API (`prefect_aws.s3.upload_to_bucket`) instead of the S3Bucket block, I can share that variant too.
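The notes above mention rolling back by prefix instead of by tracked keys. Here is a minimal sketch of that variant, assuming a boto3 S3 client (e.g. `boto3.client("s3")`) and that `prefix` is the full key prefix as stored in the bucket, including any `bucket_folder` set on the block. `delete_prefix` is a hypothetical helper name, not part of Prefect or prefect-aws. It removes everything under the prefix, so it's only safe when the prefix belongs to a single run.

```python
from typing import Any, List


def delete_prefix(client: Any, bucket: str, prefix: str) -> List[str]:
    """Delete every object under `prefix` and return the keys that were deleted.

    Hypothetical helper; `client` is assumed to be a boto3 S3 client.
    """
    if not prefix:
        # An empty prefix would match the whole bucket, so refuse it
        raise ValueError("refusing to delete with an empty prefix")

    # list_objects_v2 returns at most 1000 keys per page, so paginate.
    # Collect all keys first so deletions don't interfere with listing.
    keys: List[str] = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

    # delete_objects accepts at most 1000 keys per request
    for start in range(0, len(keys), 1000):
        batch = keys[start:start + 1000]
        response = client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": k} for k in batch], "Quiet": True},
        )
        # With Quiet=True, only failed deletions are reported
        errors = response.get("Errors", [])
        if errors:
            raise RuntimeError(f"failed to delete {len(errors)} objects: {errors[:3]}")

    return keys
```

In the flow's `except` branch you could call `delete_prefix(boto3.client("s3"), "my-bucket", f"{run_prefix}/")` instead of passing `uploaded_keys`; the trailing slash stops `runs/abc` from also matching `runs/abcd`. The trade-off is the one noted above: explicit keys never touch objects the run didn't write.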
@Marvin So, do transactions only work with local files by default?
Short answer: no. Prefect 3.x does include a transaction API (`prefect.transactions.transaction`, with `on_rollback` hooks registered on tasks), but it doesn't make any storage system transactional, whether local files or S3. A rollback hook is just Python code you write, so it can delete S3 objects as easily as local files.

- Prefect orchestrates tasks and runs your rollback hooks; the actual undo must be implemented in your code using the target system's own mechanisms.
- Local files: you can approximate atomicity for a single file with a temp file plus an atomic rename, and implement compensation (delete the files you created) if a later step fails. There's still no multi-file transaction.
- S3: there are no cross-object transactions; use the compensation pattern I showed (track uploaded keys and delete them on failure).
- Databases: use the database's native transactions within a task (begin/commit/rollback).

If you tell me which backend you're targeting (pure local FS, S3, database, etc.), I can tailor the pattern for that specifically.
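For the local-files case, here is a minimal stdlib sketch of both ideas: each file is written to a temp file in the same directory and renamed into place with `os.replace`, and if any later step fails, the files this call created are deleted in reverse order. `atomic_write` and `write_all_or_nothing` are hypothetical names. Files that already existed and were overwritten are not restored, which is one reason this is compensation rather than a real multi-file transaction.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable, Dict, List


def atomic_write(path: Path, data: bytes) -> None:
    """Write `data` so readers see either the old file or the complete new one."""
    # Temp file in the same directory keeps the rename on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())  # flush bytes to disk before the rename
        os.replace(tmp_name, path)  # atomic replace on POSIX (same filesystem)
    except BaseException:
        # Remove the temp file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def write_all_or_nothing(outputs: Dict[Path, Callable[[], bytes]]) -> List[Path]:
    """Produce and write each output in order; on failure, delete files this call created."""
    written: List[Path] = []
    created: List[Path] = []
    try:
        for path, produce in outputs.items():
            existed = path.exists()
            atomic_write(path, produce())  # produce() may raise, simulating a failed step
            written.append(path)
            if not existed:
                created.append(path)
        return written
    except Exception:
        # Compensation: undo newly created files in reverse order, then re-raise
        for path in reversed(created):
            path.unlink(missing_ok=True)
        raise
```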
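For the database case, Python's built-in `sqlite3` shows the native-transaction approach: using the connection as a context manager commits if the block succeeds and rolls back if it raises. The `outputs` table and `record_outputs` function are hypothetical; inside a Prefect task you would run the same kind of code against your real database.

```python
import sqlite3
from typing import List


def record_outputs(conn: sqlite3.Connection, keys: List[str], fail_after: int = -1) -> None:
    """Insert one row per key in a single transaction (hypothetical `outputs` table).

    `fail_after` simulates a failure before inserting the key at that index.
    """
    # `with conn:` commits on success and rolls back on an exception
    # (it does not close the connection)
    with conn:
        for i, key in enumerate(keys):
            if i == fail_after:
                raise RuntimeError(f"simulated failure before {key!r}")
            conn.execute("INSERT INTO outputs (key) VALUES (?)", (key,))
```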