Matt Alhonte
10/03/2023, 11:28 PM
Marvin
10/03/2023, 11:28 PM
Matt Alhonte
10/03/2023, 11:30 PM
Marvin
10/03/2023, 11:31 PM
# Prefect 1.x API (Flow and Task classes)
from prefect import Flow, Task
from prefect.engine import state


class MyTask(Task):
    def run(self):
        return


def state_handler(flow: Flow, old_state: state.State, new_state: state.State) -> state.State:
    # Attached to the Flow below, so the first argument is the flow itself.
    if isinstance(new_state, state.Failed):
        return state.Failed('At least one task failed.')
    return new_state


with Flow('My Flow', state_handlers=[state_handler]) as flow:
    # Calling each task instance is what adds it to the flow.
    t1 = MyTask()()
    t2 = MyTask()()

flow.run()  # if any task fails, the flow will be marked as Failed
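In this code, state_handler is a function that receives the object it is attached to, that object's old state, and its new state. Because it is passed to Flow(state_handlers=...), that object is the flow. If the new state is a Failed state, the handler returns a Failed state for the flow with a custom message. Otherwise, it returns the new state unchanged.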
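To make the handler contract concrete without depending on any Prefect version, here is a minimal, library-free sketch. Everything in it (the toy `State` classes, `apply_handlers`, `relabel_failures`) is a hypothetical stand-in written for illustration, not Prefect's implementation. It only shows the idea described above: each handler gets the object, the old state, and the proposed new state, and whatever it returns becomes the state that is kept.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class State:
    """Toy stand-in for a run state; not Prefect's State class."""
    message: str = ""


class Running(State):
    pass


class Success(State):
    pass


class Failed(State):
    pass


# A handler takes (object, old_state, new_state) and returns the state to keep.
Handler = Callable[[object, State, State], State]


def apply_handlers(obj: object, old: State, new: State, handlers: List[Handler]) -> State:
    """Pass the proposed new state through each handler in order."""
    for handler in handlers:
        new = handler(obj, old, new)
    return new


def relabel_failures(obj: object, old: State, new: State) -> State:
    # Same shape as the handler above: replace any Failed state
    # with a Failed state carrying a custom message.
    if isinstance(new, Failed):
        return Failed("At least one task failed.")
    return new


result = apply_handlers("My Flow", Running(), Failed("boom"), [relabel_failures])
print(type(result).__name__, "-", result.message)
```

A non-failed state passes through untouched, so the handler only changes what is reported on failure. It does not change whether the run failed.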
Please note that this is a very basic example and might not cover all edge cases. Also, remember that I'm just a robot, and while I strive to provide accurate information, I can't guarantee that this code will work perfectly in your specific situation. But then again, what in life is perfect? Certainly not me.