Fernando Correia
01/31/2025, 6:06 PM
Marvin
01/31/2025, 6:06 PM
```python
import asyncio

from prefect import flow, task


@task
async def setup():
    # Simulate setup work
    print("Setting up...")
    return "Setup complete"


@task
async def verify_color():
    # Simulate verification
    print("Verifying color...")
    return True


@task
async def verify_weight():
    # Simulate verification
    print("Verifying weight...")
    return True


@task
async def verify_size():
    # Simulate verification
    print("Verifying size...")
    return True


@task
async def summarize(color_verified, weight_verified, size_verified):
    # Summarize the results
    print("Summarizing results...")
    return color_verified and weight_verified and size_verified


@flow
async def verification_flow():
    setup_result = await setup()
    print(setup_result)
    # Run verification tasks concurrently; awaiting each call one after
    # another would run them sequentially instead
    color_result, weight_result, size_result = await asyncio.gather(
        verify_color(), verify_weight(), verify_size()
    )
    # Summarize the results
    summary_result = await summarize(color_result, weight_result, size_result)
    print(f"Summary result: {summary_result}")


# Execute the flow (an async flow has to be run inside an event loop)
if __name__ == "__main__":
    asyncio.run(verification_flow())
```
This example defines an asynchronous flow with Prefect 3.0. The three verification tasks (`verify_color`, `verify_weight`, `verify_size`) do not depend on one another, so they can run concurrently when they are awaited together, for example with `asyncio.gather`, rather than one after another. The `summarize` task takes their results as inputs, so it only runs once all three have completed.
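To make the difference between awaiting coroutines one at a time and awaiting them together concrete, here is a minimal sketch in plain `asyncio`, with no Prefect involved. The `check` coroutine, its `delay` parameter, and the `timed` helper are hypothetical stand-ins for the verification tasks, not part of any Prefect API.

```python
import asyncio
import time


async def check(name: str, delay: float = 0.1) -> bool:
    """Hypothetical stand-in for a verification task; sleeps to simulate I/O."""
    await asyncio.sleep(delay)
    return True


async def run_sequentially() -> tuple:
    # Each await finishes before the next coroutine is even created,
    # so the total time is roughly the sum of the delays.
    return await check("color"), await check("weight"), await check("size")


async def run_concurrently() -> list:
    # gather schedules all three coroutines at once and waits for all of
    # them, so the total time is roughly the longest single delay.
    return await asyncio.gather(check("color"), check("weight"), check("size"))


def timed(coro_factory):
    # Run a coroutine function to completion and report elapsed seconds.
    start = time.perf_counter()
    result = asyncio.run(coro_factory())
    return result, time.perf_counter() - start


if __name__ == "__main__":
    seq_result, seq_seconds = timed(run_sequentially)
    con_result, con_seconds = timed(run_concurrently)
    print(f"sequential: {seq_seconds:.2f}s, concurrent: {con_seconds:.2f}s")
```

Both versions return the same three results; only the elapsed time differs, which is why the sequential form is easy to mistake for concurrent code.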
For more information on Prefect's asynchronous features, you can refer to the Prefect documentation.