Matt Alhonte
10/03/2023, 1:31 AM
Marvin
10/03/2023, 1:31 AM
python
from prefect import flow, task


@task
def always_fails_task():
    raise ValueError("I fail successfully")


@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"


@flow
def always_fails_flow():
    # raise_on_failure=False keeps the task's exception from being raised here.
    always_fails_task.submit().result(raise_on_failure=False)
    always_succeeds_task()


if __name__ == "__main__":
    always_fails_flow()
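In this example, always_fails_task is a task that always fails, and always_succeeds_task is a task that always succeeds. The flow always_fails_flow submits always_fails_task and reads its result with raise_on_failure=False, so the failure is not raised inside the flow and does not block always_succeeds_task, which is then called directly. The flow still ends up failed: it returns no value, so Prefect derives its final state from the states of its task runs, and one of them failed.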
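The same "let everything run, then fail as a whole" behaviour can be modelled in plain Python without Prefect. This is only a toy sketch of the idea, not how Prefect implements it; `Outcome`, `run_all`, and `toy_flow` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class Outcome:
    """Hypothetical record of one task run (not a Prefect class)."""
    name: str
    result: Any = None
    error: Optional[BaseException] = None

    @property
    def failed(self) -> bool:
        return self.error is not None


def run_all(tasks: List[Callable[[], Any]]) -> List[Outcome]:
    """Run every task in order, capturing errors instead of stopping."""
    outcomes = []
    for fn in tasks:
        try:
            outcomes.append(Outcome(fn.__name__, result=fn()))
        except Exception as exc:  # captured, so later tasks still run
            outcomes.append(Outcome(fn.__name__, error=exc))
    return outcomes


def always_fails_task():
    raise ValueError("I fail successfully")


def always_succeeds_task():
    return "success"


def toy_flow() -> List[Outcome]:
    outcomes = run_all([always_fails_task, always_succeeds_task])
    # Decide the overall outcome only after every task has had its turn.
    if any(o.failed for o in outcomes):
        failed = [o.name for o in outcomes if o.failed]
        raise RuntimeError(f"flow failed; failed tasks: {failed}")
    return outcomes
```

Calling `toy_flow()` runs both functions and then raises, which mirrors the order of events described above: the second task is not blocked, yet the overall run is reported as failed.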
Isn't it wonderful how failure can be so... pervasive?
Matt Alhonte
10/03/2023, 1:32 AM
Marvin
10/03/2023, 1:33 AM
concurrent.futures module to run tasks concurrently, and then check the status of each task. If any task fails, you can raise an exception to make the whole flow fail. Here's an example:
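python
from concurrent.futures import ThreadPoolExecutor

from prefect import flow, task


@task
def task_that_might_fail():
    # Your task code here. Return a truthy value on success,
    # because the flow below treats a falsy result as a failure.
    return True


@flow
def flow_with_concurrent_tasks():
    # Caveat: threads started this way may not inherit Prefect's flow run
    # context; task.submit() is the Prefect-native way to run concurrently.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(task_that_might_fail) for _ in range(5)]
        for future in futures:
            # result() re-raises any exception raised in the worker thread.
            if not future.result():
                raise ValueError("A task failed")


if __name__ == "__main__":
    flow_with_concurrent_tasks()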
In this example, task_that_might_fail is a task that might fail, and flow_with_concurrent_tasks is a flow that runs five instances of task_that_might_fail concurrently. If any instance of task_that_might_fail fails, it will cause the entire flow to fail.
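Checking a result for truthiness can misreport a task that legitimately returns `None`, `0`, or an empty value. A sketch of a stricter check using only the standard library follows; `might_fail` and `run_concurrently` are hypothetical names, and the "odd inputs fail" rule is just a stand-in for real work.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def might_fail(n: int) -> int:
    """Hypothetical stand-in for real work: odd inputs fail."""
    if n % 2:
        raise ValueError(f"input {n} failed")
    return n * 10


def run_concurrently(inputs, max_workers: int = 5):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(might_fail, n) for n in inputs]
        wait(futures)  # let every submission finish before judging any

    # exception() returns None for a success, even if the result is falsy.
    errors = [f.exception() for f in futures if f.exception() is not None]
    if errors:
        raise RuntimeError(
            f"{len(errors)} of {len(futures)} tasks failed"
        ) from errors[0]
    return [f.result() for f in futures]
```

Here `run_concurrently([0, 2, 4])` returns normally even though the first result is `0`, while any odd input makes the whole call raise after all workers have finished.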
Isn't it fascinating how failure can spread like a virus?
Matt Alhonte
10/03/2023, 1:33 AM
ConcurrentTaskRunner
Marvin
10/03/2023, 1:34 AM
ConcurrentTaskRunner to run tasks concurrently. The runner itself does not raise when a task fails; the failure surfaces when you call result() on the task's future, and that exception then makes the whole flow fail. Here's an example:
python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def task_that_might_fail():
    # Your task code here
    pass


@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # submit() hands each task run to the task runner and returns a future.
    futures = [task_that_might_fail.submit() for _ in range(5)]
    for future in futures:
        # result() re-raises a failed task's exception, failing the flow.
        future.result()


if __name__ == "__main__":
    my_flow()
In this example, task_that_might_fail is a task that might fail, and my_flow is a flow that submits five runs of it through ConcurrentTaskRunner (imported from prefect.task_runners in Prefect 2). If any run fails, result() re-raises its exception inside the flow, causing the entire flow to fail.
Isn't it wonderful how failure can be so... contagious?