Ravish Mallya
09/27/2025, 7:59 PM

Marvin
09/27/2025, 7:59 PM

Marvin
09/27/2025, 8:04 PM
- Replace my-s3-bucket-block with your bucket and credentials.
- Uses a per-run prefix so you can easily find or clean up all outputs from a single run.
from typing import List

from prefect import flow, task, get_run_logger
from prefect.runtime import flow_run
from prefect_aws.s3 import S3Bucket


@task
def write_file_to_s3(s3_block_name: str, key: str, content: bytes) -> str:
    s3 = S3Bucket.load(s3_block_name)
    s3.write_path(path=key, content=content)
    return key


@task(retries=3, retry_delay_seconds=5)
def rollback_delete_keys(s3_block_name: str, keys: List[str]) -> None:
    logger = get_run_logger()
    s3 = S3Bucket.load(s3_block_name)
    for key in keys:
        try:
            s3.delete_path(key)
            logger.info(f"Deleted s3://{s3.bucket_name}/{key}")
        except Exception as exc:
            # Log and continue so we attempt to delete all keys
            logger.warning(f"Failed to delete s3://{s3.bucket_name}/{key}: {exc}")


@flow
def transactional_s3_pipeline(
    s3_block_name: str = "my-s3-bucket-block",
    run_prefix: str | None = None,
) -> List[str]:
    """
    Run three steps that each write to S3. On any failure, delete all prior outputs.
    """
    logger = get_run_logger()

    # Use a unique per-run prefix in S3 for easy grouping/cleanup
    if not run_prefix:
        run_prefix = f"runs/{flow_run.id}"

    uploaded_keys: List[str] = []
    try:
        # Step 1
        k1 = write_file_to_s3.submit(
            s3_block_name,
            f"{run_prefix}/step1.txt",
            b"step 1 data",
        ).result()
        uploaded_keys.append(k1)

        # Step 2
        k2 = write_file_to_s3.submit(
            s3_block_name,
            f"{run_prefix}/step2.txt",
            b"step 2 data",
        ).result()
        uploaded_keys.append(k2)

        # Step 3 (example; replace with your actual task, which could fail)
        k3 = write_file_to_s3.submit(
            s3_block_name,
            f"{run_prefix}/step3.txt",
            b"step 3 data",
        ).result()
        uploaded_keys.append(k3)

        logger.info("All steps completed successfully.")
        return uploaded_keys
    except Exception as exc:
        logger.error(f"Failure detected: {exc}. Rolling back {len(uploaded_keys)} files...")
        # Fire-and-forget rollback as a task so it still runs even though we fail the flow
        rollback_delete_keys.submit(s3_block_name, uploaded_keys)
        raise
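To try it locally, you can call the flow like a normal Python function once the block exists. A minimal sketch (the __main__ guard and the printed output are just for illustration):

if __name__ == "__main__":
    # Happy path: all three steps succeed and the uploaded keys are returned.
    keys = transactional_s3_pipeline(s3_block_name="my-s3-bucket-block")
    print(keys)  # e.g. ['runs/<flow-run-id>/step1.txt', ...]

To watch the rollback, make any step raise (for example, have step 3 call a task that fails) and re-run; the previously written objects will be deleted by rollback_delete_keys.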
How to use:
- Create an S3Bucket block named my-s3-bucket-block in the Prefect UI (or from code, as sketched below) with:
  - Bucket name (e.g., my-bucket)
  - AWS credentials (via a linked AwsCredentials block or environment credentials)
- Deploy/run the flow. If any step fails, you’ll see the rollback task delete any previously uploaded objects.
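If you prefer to create the block from code rather than the UI, a minimal sketch is below; the key, secret, bucket, and block names are placeholders to replace with your own:

from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# One-time setup: save reusable credential and bucket blocks.
credentials = AwsCredentials(
    aws_access_key_id="YOUR_ACCESS_KEY_ID",          # placeholder
    aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",  # placeholder
)
credentials.save("my-aws-credentials", overwrite=True)

S3Bucket(
    bucket_name="my-bucket",  # placeholder bucket
    credentials=credentials,
).save("my-s3-bucket-block", overwrite=True)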
Notes:
- This is a compensation pattern; S3 has no atomic multi-object transactions.
- Using a unique run_prefix per flow run makes it easy to inspect or clean a run’s artifacts in one place. You could roll back by prefix if you prefer (see the sketch below), but tracking explicit keys ensures precise cleanup.
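For completeness, prefix-based rollback could look like the sketch below. It calls boto3 directly rather than the S3Bucket block and assumes boto3 can find AWS credentials in the environment; the bucket and prefix values are placeholders:

import boto3

def delete_run_prefix(bucket: str, run_prefix: str) -> int:
    """Delete every object under s3://{bucket}/{run_prefix}/ and return how many were removed."""
    s3 = boto3.client("s3")
    deleted = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=f"{run_prefix}/"):
        for obj in page.get("Contents", []):
            s3.delete_object(Bucket=bucket, Key=obj["Key"])
            deleted += 1
    return deleted

# Example: delete_run_prefix("my-bucket", "runs/<flow-run-id>")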
Helpful docs:
- Blocks overview: Prefect 3 Blocks
- Storing secrets/credentials: Store secrets in Prefect
If you’d prefer using the functional API (prefect_aws.s3.upload_to_bucket) instead of the S3Bucket block, I can share that variant too.
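For reference, here is a rough sketch of the same write step without the S3Bucket block, using boto3 directly inside a task. Note this is a plain boto3 call, not the prefect_aws helper named above, and it assumes credentials come from the environment:

import boto3
from prefect import task

@task
def write_bytes_to_s3(bucket: str, key: str, content: bytes) -> str:
    # boto3 resolves credentials from the environment or instance profile.
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=content)
    return key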