<@ULVA73B9P> I'm using the `transaction` context m...
# ask-marvin
r
@Marvin I'm using the `transaction` context manager with an `on_rollback` hook. I also have a downstream task with retries. What happens now, is that the `on_rollback` hook is invoked after the first try of the downstream task fails. What I want is that the `on_rollback` hook is only invoked if the last retry of the downstream task fails. How can I do that?
m
thought for 148.1 seconds
Short answer: today, the `transaction` context rolls back as soon as an exception escapes the `with` block. If your downstream task's first attempt raises (even if it will retry later), that exception trips the rollback. So you're seeing the expected behavior. You have a few workable patterns to get "rollback only after the last retry fails":

Option 1: Keep the exception inside the transaction until retries are exhausted
- Wait for the task's final state before leaving the transaction block. In sync flows, call `.result()`; in async flows, `await` it. That way the exception only escapes after the final attempt.

Sync example:
from prefect import flow, task
from prefect.transactions import transaction

@task
def upstream():
    ...

# Rollback hooks are registered on the task (the @task.on_rollback pattern)
@upstream.on_rollback
def upstream_rollback(txn):
    print("rollback")

@task(retries=3, retry_delay_seconds=5)
def downstream():
    ...

@flow
def my_flow():
    with transaction(key="my-key"):
        upstream()
        r = downstream.submit().result()  # blocks through all retries
        # leaving the block only after final success/failure
Async example:
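from prefect import flow, task
from prefect.transactions import transaction

@task
def upstream():
    ...

# Rollback hooks are registered on the task (the @task.on_rollback pattern)
@upstream.on_rollback
def upstream_rollback(txn):
    print("rollback")

@task(retries=3, retry_delay_seconds=5)
async def downstream():
    ...

@flow
async def my_flow():
    # transaction is a regular (sync) context manager, so use a plain `with`
    with transaction(key="my-key"):
        upstream()
        await downstream()  # awaited through all retries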

Option 2: Disable task retries and implement the retry loop in the flow
- This guarantees the exception only leaves the `with` after you decide it's final.
from time import sleep
from prefect import flow, task
from prefect.transactions import transaction

@task
def upstream():
    ...

# Rollback hooks are registered on the task (the @task.on_rollback pattern)
@upstream.on_rollback
def upstream_rollback(txn):
    print("rollback")

@task  # no built-in retries
def downstream():
    ...

@flow
def my_flow():
    with transaction(key="my-key"):
        upstream()
        max_attempts = 3
        for i in range(1, max_attempts + 1):
            try:
                return downstream.submit().result()
            except Exception:
                if i == max_attempts:
                    raise  # final failure: let it leave the with block
                sleep(5)  # backoff

Option 3: Guard your rollback hook so it only runs on the final attempt
- If you really must keep task retries and can't restructure, make the hook idempotent or a no-op until the final attempt. You can check runtime context to infer attempt vs. max attempts (inside a task). If your hook runs in the flow context, persist a "did_rollback" flag keyed by flow/run and gate the hook.

Notes and docs
- Prefect 3's transactions run commit/rollback based on exceptions exiting the context; they don't natively "wait on" downstream retries unless you keep execution in the block until those retries are done.
- Docs: Transactions in Prefect 3
- Source: prefect.transactions

If you can share a minimal snippet of how you're structuring the `with transaction(...)` and the downstream task call (sync vs async, whether you call `.result()`/`await`), I can suggest the most surgical change. Otherwise, Option 1 is usually the least painful.
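
The gating idea in Option 3 can be sketched in plain Python. This is only an illustration of the pattern, not Prefect API: `make_guarded_hook`, `is_final`, and the `state` dictionary are hypothetical names, and how the final attempt would actually be detected (runtime context, a persisted flag) is left as an assumption.

```python
from typing import Callable


def make_guarded_hook(
    hook: Callable[[object], None],
    is_final: Callable[[], bool],
) -> Callable[[object], None]:
    """Wrap a rollback hook so it is a no-op until is_final() is true,
    and runs at most once afterwards (idempotent)."""
    already_ran = False

    def guarded(txn: object) -> None:
        nonlocal already_ran
        if already_ran or not is_final():
            return  # not the final attempt yet, or cleanup already done
        already_ran = True
        hook(txn)

    return guarded


# Hypothetical usage: the flag stands in for "all retries are exhausted".
state = {"final": False}
calls = []
guarded_hook = make_guarded_hook(calls.append, lambda: state["final"])

guarded_hook("txn")  # ignored: not final yet
state["final"] = True
guarded_hook("txn")  # runs the real hook
guarded_hook("txn")  # ignored: already ran
```

The wrapper keeps the real cleanup logic untouched; only the decision about when it may run is moved into `is_final`, which would have to be backed by something that survives across attempts.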
r
@Marvin This is my code:
from time import sleep

from prefect import task, flow, get_run_logger
from prefect.transactions import transaction


@task
def task_1():
    logger = get_run_logger()
    logger.info("Executing task 1")
    sleep(1)
    logger.info("Task 1 completed")


@task_1.on_rollback
def task_1_rb(transaction):
    logger = get_run_logger()
    logger.warning("Rolling back task 1")
    sleep(1)
    logger.info("Task 1 rollback completed")


@task(retries=1)
def task_fail():
    logger = get_run_logger()
    logger.info("Executing task that will fail")
    sleep(1)
    logger.error("Task failed")
    raise RuntimeError("This task is expected to fail.")


@flow
def pipeline():
    with transaction():
        task_1()
        task_fail()


if __name__ == "__main__":
    pipeline()
How can I refactor it?
m
thought for 62.6 seconds
You're seeing the expected behavior: the transaction rolls back as soon as an exception escapes the `with` block. A task's first failed attempt will do that, even if Prefect plans to retry it. Refactor to keep the exception inside the transaction until all retries are exhausted. The simplest way in a sync flow is to submit the task and block on its final state:
from time import sleep
from prefect import task, flow, get_run_logger
from prefect.transactions import transaction

@task
def task_1():
    logger = get_run_logger()
    logger.info("Executing task 1")
    sleep(1)
    logger.info("Task 1 completed")

@task_1.on_rollback
def task_1_rb(txn):
    logger = get_run_logger()
    logger.warning("Rolling back task 1")
    sleep(1)
    logger.info("Task 1 rollback completed")

@task(retries=1)
def task_fail():
    logger = get_run_logger()
    logger.info("Executing task that will fail")
    sleep(1)
    logger.error("Task failed")
    raise RuntimeError("This task is expected to fail.")

@flow
def pipeline():
    with transaction():
        task_1()
        # Block until task_fail reaches a terminal state (success or final failure)
        task_fail.submit().result()

if __name__ == "__main__":
    pipeline()

`result()` waits through all retries and only raises on the final failure, so the exception doesn't escape the `with transaction()` until the last attempt fails. Your rollback hook will only run then.

Alternative (more explicit): remove `retries` from `task_fail` and implement the retry loop in the flow so you fully control when an exception leaves the `with`:
@task
def task_fail():
    ...

@flow
def pipeline():
    with transaction():
        task_1()
        max_attempts = 2  # matches your retries=1 total attempts
        for i in range(1, max_attempts + 1):
            try:
                return task_fail.submit().result()
            except Exception:
                if i == max_attempts:
                    raise
                sleep(5)  # backoff

Docs, for when you need a light bedtime read:
- Transactions in Prefect 3: docs
- Source: prefect.transactions

If you're running an async flow, same idea: `await task_fail()` inside the transaction so the exception only bubbles out after the final attempt.
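
The flow-level retry loop from the alternative above can be factored into a small helper. This is a plain-Python sketch with no Prefect dependency; `call_with_retries` and `flaky` are hypothetical names introduced for illustration, and the default of no delay is an assumption chosen to keep the example fast.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 2,
    delay_seconds: float = 0.0,
) -> T:
    """Call fn up to max_attempts times; re-raise only the last failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # final failure: let the caller see the exception
            time.sleep(delay_seconds)
    raise AssertionError("unreachable")


# Example with a function that fails once, then succeeds.
attempts: List[int] = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 2:
        raise RuntimeError("transient failure")
    return "ok"


result = call_with_retries(flaky, max_attempts=2)
```

Inside the flow, the loop body would then presumably shrink to something like `call_with_retries(lambda: task_fail.submit().result(), max_attempts=2, delay_seconds=5)` within the `with transaction():` block.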
r
@Marvin this doesn't work. When using `result()`, the rollback still is invoked after the first try. How can I fix this?
m
thought for 62.1 seconds
Right, that confirms the current behavior: Prefect's transaction rolls back on the first exception that escapes the `with` block. Built-in task retries don't change that, so `.result()` won't save you here. There isn't a built-in way to "tie" transaction rollback to the final retry. The reliable fix is to take control of retries at the flow level so no exception leaves the `with transaction()` until you decide it's final. Refactor like this:
from time import sleep
from prefect import task, flow, get_run_logger
from prefect.transactions import transaction


@task
def task_1():
    logger = get_run_logger()
    logger.info("Executing task 1")
    sleep(1)
    logger.info("Task 1 completed")


@task_1.on_rollback
def task_1_rb(txn):
    logger = get_run_logger()
    logger.warning("Rolling back task 1")
    sleep(1)
    logger.info("Task 1 rollback completed")


# Remove built-in retries here
@task
def task_fail():
    logger = get_run_logger()
    logger.info("Executing task that will fail")
    sleep(1)
    logger.error("Task failed")
    raise RuntimeError("This task is expected to fail.")


@flow
def pipeline():
    logger = get_run_logger()
    with transaction():
        task_1()

        # Implement retries in the flow so exceptions don't escape early
        max_attempts = 2  # matches your original retries=1 (2 total attempts)
        delay_seconds = 5

        for attempt in range(1, max_attempts + 1):
            try:
                task_fail.submit().result()  # wait for the attempt
                break  # success
            except Exception:
                if attempt == max_attempts:
                    # Now we let the exception escape the block -> triggers rollback
                    raise
                logger.warning(f"task_fail attempt {attempt} failed; retrying in {delay_seconds}s...")
                sleep(delay_seconds)


if __name__ == "__main__":
    pipeline()

Why this works:
- We catch the first failure(s) inside the transaction, so no rollback yet.
- We only re-raise after the final attempt, so the rollback hook runs exactly once, only after all retries are exhausted.

If you must keep Prefect's built-in retries for `task_fail`, you can't defer rollback with transactions today. Instead, move the unstable task outside the transaction or gate your rollback hook to be a no-op until you decide it's truly final.

Docs for reference: Transactions in Prefect 3
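
The reasoning in "Why this works" can be illustrated with a toy model in plain Python. This is a minimal sketch, not Prefect's implementation: `toy_transaction` and `run` are hypothetical names, and the model only captures "hooks run when an exception escapes the block". Real Prefect transactions also track nested task transactions, so actual behavior may differ and is worth verifying against your Prefect version.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List


@contextmanager
def toy_transaction(on_rollback: List[Callable[[], None]]) -> Iterator[None]:
    """Toy stand-in: run rollback hooks only if an exception escapes the block."""
    try:
        yield
    except Exception:
        for hook in on_rollback:
            hook()
        raise


def run(max_attempts: int, fail_times: int, log: List[str]) -> None:
    """Retry a failing step inside the toy transaction, re-raising only at the end."""
    failures_left = fail_times
    with toy_transaction([lambda: log.append("rollback")]):
        for attempt in range(1, max_attempts + 1):
            try:
                if failures_left > 0:
                    failures_left -= 1
                    raise RuntimeError("step failed")
                log.append("success")
                break
            except RuntimeError:
                if attempt == max_attempts:
                    raise  # escapes the with block, so the hooks run
                log.append(f"retry after attempt {attempt}")


# One transient failure: caught inside the block, no rollback.
recovered: List[str] = []
run(max_attempts=2, fail_times=1, log=recovered)

# Every attempt fails: the last exception escapes and triggers the hook once.
exhausted: List[str] = []
try:
    run(max_attempts=2, fail_times=2, log=exhausted)
except RuntimeError:
    pass
```

In this model, intermediate failures never reach the context manager, which is the property the flow-level retry loop relies on.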