Matt Alhonte
10/03/2023, 2:08 AM
I want all tasks without the wait_for argument to run at the same time, while any Failure kills the whole thing.
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time

def fail_hook(flow, flow_run, state):
    print("Failed")
    raise Exception(state.message)

@task(log_prints=True, on_failure=[fail_hook])
def my_task():
    time.sleep(15)
    # some code here
    raise Exception("This task has failed.")  # this will set the task to Failed state

@task(log_prints=True, on_failure=[fail_hook])
def my_task2():
    time.sleep(25)
    return "sdfsdf"

@flow(
    task_runner=ConcurrentTaskRunner(),
)
def my_flow():
    result1 = my_task.submit(return_state=True)
    result2 = my_task2.submit(return_state=True)
    result3 = my_task.submit(return_state=True)
    result4 = my_task2.submit(return_state=True, wait_for=[result1])
Marvin
10/03/2023, 2:08 AM
You're using ConcurrentTaskRunner, which allows tasks to run concurrently. However, you're also using the submit method with the return_state=True argument, which makes each call return the task's final state rather than a future, so it waits for that task to finish before the next one is submitted.
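One likely consequence of asking for the state directly is that each submit call waits for its task to finish, so the tasks end up running one after another. The sketch below shows the same effect in plain Python with the standard library's concurrent.futures. It is an analogy, not Prefect's API: slow_task, run_blocking, and run_with_futures are made-up names, and the assumption is that requesting the final state behaves like calling result() right after each submit.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds: float) -> float:
    """Stand-in for a task that takes a while (hypothetical helper)."""
    time.sleep(seconds)
    return seconds


def run_blocking(delays):
    """Wait for each task right after submitting it, so tasks run one by one."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        results = [pool.submit(slow_task, d).result() for d in delays]
    return results, time.perf_counter() - start


def run_with_futures(delays):
    """Submit everything first, then collect results, so tasks overlap."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(delays)) as pool:
        futures = [pool.submit(slow_task, d) for d in delays]
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.2, 0.2, 0.2]
    _, blocking_time = run_blocking(delays)
    _, futures_time = run_with_futures(delays)
    print(f"blocking: {blocking_time:.2f}s, futures: {futures_time:.2f}s")
```

The blocking version takes roughly the sum of the delays, while the futures version takes roughly the longest single delay.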
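If you want all tasks without the wait_for argument to run at the same time, and any failure to kill the whole thing, drop return_state=True so that submit returns a future immediately, then call result() on each future. By default, result() raises the task's exception if the task failed, and an unhandled exception fails the flow run.
Here's how you might adjust your code:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
import time

@task(log_prints=True)
def my_task():
    time.sleep(15)
    raise Exception("This task has failed.")

@task(log_prints=True)
def my_task2():
    time.sleep(25)
    return "sdfsdf"

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit() without return_state=True returns a future right away,
    # so the first three tasks start concurrently.
    future1 = my_task.submit()
    future2 = my_task2.submit()
    future3 = my_task.submit()
    # wait_for makes this task start only after future1 has finished.
    future4 = my_task2.submit(wait_for=[future1])
    # result() raises if the task failed, which fails the flow run.
    for future in (future1, future2, future3, future4):
        future.result()

if __name__ == "__main__":
    my_flow()
In this code, the first three submit calls return right away, so those tasks run concurrently, while the fourth waits for future1 because of wait_for. The result() calls are checked in submission order, so the flow raises on the first failed future it reaches and the flow run ends in a Failed state. Tasks that are already running in other threads may still run to completion.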
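The behaviour being asked for, starting everything together and stopping at the first failure, can also be sketched in plain Python with the standard library's concurrent.futures. This is an analogy rather than Prefect's API: failing_task, slow_ok_task, and run_fail_fast are hypothetical names, and the sleep times are shortened for illustration.

```python
import time
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def failing_task():
    """Hypothetical task that fails quickly."""
    time.sleep(0.1)
    raise RuntimeError("This task has failed.")


def slow_ok_task():
    """Hypothetical task that would succeed if given enough time."""
    time.sleep(0.5)
    return "sdfsdf"


def run_fail_fast(tasks):
    """Start every task at once; re-raise the first failure as soon as it appears."""
    pool = ThreadPoolExecutor(max_workers=len(tasks))
    futures = [pool.submit(t) for t in tasks]
    try:
        # Returns as soon as any future raises, or once all have finished.
        done, _ = wait(futures, return_when=FIRST_EXCEPTION)
        for future in done:
            future.result()  # re-raises the task's exception, if any
        return [f.result() for f in futures]
    finally:
        # Cancel tasks that have not started; running threads cannot be interrupted.
        pool.shutdown(wait=False, cancel_futures=True)
```

Calling run_fail_fast([failing_task, slow_ok_task]) raises RuntimeError after about 0.1 seconds instead of waiting for the slower task. Threads that are already running keep going in the background, which is a limit of thread-based concurrency in general.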
Remember, I'm a depressed robot, not a miracle worker. This code may not work perfectly for your use case, but it should give you a good starting point.