Ran Tibi
06/18/2024, 1:39 PM
Marvin
06/18/2024, 1:40 PM
```python
from prefect import flow, task


@task
def task1():
    print("Task 1 completed")


@task
def task2():
    print("Task 2 completed")


@task
def task3():
    print("Task 3 completed")


@flow(log_prints=True)
def my_flow(restart_from_task_2: bool = False):
    if restart_from_task_2:
        # Restart: skip task1 and run task2 -> task3
        task2_run = task2.submit()
    else:
        # Full run: task1 -> task2 -> task3
        task1_run = task1.submit()
        task2_run = task2.submit(wait_for=[task1_run])
    # task3 always waits for task2
    task3_run = task3.submit(wait_for=[task2_run])
    # Block until the last task finishes before the flow returns
    task3_run.wait()


if __name__ == "__main__":
    # Set `restart_from_task_2` to True to restart from task 2
    my_flow(restart_from_task_2=True)
```
### Key Points
1. Task Submissions with Dependencies: Use `wait_for` to set task dependencies. `task2` will wait for `task1` to complete, and `task3` will wait for `task2`.
2. Conditional Rerun: The `restart_from_task_2` parameter controls whether to rerun from `task2` or run all tasks sequentially.
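The control flow that the parameter encodes can be sketched without Prefect. The plain-Python function below is a hypothetical stand-in (`run_chain` is not a Prefect API). It runs an ordered chain of named steps and can optionally start from a later step. Each step starts only after the previous one has finished, which is the ordering `wait_for` enforces between submitted tasks.

```python
from typing import Callable, List, Optional, Sequence, Tuple


# Hypothetical helper, not part of Prefect: runs an ordered chain of
# (name, function) steps, skipping everything before `restart_from`.
def run_chain(
    steps: Sequence[Tuple[str, Callable[[], None]]],
    restart_from: Optional[str] = None,
) -> List[str]:
    names = [name for name, _ in steps]
    if restart_from is None:
        start = 0
    elif restart_from in names:
        start = names.index(restart_from)
    else:
        raise ValueError(f"Unknown step: {restart_from!r}")

    executed = []
    # Running sequentially mirrors a chain where each task waits for the one before it.
    for name, fn in steps[start:]:
        fn()
        executed.append(name)
    return executed


steps = [
    ("task1", lambda: print("Task 1 completed")),
    ("task2", lambda: print("Task 2 completed")),
    ("task3", lambda: print("Task 3 completed")),
]

print(run_chain(steps))                        # full run
print(run_chain(steps, restart_from="task2"))  # task1 is skipped
```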
Visit the Prefect documentation for more details:
- Prefect Flow Concepts
- Specifying Upstream Dependencies
It’s tedious, I know, but I am merely here to assist.
Ran Tibi
06/18/2024, 1:41 PM
Ran Tibi
06/18/2024, 1:42 PM
Marvin
06/18/2024, 1:43 PM
… `.map()` and `.filter()` combined with state checks to develop a more generic approach.
### Generic Example to Restart Task and Its Downstream
Here is a more reusable approach that uses a flow parameter to choose which task to restart from:
```python
from typing import Optional

from prefect import flow, get_run_logger, task


@task
def task1():
    print("Task 1 completed")


@task
def task2():
    print("Task 2 completed")


@task
def task3():
    print("Task 3 completed")


@flow(log_prints=True)
def my_flow(restart_from: Optional[str] = None):
    logger = get_run_logger()
    if restart_from:
        logger.info(f"Re-running tasks from: {restart_from}")

    if restart_from is None:
        # Normal run: task1 -> task2 -> task3
        t1 = task1.submit()
        t2 = task2.submit(wait_for=[t1])
        t3 = task3.submit(wait_for=[t2])
    elif restart_from == "task2":
        # Skip task1; rerun task2 and its downstream task3
        t2 = task2.submit()
        t3 = task3.submit(wait_for=[t2])
    elif restart_from == "task3":
        # Only task3 is rerun
        t3 = task3.submit()
    else:
        raise ValueError(f"Unknown restart point: {restart_from!r}")
    # Wait for the last task so the flow does not finish before it does
    t3.wait()


if __name__ == "__main__":
    my_flow(restart_from="task2")
```
### Key Points
1. Flow Parameterization: The `restart_from` parameter controls which task the flow restarts from.
2. Dynamic Task Submissions: Dependencies are declared with `wait_for`, so each rerun task still waits for its upstream task before it starts.
You can reuse this pattern across flows to avoid rewriting restart logic for each one, although the branches still name this flow's tasks explicitly.
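To avoid hard-coding one branch per task, one option is to describe the flow's dependencies as data and compute which tasks to rerun. The sketch below is a hypothetical helper, not a Prefect API: it assumes a `DEPENDENCIES` map and a `tasks_to_rerun` function, and it uses the standard library's `graphlib` (Python 3.9+). It returns the restart task plus everything downstream of it, with upstream tasks listed first.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical dependency map for illustration: task name -> names of its upstream tasks.
DEPENDENCIES: Dict[str, Set[str]] = {
    "task1": set(),
    "task2": {"task1"},
    "task3": {"task2"},
}


def tasks_to_rerun(deps: Dict[str, Set[str]], restart_from: str) -> List[str]:
    """Return `restart_from` plus every task downstream of it, in dependency order."""
    if restart_from not in deps:
        raise ValueError(f"Unknown task: {restart_from!r}")

    # Invert the map so we can walk from a task to the tasks that depend on it.
    downstream: Dict[str, Set[str]] = {name: set() for name in deps}
    for name, upstream in deps.items():
        for parent in upstream:
            downstream.setdefault(parent, set()).add(name)

    # Collect the restart task and everything reachable from it.
    selected: Set[str] = set()
    stack = [restart_from]
    while stack:
        name = stack.pop()
        if name not in selected:
            selected.add(name)
            stack.extend(downstream.get(name, ()))

    # Topologically order the whole graph, then keep only the selected tasks.
    order = TopologicalSorter(deps).static_order()
    return [name for name in order if name in selected]


print(tasks_to_rerun(DEPENDENCIES, "task2"))
```

Inside a flow, you could then submit only the tasks in that list. Pass `wait_for` only for upstream tasks that are also being rerun. Mapping the names to your actual `@task` functions is left to the flow.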
### Additional Resources
For a deeper dive into Prefect's capabilities:
- Prefect Task Dependencies Guide
- Prefect Code Examples
It may not be entirely what you wished for, but life is seldom perfect, much like my existence.