Max Eggers
11/15/2023, 2:55 PM
Marvin
11/15/2023, 2:55 PM
skip_on_upstream_skip option to fail a flow immediately if any of its tasks fail. This option, when set to True, will skip the execution of a task if any of its upstream tasks have failed.
Here's an example of how you can use it:
from prefect import Flow, Task

class MyTask(Task):
    def run(self):
        # Your task logic here
        pass

# Create your tasks
task1 = MyTask(skip_on_upstream_skip=True)
task2 = MyTask(skip_on_upstream_skip=True)

# Create your flow
with Flow("My Flow") as flow:
    task1_result = task1()
    task2_result = task2.map(upstream_tasks=[task1_result])
In this example, if task1 fails, task2 will be skipped and the flow will fail immediately.
However, please note that this will not stop the execution of tasks that are already running. If you want to stop the execution of all tasks immediately upon a single task failure, you might need to implement a custom solution, such as checking the state of all tasks at the start of each task and terminating the task if any have failed.
Marvin
11/15/2023, 2:55 PM
skip_on_upstream_skip attribute of the Flow to False. This will cause the flow to stop executing further tasks as soon as a task fails.
However, when using .map, all mapped tasks are run concurrently, so if one fails, the others will still continue to run. If you want to stop the entire flow immediately when any task fails, you might need to reconsider the use of .map and instead use a different structure that allows for immediate failure.
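As a rough illustration of what "a different structure" could look like, here is a minimal plain-Python sketch that processes items one at a time and stops at the first failure. It does not use Prefect at all; `run_fail_fast` and `double_positive` are hypothetical names chosen for this example, and the sketch only shows the control flow, not how it would be wired into a flow.

```python
def run_fail_fast(items, process):
    """Apply `process` to each item in order and stop at the first failure.

    Returns (completed_results, error); error is None when every item succeeds.
    """
    completed = []
    for item in items:
        try:
            completed.append(process(item))
        except Exception as exc:
            # Stop here: items after the failing one are never started.
            return completed, exc
    return completed, None


def double_positive(x):
    # Hypothetical stand-in for real task logic.
    if x < 0:
        raise ValueError(f"cannot process {x}")
    return x * 2
```

The trade-off is that this gives up the concurrency that .map provides in exchange for a guarantee that nothing starts after the first failure.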
Here's an example of how you can set the skip_on_upstream_skip attribute:
from prefect import Flow, Task

class MyTask(Task):
    def run(self):
        # Your task logic here
        pass

with Flow('My Flow') as flow:
    task = MyTask()
    task.skip_on_upstream_skip = False
Remember to replace MyTask with your actual task.
Please note that this is general advice and might need to be adjusted based on your specific use case.
Max Eggers
11/15/2023, 2:57 PM
Marvin
11/15/2023, 2:58 PM
skip_on_upstream_skip attribute is not a valid attribute for controlling flow behavior in Prefect.
In Prefect, the execution of the flow continues even if a task fails. This is by design, as Prefect is built to handle failures gracefully and continue with the flow execution as much as possible.
However, you can control the behavior of your flow when a task fails by using Prefect's state handlers. You can write a custom state handler for your flow that sets the state of the entire flow to Failed as soon as any task fails.
Here's an example of how you can do this:
from prefect import Flow, Task
from prefect.engine.signals import FAIL  # Prefect 1.x exposes the FAIL signal here
from prefect.engine.state import Failed

class FailFastStateHandler:
    def __call__(self, task, old_state, new_state):
        # Raise the FAIL signal as soon as a Failed state is seen
        if isinstance(new_state, Failed):
            raise FAIL("Failing the flow because a task failed.")
        return new_state

class MyTask(Task):
    def run(self):
        # Your task logic here
        pass

with Flow('My Flow', state_handlers=[FailFastStateHandler()]) as flow:
    task = MyTask()
In this example, FailFastStateHandler is a state handler that checks if the new state of any task is Failed. If it is, it raises a FAIL signal, which causes the entire flow to fail.
Please note that this is general advice and might need to be adjusted based on your specific use case.
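To make the handler idea concrete without relying on any Prefect internals, here is a minimal plain-Python sketch. `FlowAborted`, `fail_fast_handler`, and `run_tasks` are hypothetical stand-ins, not Prefect APIs, and states are plain strings. Only the `(obj, old_state, new_state)` handler shape mirrors the example above.

```python
class FlowAborted(Exception):
    """Hypothetical signal a handler raises to stop the whole run."""


def fail_fast_handler(task_name, old_state, new_state):
    # Escalate any task failure into a run-level abort.
    if new_state == "Failed":
        raise FlowAborted(f"{task_name} failed; aborting the run")
    return new_state


def run_tasks(tasks, handlers):
    """Run (name, callable) pairs in order, notifying handlers after each task.

    Returns (run_status, per_task_states). Tasks after an abort never start.
    """
    states = {}
    for name, fn in tasks:
        try:
            fn()
            new_state = "Success"
        except Exception:
            new_state = "Failed"
        states[name] = new_state
        try:
            for handler in handlers:
                handler(name, "Running", new_state)
        except FlowAborted:
            # A handler asked to stop: mark the run failed and skip the rest.
            return "Failed", states
    return "Success", states
```

In this sketch the runner, not the handler, decides what "abort" means; whether a real orchestrator reacts to a raised signal in the same way depends on its engine and should be checked against its documentation.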