Roy Wolters
08/22/2025, 3:18 PM
Roy Wolters
08/22/2025, 3:19 PM
from time import sleep
from prefect import flow, get_run_logger, task
from prefect.transactions import transaction
@task
def task_1():
    """Log a start message, sleep for one second, then log completion."""
    logger = get_run_logger()
    logger.info("Executing task 1")
    sleep(1)  # pause between the start and completion messages
    logger.info("Task 1 completed")
@task_1.on_rollback
def task_1_rb(transaction):
    """Rollback hook registered on ``task_1``.

    Logs a warning, sleeps for one second, then logs that the rollback
    has completed.

    Args:
        transaction: Value supplied by whoever invokes the hook; it is not
            used in this body. Within this function the parameter shadows
            the module-level ``transaction`` import.
    """
    logger = get_run_logger()
    logger.warning("Rolling back task 1")
    sleep(1)  # pause between the warning and the completion message
    logger.info("Task 1 rollback completed")
def task_fail():
    """Log a start message and an error, then raise unconditionally.

    Raises:
        RuntimeError: Always, after a one-second sleep.
    """
    # NOTE(review): unlike task_1, this function carries no @task decorator
    # in the visible source -- confirm whether that is intentional or whether
    # the decorator was lost together with the indentation.
    logger = get_run_logger()
    logger.info("Executing task that will fail")
    sleep(1)  # pause before reporting the failure
    logger.error("Task failed")
    raise RuntimeError("This task is expected to fail.")
@flow
def sub_pipeline():
    """Flow that calls ``task_1`` once."""
    task_1()
@flow
def pipeline_original():
    """Call ``sub_pipeline`` and then ``task_fail`` with no transaction.

    ``task_fail`` always raises ``RuntimeError``; nothing in this body
    handles that exception.
    """
    sub_pipeline()
    task_fail()
@flow
def pipeline_problem():
    """Call ``sub_pipeline`` and ``task_fail`` inside a ``transaction()`` block.

    ``task_fail`` always raises ``RuntimeError``; nothing in this body
    handles that exception.
    """
    # NOTE(review): the source lost its indentation, so whether task_fail()
    # belongs inside the ``with`` block is an assumption -- confirm. It is
    # placed inside here so both calls from pipeline_original are wrapped by
    # the same transaction.
    with transaction():
        sub_pipeline()
        task_fail()
if __name__ == "__main__":
    # Run both flows back to back. Each one ends in task_fail(), which always
    # raises, so the exception is caught to let the script keep going.
    print("\n=== ORIGINAL: Without transactions ===")
    try:
        pipeline_original()
    except Exception:
        # Deliberately ignored so the second flow below still runs.
        pass

    print("\n=== PROBLEM: Subflow is not displayed correctly ===")
    try:
        pipeline_problem()
    except Exception:
        # Deliberately ignored: the failure is the expected outcome here.
        pass