Florian Giroud
12/17/2024, 12:03 PMMarvin
12/17/2024, 12:04 PMFlorian Giroud
12/17/2024, 12:04 PMMarvin
12/17/2024, 12:04 PMpython
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
import random
# Define a sample task
@task
def my_task(x):
if random.choice([True, False]):
print(f"Task {x} succeeded")
else:
raise Exception(f"Task {x} failed")
# Define the flow
@flow(task_runner=DaskTaskRunner())
def my_flow():
results = []
for i in range(3):
results.append(my_task.submit(i))
# Await task completion and check results
task_results = [result.result() for result in results]
if any(result is None for result in task_results): # Checking for any failed task
raise Exception("One or more tasks failed")
my_flow()
### Important Points:
- DaskTaskRunner: Enables parallel task execution.
- Error Handling: After all tasks complete, task_results
checks for any failure.
- Flow Failure: If any task fails (here simulated randomly), the flow will raise an exception.
For more detailed guidance, visit the Prefect documentation on running tasks in parallel and handling transactions and failures effectively.
If you have any questions or need further details, feel free to ask!