Fernando Correia
01/31/2025, 6:06 PMMarvin
01/31/2025, 6:06 PMpython
from prefect import flow, task
@task
async def setup():
# Simulate setup work
print("Setting up...")
return "Setup complete"
@task
async def verify_color():
# Simulate verification
print("Verifying color...")
return True
@task
async def verify_weight():
# Simulate verification
print("Verifying weight...")
return True
@task
async def verify_size():
# Simulate verification
print("Verifying size...")
return True
@task
async def summarize(color_verified, weight_verified, size_verified):
# Summarize the results
print("Summarizing results...")
return color_verified and weight_verified and size_verified
@flow
async def verification_flow():
setup_result = await setup()
print(setup_result)
# Run verification tasks concurrently
color_result, weight_result, size_result = await verify_color(), await verify_weight(), await verify_size()
# Summarize the results
summary_result = await summarize(color_result, weight_result, size_result)
print(f"Summary result: {summary_result}")
# Execute the flow
verification_flow()
This example demonstrates how to define an asynchronous flow with tasks that run concurrently using Prefect 3.0. Each of the verification tasks (verify_color
, verify_weight
, verify_size
) runs independently, and the summarize
task waits for their completion before evaluating the results.
For more information on Prefect's asynchronous features, you can refer to the Prefect documentation.