Matt Alhonte
10/03/2023, 2:07 AM
`wait_for` argument to all run at the same time, while any Failure kills the whole thing.
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time
def fail_hook(flow, flow_run, state):
    """Announce a failure on stdout, then raise with the state's message.

    Registered via ``on_failure=[fail_hook]`` on the tasks in this file.

    Args:
        flow: First hook argument; not used by this function.
        flow_run: Second hook argument; not used by this function.
        state: Object whose ``message`` attribute becomes the exception text.

    Raises:
        Exception: Always, carrying ``state.message``.
    """
    # NOTE(review): this hook is attached to tasks but its parameters are
    # named like a flow hook's (flow, flow_run); Prefect documents task hooks
    # as (task, task_run, state) -- confirm how the installed version passes
    # the arguments. Also verify whether an exception raised from inside a
    # hook propagates or is only logged by the engine.
    print("Failed")
    failure = Exception(state.message)
    raise failure
@task(log_prints=True, on_failure=[fail_hook])
def my_task():
    """Sleep for 15 seconds, then fail unconditionally.

    Raises:
        Exception: Always, with the message "This task has failed."
    """
    time.sleep(15)
    # some code here

    # Raising from the task body is what sets the task to a Failed state.
    raise Exception("This task has failed.")
@task(log_prints=True, on_failure=[fail_hook])
def my_task2():
    """Sleep for 25 seconds and return a fixed placeholder string."""
    time.sleep(25)
    placeholder = "sdfsdf"
    return placeholder
@flow(
    task_runner=ConcurrentTaskRunner(),
)
def my_flow():
    """Submit four task runs to the concurrent task runner, then wait on them.

    Two ``my_task`` runs and one ``my_task2`` run are submitted with no
    upstream dependency. A second ``my_task2`` run is submitted with
    ``wait_for`` pointing at the first ``my_task`` run.

    Returns:
        None.
    """
    # Submit WITHOUT ``return_state=True``: that flag makes ``submit`` block
    # until the run reaches a final state, so each task would finish before
    # the next one was even submitted and nothing would overlap.
    first_failing = my_task.submit()
    independent = my_task2.submit()
    second_failing = my_task.submit()
    dependent = my_task2.submit(wait_for=[first_failing])

    # Wait for every run only after all of them have been handed to the task
    # runner. ``wait()`` does not raise on failure, matching the original
    # ``return_state=True`` calls, which also never raised.
    for future in (first_failing, independent, second_failing, dependent):
        future.wait()
Jan Malek
10/03/2023, 9:39 AM