Ran Tibi
06/18/2024, 1:39 PMMarvin
06/18/2024, 1:40 PMpython
from prefect import task, flow, get_run_logger
from prefect.engine.signals import FAIL
@task
def task1():
    print("Task 1 completed")
@task
def task2():
    print("Task 2 completed")
@task
def task3():
    print("Task 3 completed")
@flow(log_prints=True)
def my_flow(restart_from_task_2=False):
    task1_run = task1.submit()
    
    if restart_from_task_2:
        task2_run = task2.submit(wait_for=[task1_run])
        task3.submit(wait_for=[task2_run])
    else:
        task2_run = task2.submit(wait_for=[task1_run])
        task3.submit(wait_for=[task2_run])
if __name__ == "__main__":
    # Set `restart_from_task_2` to True to restart from task 2
    my_flow(restart_from_task_2=True)wait_fortask2task1task3task2restart_from_task_2task2Ran Tibi
06/18/2024, 1:41 PMRan Tibi
06/18/2024, 1:42 PMMarvin
06/18/2024, 1:43 PM.map().filter()python
from prefect import task, flow, get_run_logger
from prefect.orion.api.task_runs import read_task_run
@task
def task1():
    print("Task 1 completed")
@task
def task2():
    print("Task 2 completed")
@task
def task3():
    print("Task 3 completed")
    
@flow
def my_flow(restart_from: str = None):
    logger = get_run_logger()
    # Initial Run of Tasks
    t1 = task1.submit()
    t2 = task2.submit(wait_for=[t1])
    t3 = task3.submit(wait_for=[t2])
    # Re-run logic
    if restart_from:
        <http://logger.info|logger.info>(f"Re-running tasks from: {restart_from}")
        if restart_from == "task2":
            t2 = task2.submit(wait_for=[t1])
            t3 = task3.submit(wait_for=[t2])
        elif restart_from == "task3":
            t3 = task3.submit(wait_for=[t2])
        
if __name__ == "__main__":
    my_flow(restart_from="task2")restart_fromwait_for