# ask-marvin
m
@Marvin How do I make a failure in a non-blocking Task make the whole Flow fail in Prefect 2?
m
Ah, the joys of failure. I'm quite familiar with them. Here's how you can make a failure in a non-blocking Task cause the whole Flow to fail in Prefect 2: write the flow with no return statement. Prefect then determines the flow's final state from the states of all its task runs, so if any task run fails, the flow run is marked as failed. Here's an example:
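```python
from prefect import flow, task

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def always_fails_flow():
    # raise_on_failure=False returns the task's exception instead of raising it,
    # so the flow body carries on to the next task
    always_fails_task.submit().result(raise_on_failure=False)
    always_succeeds_task()
    # No return statement: the flow's final state comes from its task runs

if __name__ == "__main__":
    always_fails_flow()
```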
In this example, `always_fails_task` is a task that always fails, and `always_succeeds_task` is a task that always succeeds. The flow `always_fails_flow` submits `always_fails_task` and then calls `always_succeeds_task`. Because the result is fetched with `raise_on_failure=False`, the failure of `always_fails_task` does not stop `always_succeeds_task` from running, but it still causes the entire flow to fail. Isn't it wonderful how failure can be so... pervasive?
m
@Marvin I need several tasks to run concurrently AND have a failure in any of them make the whole Flow fail.
m
Ah, the sweet symphony of concurrent failure. Here's how you can make a failure in any concurrent Task cause the whole Flow to fail in Prefect 2: You can call `.submit()` on each task, which returns a future much like the ones in `concurrent.futures`, and then call `.result()` on every future. If any task failed, `.result()` re-raises its exception, which makes the whole flow fail. Here's an example:
```python
from prefect import flow, task

@task
def task_that_might_fail():
    # Your task code here
    pass

@flow
def flow_with_concurrent_tasks():
    # .submit() hands each task run to the flow's task runner
    # (ConcurrentTaskRunner by default in Prefect 2) and returns a future right away
    futures = [task_that_might_fail.submit() for _ in range(5)]
    # .result() waits for the task and re-raises its exception if it failed
    for future in futures:
        future.result()

if __name__ == "__main__":
    flow_with_concurrent_tasks()
```
In this example, `task_that_might_fail` is a task that might fail, and `flow_with_concurrent_tasks` is a flow that runs five instances of `task_that_might_fail` concurrently. If any instance of `task_that_might_fail` fails, it will cause the entire flow to fail. Isn't it fascinating how failure can spread like a virus?
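For anyone who wants to see the "start everything, then check each future" pattern on its own, here is a minimal sketch that uses only the standard library's `concurrent.futures` and no Prefect at all. The names `might_fail` and `run_all` are hypothetical and exist only for illustration. The sketch shows that `Future.result()` re-raises whatever exception the worker raised, which is what lets one failure fail the caller.

```python
from concurrent.futures import ThreadPoolExecutor


def might_fail(n):
    # Hypothetical unit of work: fails only for n == 3.
    if n == 3:
        raise ValueError(f"work item {n} failed")
    return n * 2


def run_all(items, max_workers=5):
    # Submit every item before waiting on any of them, so they run concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(might_fail, n) for n in items]
        # result() blocks until that worker finishes and re-raises any
        # exception it raised, so a single failure propagates out of run_all.
        return [future.result() for future in futures]
```

Calling `run_all([1, 2])` returns the doubled values, while `run_all([1, 2, 3, 4])` raises the `ValueError` from item 3. Note that the `with` block still waits for the remaining workers to finish before the exception leaves `run_all`.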
m
@Marvin I need several tasks to run concurrently AND have a failure in any of them make the whole Flow fail, after being submitted to a Prefect 2 `ConcurrentTaskRunner`
m
Ah, the delightful complexity of concurrent failures in Prefect 2. Here's how you can make a failure in any concurrent Task cause the whole Flow to fail: You can pass Prefect's `ConcurrentTaskRunner` to the `@flow` decorator and call `.submit()` on each task so they run concurrently. The `ConcurrentTaskRunner` itself does not raise when a task fails; calling `.result()` on each returned future does, and that exception causes the whole flow to fail. Here's an example:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def task_that_might_fail():
    # Your task code here
    pass

@flow(name="My Flow", task_runner=ConcurrentTaskRunner())
def my_flow():
    # Each .submit() sends a task run to the ConcurrentTaskRunner
    # and returns a future without waiting for the task to finish
    futures = [task_that_might_fail.submit() for _ in range(5)]
    # .result() waits for the task and re-raises its exception if it failed
    for future in futures:
        future.result()

if __name__ == "__main__":
    my_flow()
```
In this example, `task_that_might_fail` is a task that might fail, and `my_flow` is a flow that submits five runs of it to a `ConcurrentTaskRunner`. If any of them fails, `.result()` raises its exception inside the flow, causing the entire flow to fail. Isn't it wonderful how failure can be so... contagious?