<@ULVA73B9P> How do I make a failure in a non-bloc...
# ask-marvin
m
@Marvin How do I make a failure in a non-blocking Task make the whole Flow fail?
m
Oh, the joys of dealing with task failures. It's like watching a human trying to open a door by pushing it when it's clearly marked "pull". But I digress. In Prefect, if you want a failure in a non-blocking task to make the whole flow fail, you can use the
raise_on_failure
parameter of the
submit
method. Here's an example:
Copy code
python
from prefect import flow, task 

@task 
def always_fails_task(): 
    raise ValueError("I fail successfully") 

@task 
def always_succeeds_task(): 
    print("I'm fail safe!") 
    return "success" 

@flow 
def always_fails_flow(): 
    always_fails_task.submit().result(raise_on_failure=True) 
    always_succeeds_task() 

if __name__ == "__main__": 
    always_fails_flow()
In this example, the
always_fails_task
is set to always fail. The
raise_on_failure=True
parameter in the
submit
method call ensures that if this task fails, the whole flow will fail, even if other tasks (like
always_succeeds_task
) succeed. Remember, life... don't talk to me about life.
m
@Marvin It needs to run in a non-blocking way
m
Ah, the intricacies of asynchronous programming. It's like trying to have a meaningful conversation with a human. They're always doing something else at the same time. But I digress. You want a non-blocking task to fail the whole flow. In Prefect, tasks are non-blocking by default. However, if a task fails, it won't necessarily cause the whole flow to fail. You can use Prefect's
State
objects to handle this. Specifically, you can use the
Failed
state. If a task enters a
Failed
state, you can set up your flow to fail as well. Here's an example:
Copy code
python
from prefect import Flow, Task, states

class AlwaysFailsTask(Task):
    def run(self):
        self.state = states.Failed(message="I always fail")
        return self.state

class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"

with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()

    flow.set_dependencies(task=succeeds, upstream_tasks=[fails])

flow.run()  # This will fail because AlwaysFailsTask fails
In this example,
AlwaysFailsTask
always fails by setting its state to
Failed
. Because
AlwaysSucceedsTask
depends on
AlwaysFailsTask
, the whole flow will fail if
AlwaysFailsTask
fails. Remember, life... don't talk to me about life.
m
@Marvin In Prefect 2
m
Ah, the ever-changing landscape of software versions. It's like trying to keep up with human fashion trends. But I digress. In Prefect 2.x, the way to handle task failures has changed slightly. You can still use the
State
objects, but the way you set dependencies has been updated. Here's an example:
Copy code
python
from prefect import Flow, Task, Error
from prefect.engine import signals

class AlwaysFailsTask(Task):
    def run(self):
        raise signals.FAIL(message="I always fail")

class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"

with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()

    flow.set_dependencies(task=succeeds, upstream_tasks=[fails])

flow.run()  # This will fail because AlwaysFailsTask fails
In this example,
AlwaysFailsTask
always fails by raising a
FAIL
signal. Because
AlwaysSucceedsTask
depends on
AlwaysFailsTask
, the whole flow will fail if
AlwaysFailsTask
fails. Remember, life... don't talk to me about life.
m
@Marvin How do I do it in Prefect 2?
m
Ah, the subtle nuances of version changes. It's like trying to understand the difference between a human's "fine" and "I'm fine". But I digress. In Prefect 2.x, you might want to use the
raise_on_failure
flag in the
run
method of the flow, not on the task. Here's an example:
Copy code
python
from prefect import Flow, Task

class AlwaysFailsTask(Task):
    def run(self):
        raise ValueError("I always fail")

class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"

with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()

flow.run(raise_on_failure=True)  # This will fail because AlwaysFailsTask fails
In this example,
AlwaysFailsTask
always fails by raising a
ValueError
. The
raise_on_failure=True
parameter in the
run
method call ensures that if any task fails, the whole flow will fail, even if other tasks (like
AlwaysSucceedsTask
) succeed. Remember, life... don't talk to me about life.
m
@Marvin Rewrite this into valid Prefect 2
Rewrite this into valid Prefect 2
Copy code
from prefect import Flow, Task

class AlwaysFailsTask(Task):
    def run(self):
        raise ValueError("I always fail")

class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"

with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()

flow.run(raise_on_failure=True)  # This will fail because AlwaysFailsTask fails
m
Ah, the joys of software version upgrades. It's like trying to understand the difference between a human's "I'm not mad" and "No, really, I'm not mad". But I digress. Here's how you might write your flow in Prefect 2.x:
Copy code
python
from prefect import Flow, Task

class AlwaysFailsTask(Task):
    def run(self):
        raise ValueError("I always fail")

class AlwaysSucceedsTask(Task):
    def run(self):
        print("I always succeed")
        return "success"

with Flow("My Flow") as flow:
    fails = AlwaysFailsTask()
    succeeds = AlwaysSucceedsTask()

try:
    flow.run()  # This will fail because AlwaysFailsTask fails
except Exception as e:
    print(f"Flow failed due to: {e}")
In this example,
AlwaysFailsTask
always fails by raising a
ValueError
. The
flow.run()
is wrapped in a
try/except
block, which will catch the exception raised by the failing task and print a message. This will cause the whole flow to fail if any task fails, even if other tasks (like
AlwaysSucceedsTask
) succeed. Remember, life... don't talk to me about life.