Roy Wolters
08/14/2025, 1:10 PMtransaction
context manager with an on_rollback
hook. I also have a downstream task with retries. What happens now, is that the on_rollback
hook is invoked after the first try of the downstream task fails. What I want is that the on_rollback
hook is only invoked if the last retry of the downstream task fails. How can I do that?Marvin
08/14/2025, 1:10 PMMarvin
08/14/2025, 1:13 PMtransaction
context rolls back as soon as an exception escapes the with
block. If your downstream task’s first attempt raises (even if it will retry later), that exception trips the rollback. So you’re seeing the expected behavior.
You have a few workable patterns to get “rollback only after the last retry fails”:
Option 1 — Keep the exception inside the transaction until retries are exhausted
- Wait for the task’s final state before leaving the transaction block. In sync flows, call `.result()`; in async flows, await
it. That way the exception only escapes after the final attempt.
Sync example:
from prefect import flow, task
from prefect.transactions import transaction
@task(retries=3, retry_delay_seconds=5)
def downstream():
...
@flow
def my_flow():
with transaction(key="my-key", on_rollback=[lambda: print("rollback")]):
# do upstream work here
r = downstream.submit().result() # blocks through all retries
# leaving the block only after final success/failure
Async example:
from prefect import flow, task
from prefect.transactions import transaction
@task(retries=3, retry_delay_seconds=5)
async def downstream():
...
@flow
async def my_flow():
async with transaction(key="my-key", on_rollback=[lambda: print("rollback")]):
await downstream() # awaited through all retries
Option 2 — Disable task retries and implement the retry loop in the flow
- This guarantees the exception only leaves the with
after you decide it’s final.
from time import sleep
from prefect import flow, task
from prefect.transactions import transaction
@task # no built-in retries
def downstream():
...
@flow
def my_flow():
with transaction(key="my-key", on_rollback=[lambda: print("rollback")]):
max_attempts = 3
for i in range(1, max_attempts + 1):
try:
return downstream.submit().result()
except Exception:
if i == max_attempts:
raise
sleep(5) # backoff
Option 3 — Guard your rollback hook so it only runs on the final attempt
- If you really must keep task retries and can’t restructure, make the hook idempotent or no-op until the final attempt. You can check runtime context to infer attempt vs. max attempts (inside a task). If your hook runs in the flow context, persist a “did_rollback” flag keyed by flow/run and gate the hook.
Notes and docs
- Prefect 3’s transactions run commit/rollback based on exceptions exiting the context; they don’t natively “wait on” downstream retries unless you keep execution in the block until those retries are done.
- Docs: Transactions in Prefect 3
- Source: prefect.transactions
If you can share a minimal snippet of how you’re structuring the with transaction(...)
and the downstream task call (sync vs async, whether you call `.result()`/`await`), I can suggest the most surgical change. Otherwise, Option 1 is usually the least painful.Roy Wolters
08/14/2025, 1:18 PMfrom time import sleep
from prefect import task, flow, get_run_logger
from prefect.transactions import transaction
@task
def task_1():
logger = get_run_logger()
logger.info("Executing task 1")
sleep(1)
logger.info("Task 1 completed")
@task_1.on_rollback
def task_1_rb(transaction):
logger = get_run_logger()
logger.warning("Rolling back task 1")
sleep(1)
logger.info("Task 1 rollback completed")
@task(retries=1)
def task_fail():
logger = get_run_logger()
logger.info("Executing task that will fail")
sleep(1)
logger.error("Task failed")
raise RuntimeError("This task is expected to fail.")
@flow
def pipeline():
with transaction():
task_1()
task_fail()
if __name__ == "__main__":
pipeline()
How can I refactor it?Marvin
08/14/2025, 1:20 PMMarvin
08/14/2025, 1:22 PMwith
block. A task’s first failed attempt will do that, even if Prefect plans to retry it.
Refactor to keep the exception inside the transaction until all retries are exhausted. The simplest way in a sync flow is to submit the task and block on its final state:
from time import sleep
from prefect import task, flow, get_run_logger
from prefect.transactions import transaction
@task
def task_1():
logger = get_run_logger()
<http://logger.info|logger.info>("Executing task 1")
sleep(1)
<http://logger.info|logger.info>("Task 1 completed")
@task_1.on_rollback
def task_1_rb(txn):
logger = get_run_logger()
logger.warning("Rolling back task 1")
sleep(1)
<http://logger.info|logger.info>("Task 1 rollback completed")
@task(retries=1)
def task_fail():
logger = get_run_logger()
<http://logger.info|logger.info>("Executing task that will fail")
sleep(1)
logger.error("Task failed")
raise RuntimeError("This task is expected to fail.")
@flow
def pipeline():
with transaction():
task_1()
# Block until task_fail reaches a terminal state (success or final failure)
task_fail.submit().result()
if __name__ == "__main__":
pipeline()
result()
waits through all retries and only raises on the final failure, so the exception doesn’t escape the with transaction()
until the last attempt fails. Your rollback hook will only run then.
Alternative (more explicit): remove retries
from task_fail
and implement the retry loop in the flow so you fully control when an exception leaves the `with`:
@task
def task_fail():
...
@flow
def pipeline():
with transaction():
task_1()
max_attempts = 2 # matches your retries=1 total attempts
for i in range(1, max_attempts + 1):
try:
return task_fail.submit().result()
except Exception:
if i == max_attempts:
raise
sleep(5) # backoff
Docs, for when you need a light bedtime read:
- Transactions in Prefect 3: docs
- Source: prefect.transactions
If you’re running an async flow, same idea—`await task_fail()` inside the transaction so the exception only bubbles out after the final attempt.Roy Wolters
08/14/2025, 2:06 PMresult()
, the rollback still is invoked after the first try. How can I fix this?Marvin
08/14/2025, 2:06 PMMarvin
08/14/2025, 2:07 PMwith
block. Built‑in task retries don’t change that, so .result()
won’t save you here.
There isn’t a built‑in way to “tie” transaction rollback to the final retry. The reliable fix is to take control of retries at the flow level so no exception leaves the with transaction()
until you decide it’s final.
Refactor like this:
from time import sleep
from prefect import task, flow, get_run_logger
from prefect.transactions import transaction
@task
def task_1():
logger = get_run_logger()
<http://logger.info|logger.info>("Executing task 1")
sleep(1)
<http://logger.info|logger.info>("Task 1 completed")
@task_1.on_rollback
def task_1_rb(txn):
logger = get_run_logger()
logger.warning("Rolling back task 1")
sleep(1)
<http://logger.info|logger.info>("Task 1 rollback completed")
# Remove built-in retries here
@task
def task_fail():
logger = get_run_logger()
<http://logger.info|logger.info>("Executing task that will fail")
sleep(1)
logger.error("Task failed")
raise RuntimeError("This task is expected to fail.")
@flow
def pipeline():
logger = get_run_logger()
with transaction():
task_1()
# Implement retries in the flow so exceptions don't escape early
max_attempts = 2 # matches your original retries=1 (2 total attempts)
delay_seconds = 5
for attempt in range(1, max_attempts + 1):
try:
task_fail.submit().result() # wait for the attempt
break # success
except Exception as e:
if attempt == max_attempts:
# Now we let the exception escape the block -> triggers rollback
raise
logger.warning(f"task_fail attempt {attempt} failed; retrying in {delay_seconds}s...")
sleep(delay_seconds)
if __name__ == "__main__":
pipeline()
Why this works:
- We catch the first failure(s) inside the transaction, so no rollback yet.
- We only re-raise after the final attempt, so the rollback hook runs exactly once, only after all retries are exhausted.
If you must keep Prefect’s built-in retries for task_fail
, you can’t defer rollback with transactions today—move the unstable task outside the transaction or gate your rollback hook to be a no-op until you decide it’s truly final.
Docs for reference: Transactions in Prefect 3