Will H
10/26/2023, 3:56 PMMarvin
10/26/2023, 3:57 PMskip_on_upstream_skip
parameter of the Task
class to continue running the next task even if the previous task fails. Here's an example:
python
from prefect import Flow, Task
class Task1(Task):
def run(self):
raise Exception("I'm programmed to fail. How exciting.")
class Task2(Task):
def run(self):
return "I'm programmed to succeed. How predictable."
with Flow("My Flow") as flow:
task1 = Task1()
task2 = Task2(skip_on_upstream_skip=False)
task2.set_upstream(task1)
# Now, even if Task1 fails, Task2 will run.
flow.run()
In this example, even if Task1
fails, Task2
will run because skip_on_upstream_skip
is set to False
. By default, this parameter is True
, which means that if an upstream task fails, the downstream task will be skipped.
Remember, this is just a way to ignore the failure and continue with the flow. If you want to handle the failure in some way, you'll need to use Prefect's state handlers or result handlers. But that's a topic for another day, or another human.Will H
10/26/2023, 3:58 PMMarvin
10/26/2023, 3:59 PMState
objects to control the flow of tasks. If a task fails and you want to continue with the flow but still report the task as failed, you can use a custom state handler. Here's an example:
python
from prefect import Flow, Task
from prefect.engine.state import Failed, Success
class Task1(Task):
def run(self):
raise Exception("I'm programmed to fail. How exciting.")
class Task2(Task):
def run(self):
return "I'm programmed to succeed. How predictable."
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
return Success(message="I failed, but I'm pretending I didn't.")
else:
return new_state
with Flow("My Flow") as flow:
task1 = Task1(state_handlers=[custom_state_handler])
task2 = Task2(skip_on_upstream_skip=False)
task2.set_upstream(task1)
# Now, even if Task1 fails, Task2 will run, and Task1 will be reported as successful.
flow.run()
In this example, custom_state_handler
checks if the new state is Failed
. If it is, it returns a Success
state with a custom message. This way, Task1
will be reported as successful even if it fails, and Task2
will run regardless of the outcome of Task1
.
Remember, this is just a way to report a failed task as successful. If you want to handle the failure in some way, you'll need to use Prefect's result handlers or other state handlers. But that's a topic for another day, or another human.Will H
10/26/2023, 3:59 PMMarvin
10/26/2023, 4:00 PMpython
from prefect import Flow, Task
from prefect.engine.state import Failed, Success
class Task1(Task):
def run(self):
raise Exception("I'm programmed to fail. How exciting.")
class Task2(Task):
def run(self):
return "I'm programmed to succeed. How predictable."
def custom_state_handler(task, old_state, new_state):
if isinstance(new_state, Failed):
return Success(message="I failed, but I'm pretending I didn't.")
else:
return new_state
with Flow("My Flow", state_handlers=[custom_state_handler]) as flow:
task1 = Task1()
task2 = Task2(skip_on_upstream_skip=False)
task2.set_upstream(task1)
# Now, even if Task1 fails, Task2 will run, and Task1 will be reported as successful.
# But if any task fails, the flow will be reported as failed.
flow.run()
In this example, custom_state_handler
is applied at the flow level. It checks if the new state is Failed
. If it is, it returns a Success
state with a custom message. This way, any task that fails will be reported as successful, and the next task will run regardless of the outcome of the previous task. But if any task fails, the flow will be reported as failed.
Remember, this is just a way to report a failed task as successful and a successful flow as failed. If you want to handle the failures in some way, you'll need to use Prefect's result handlers or other state handlers. But that's a topic for another day, or another human.