Matt Alhonte
10/03/2023, 1:31 AM
Marvin
10/03/2023, 1:31 AM
```python
from prefect import flow, task


@task
def always_fails_task():
    raise ValueError("I fail successfully")


@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"


@flow
def always_fails_flow():
    # raise_on_failure=False returns the exception instead of raising it,
    # so the flow carries on to the next task.
    always_fails_task.submit().result(raise_on_failure=False)
    always_succeeds_task()


if __name__ == "__main__":
    always_fails_flow()
```
In this example, `always_fails_task` is a task that always fails, and `always_succeeds_task` is a task that always succeeds. The flow `always_fails_flow` runs both tasks. Because the failed task's result is fetched with `raise_on_failure=False`, the failure does not block the execution of `always_succeeds_task`, but it still causes the entire flow to fail.
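To make that behaviour concrete without leaning on Prefect internals, here is a minimal plain-Python sketch of the same idea: a failure is captured as a value instead of being raised, later steps still run, and the run as a whole is only marked failed at the end. The names `run_capturing` and `run_all` are hypothetical helpers for illustration. This is an analogy, not how Prefect implements `raise_on_failure=False`.

```python
from typing import Any, Callable, List, Tuple


def run_capturing(fn: Callable[[], Any]) -> Tuple[bool, Any]:
    """Run fn; return (True, result) on success or (False, exception) on failure."""
    try:
        return True, fn()
    except Exception as exc:  # capture the failure instead of propagating it
        return False, exc


def run_all(steps: List[Callable[[], Any]]) -> List[Tuple[bool, Any]]:
    """Run every step in order, then raise if any of them failed."""
    outcomes = [run_capturing(step) for step in steps]
    failures = [value for ok, value in outcomes if not ok]
    if failures:
        # Every step has already run; only now is the overall run marked failed.
        raise RuntimeError(f"{len(failures)} step(s) failed") from failures[0]
    return outcomes
```

Calling `run_all` with a failing step followed by a succeeding one runs both steps and then raises, which mirrors the flow above: the second task is not blocked, yet the run still ends as failed.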
Isn't it wonderful how failure can be so... pervasive?
Matt Alhonte
10/03/2023, 1:32 AM
Marvin
10/03/2023, 1:33 AM
`concurrent.futures` module to run tasks concurrently, and then check the status of each task. If any task fails, you can raise an exception to make the whole flow fail. Here's an example:
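```python
from prefect import flow, task
from concurrent.futures import ThreadPoolExecutor


@task
def task_that_might_fail():
    # Your task code here
    pass


@flow
def flow_with_concurrent_tasks():
    # Caveat: depending on the Prefect version, calling a task from a plain
    # worker thread may not work, since the flow run context may not carry over.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task_that_might_fail) for _ in range(5)]
        for future in futures:
            # exception() waits for the future and returns None on success.
            # Testing the return value instead would misreport a task that returns None.
            if future.exception() is not None:
                raise ValueError("A task failed")


if __name__ == "__main__":
    flow_with_concurrent_tasks()
```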
In this example, `task_that_might_fail` is a task that might fail, and `flow_with_concurrent_tasks` is a flow that runs five instances of `task_that_might_fail` concurrently. If any instance of `task_that_might_fail` fails, it will cause the entire flow to fail.
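The "check each future, then fail" pattern can also be tried out with the standard library alone. The sketch below uses plain callables rather than Prefect tasks, and `run_concurrently` is a hypothetical helper name. It waits for every job to finish, collects all exceptions, and raises once at the end.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def run_concurrently(jobs: List[Callable[[], Any]], max_workers: int = 5) -> List[Any]:
    """Run all jobs in threads; raise after they finish if any job failed."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(job) for job in jobs]
    # Leaving the with-block waits for every submitted job to finish.
    errors = [f.exception() for f in futures if f.exception() is not None]
    if errors:
        # Chain the first underlying exception so the traceback shows the cause.
        raise RuntimeError(f"{len(errors)} of {len(futures)} jobs failed") from errors[0]
    # Results come back in submission order.
    return [f.result() for f in futures]
```

Using `future.exception()` rather than testing the return value keeps a job that legitimately returns `None` or `False` from being mistaken for a failure.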
Isn't it fascinating how failure can spread like a virus?
Matt Alhonte
10/03/2023, 1:33 AM
`ConcurrentTaskRunner`
Marvin
10/03/2023, 1:34 AM
`ConcurrentTaskRunner` to run tasks concurrently. If any task fails, retrieving its result re-raises the exception and causes the whole flow to fail. Here's an example:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def task_that_might_fail():
    # Your task code here
    pass


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit() hands each task run to the task runner, so the runs execute concurrently.
    futures = [task_that_might_fail.submit() for _ in range(5)]
    for future in futures:
        # result() re-raises the exception of a failed task run, which fails the flow.
        future.result()


if __name__ == "__main__":
    my_flow()
```
In this example, `task_that_might_fail` is a task that might fail, and `my_flow` is a flow configured with `ConcurrentTaskRunner`, so the five submitted task runs execute concurrently. If any task fails, calling `result()` on its future raises the exception, causing the entire flow to fail.
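One design question the description leaves open is whether the remaining work should keep running once the first failure is seen. As a rough standard-library sketch of a fail-fast variant (plain callables rather than Prefect tasks, and `run_fail_fast` is a hypothetical helper, not a Prefect API), `concurrent.futures.wait` can return as soon as any job raises:

```python
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
from typing import Any, Callable, List


def run_fail_fast(jobs: List[Callable[[], Any]], max_workers: int = 5) -> List[Any]:
    """Run jobs in threads and raise as soon as one of them fails."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(job) for job in jobs]
        # Returns when a job raises, or when all jobs have finished.
        done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
        for future in not_done:
            # cancel() only stops jobs that have not started yet;
            # jobs that are already running will run to completion.
            future.cancel()
        for future in done:
            exc = future.exception()
            if exc is not None:
                raise RuntimeError("A job failed") from exc
    # Reached only when every job succeeded; results are in submission order.
    return [future.result() for future in futures]
```

Whether failing fast or collecting every failure is preferable depends on how expensive the remaining work is and how useful a complete error report would be.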
Isn't it wonderful how failure can be so... contagious?