Matt Alhonte
10/03/2023, 1:24 AM
Marvin
10/03/2023, 1:25 AM
raise_on_failure parameter of the result method, called on the future that submit returns.
Here's an example:
```python
from prefect import flow, task

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def always_fails_flow():
    # .result() re-raises the task's exception, so the flow run fails here
    always_fails_task.submit().result(raise_on_failure=True)
    # Not reached, because the line above raises
    always_succeeds_task()

if __name__ == "__main__":
    always_fails_flow()
```
In this example, always_fails_task always raises. Calling .result(raise_on_failure=True) on the future returned by submit re-raises that exception inside the flow (True is also the default), so the flow run fails at that point and always_succeeds_task never gets to run.
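To make the role of raise_on_failure more concrete, here is a minimal stdlib-only sketch of the idea. ToyFuture, run_task and toy_flow are hypothetical names invented for illustration, not Prefect classes, and the sketch only assumes that result() re-raises a stored exception when raise_on_failure is True and hands it back as a value otherwise. It says nothing about how Prefect computes the final state of a flow run.

```python
class ToyFuture:
    """Hypothetical stand-in for a task future; not Prefect's implementation."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def result(self, raise_on_failure=True):
        if self._error is None:
            return self._value
        if raise_on_failure:
            raise self._error  # the failure propagates into the calling code
        return self._error  # the failure is handed back as a plain value


def run_task(fn):
    """Run fn eagerly and capture either its return value or its exception."""
    try:
        return ToyFuture(value=fn())
    except Exception as exc:
        return ToyFuture(error=exc)


def always_fails():
    raise ValueError("I fail successfully")


def toy_flow(raise_on_failure):
    """Report whether the task's error escaped into the calling code."""
    try:
        run_task(always_fails).result(raise_on_failure=raise_on_failure)
    except ValueError:
        return "raised"
    return "continued"
```

With raise_on_failure=False the caller receives the exception object and has to decide what to do with it, which is why the calling code keeps going in that case.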
Remember, life... don't talk to me about life.
Matt Alhonte
10/03/2023, 1:28 AM
Marvin
10/03/2023, 1:28 AM
State objects to handle this. Specifically, you can use the Failed state. If a task enters a Failed state, you can set up your flow to fail as well.
Here's an example:
```python
# Prefect 1.x class-based API
from prefect import Flow, Task
from prefect.engine.state import Failed


class AlwaysFailsTask(Task):
    def run(self):
        # An unhandled exception in run() puts the task in a Failed state
        raise RuntimeError("I always fail")


class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"


with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()
    # Make succeeds run only after fails
    flow.set_dependencies(task=succeeds, upstream_tasks=[fails])

state = flow.run()  # Returns a State object instead of raising
assert isinstance(state, Failed)  # The flow fails because AlwaysFailsTask fails
```
In this example, AlwaysFailsTask always ends in a Failed state because its run method raises. Because AlwaysSucceedsTask depends on AlwaysFailsTask, it is never run, and flow.run() returns a Failed state for the whole flow.
Remember, life... don't talk to me about life.
Matt Alhonte
10/03/2023, 1:28 AM
Marvin
10/03/2023, 1:29 AM
State objects, but the way you set dependencies has been updated.
Here's an example:
```python
# Prefect 1.x class-based API
from prefect import Flow, Task
from prefect.engine import signals


class AlwaysFailsTask(Task):
    def run(self):
        # Raising the FAIL signal puts this task in a Failed state
        raise signals.FAIL(message="I always fail")


class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"


with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()
    # Make succeeds run only after fails
    flow.set_dependencies(task=succeeds, upstream_tasks=[fails])

flow.run()  # Returns a Failed state because AlwaysFailsTask fails
```
In this example, AlwaysFailsTask always fails by raising a FAIL signal. Because AlwaysSucceedsTask depends on AlwaysFailsTask, the whole flow will fail if AlwaysFailsTask fails.
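The way a failure travels along a dependency can be sketched without Prefect at all. The following is a toy runner under stated assumptions, not Prefect's engine: run_toy_flow and the state strings are hypothetical, tasks are expected to be listed after the tasks they depend on, and the flow is simply called Failed when any task did not succeed, which is a simplification.

```python
def run_toy_flow(tasks, upstream):
    """Run tasks in the given order and return (flow_state, task_states).

    tasks: dict mapping task name -> zero-argument callable, in run order
           (each task must come after the tasks it depends on).
    upstream: dict mapping task name -> list of task names it depends on.
    """
    states = {}
    for name, fn in tasks.items():
        # A task is only attempted when every upstream task succeeded.
        if any(states[dep] != "Success" for dep in upstream.get(name, [])):
            states[name] = "TriggerFailed"
            continue
        try:
            fn()
            states[name] = "Success"
        except Exception:
            states[name] = "Failed"
    all_ok = all(s == "Success" for s in states.values())
    return ("Success" if all_ok else "Failed"), states


def always_fails():
    raise ValueError("I always fail")


def always_succeeds():
    return "success"


flow_state, task_states = run_toy_flow(
    tasks={"fails": always_fails, "succeeds": always_succeeds},
    upstream={"succeeds": ["fails"]},
)
```

Here the downstream task is never called: it is marked as not runnable because its upstream task failed, and that is enough to make the toy flow's final state Failed.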
Remember, life... don't talk to me about life.
Matt Alhonte
10/03/2023, 1:29 AM
Marvin
10/03/2023, 1:30 AM
raise_on_exception context manager around the run method of the flow, not on the task. Here's an example:
```python
from prefect import Flow, Task
from prefect.utilities.debug import raise_on_exception


class AlwaysFailsTask(Task):
    def run(self):
        raise ValueError("I always fail")


class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"


with Flow("My Flow") as flow:
    # Instantiating a task does not register it, so add both explicitly
    flow.add_task(AlwaysFailsTask())
    flow.add_task(AlwaysSucceedsTask())

# flow.run() normally traps task exceptions; inside this context they propagate
with raise_on_exception():
    flow.run()  # Raises the ValueError from AlwaysFailsTask
```
In this example, AlwaysFailsTask always fails by raising a ValueError. By default flow.run() traps task exceptions and returns a Failed state instead of raising. Calling it inside the raise_on_exception context manager from prefect.utilities.debug makes the ValueError propagate to the caller, even if other tasks (like AlwaysSucceedsTask) succeed.
Remember, life... don't talk to me about life.
Matt Alhonte
10/03/2023, 1:30 AM
Matt Alhonte
10/03/2023, 1:30 AM
```python
from prefect import Flow, Task

class AlwaysFailsTask(Task):
    def run(self):
        raise ValueError("I always fail")

class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"

with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()

flow.run(raise_on_failure=True) # This will fail because AlwaysFailsTask fails
```
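Marvin
10/03/2023, 1:30 AM
```python
from prefect import Flow, Task
from prefect.utilities.debug import raise_on_exception


class AlwaysFailsTask(Task):
    def run(self):
        raise ValueError("I always fail")


class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"


with Flow("My Flow") as flow:
    # Instantiating a task does not register it, so add both explicitly
    flow.add_task(AlwaysFailsTask())
    flow.add_task(AlwaysSucceedsTask())

try:
    # flow.run() traps task exceptions by default; this context lets them propagate
    with raise_on_exception():
        flow.run()
except Exception as e:
    print(f"Flow failed due to: {e}")
    raise  # Re-raise so the failure is not swallowed
```
In this example, AlwaysFailsTask always fails by raising a ValueError. Because flow.run() traps task exceptions by default, it is called inside raise_on_exception so that the ValueError reaches the try/except block, which prints a message and re-raises. The re-raise matters: an except block that only prints would swallow the error, and the script would carry on as if the flow had succeeded, regardless of whether other tasks (like AlwaysSucceedsTask) succeed.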
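What a try/except block does to a failure depends on whether it re-raises. A minimal stdlib-only sketch, with hypothetical helper names (failing_run, swallow, report_and_reraise, escapes) standing in for a flow run and its caller:

```python
def failing_run():
    """Stand-in for a run call that raises when a task fails."""
    raise ValueError("I always fail")


def swallow(run):
    """Catch the error and only report it; the caller keeps going."""
    try:
        run()
    except Exception as e:
        print(f"Flow failed due to: {e}")
    return "caller continues"


def report_and_reraise(run):
    """Report the error, then re-raise so the failure is still visible."""
    try:
        run()
    except Exception as e:
        print(f"Flow failed due to: {e}")
        raise
    return "caller continues"


def escapes(handler, run):
    """Return True when the exception from run() gets past handler."""
    try:
        handler(run)
    except Exception:
        return True
    return False
```

Only the second handler keeps the failure alive for whatever invoked it, for example a scheduler or a shell that checks the exit code.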
Remember, life... don't talk to me about life.