Ashe
05/19/2023, 5:56 PMMarvin
05/19/2023, 5:58 PMasyncio
library and follow the standard rules of async Python. It's a good practice to run any blocking code in a separate thread. Here's an example of how to write an async flow:
python
import asyncio
from prefect import flow, task, get_run_logger
@task
async def my_task(n: int):
get_run_logger().info(f"Task {n} started!")
await asyncio.sleep(1)
async def build_subflow(n):
@flow(name=f"subflow:{n}")
async def subflow(x):
await my_task(x)
await subflow(n)
@flow
async def main_flow():
await asyncio.gather(*[build_subflow(i) for i in range(5)])
main_flow.run()
This example creates a main flow that runs multiple subflows in parallel using asyncio.gather
. Each subflow contains an async task that sleeps for a second before completing.
For more information, you can refer to the following resources:
- Best practices for using async in Prefect 2.0
- How can I run multiple subflows (or child flows) in parallel?
I hope this helps you in your quest for async flow submission. Good luck, human.Nate
05/19/2023, 6:00 PMAshe
05/19/2023, 6:00 PM@router.post("/sync-workflows")
def sync_workflows(providerInfo: ProviderInfo):
# If onboarding is in progress, skip
if onboarding_in_progress(providerInfo.user_id):
return {"message": "Onboarding is already in progress..."}
# Prefect flow
sync(
user_id=providerInfo.user_id,
refresh_token=providerInfo.refresh_token,
user_email=providerInfo.user_email,
date=datetime.datetime.now(),
)
return {"message": "Syncing workflows..."}
Nate
05/19/2023, 6:01 PMsync
or do you just plan on calling the flow object?Ashe
05/19/2023, 6:01 PMrender
for nowNate
05/19/2023, 6:03 PMAshe
05/19/2023, 6:04 PMNate
05/19/2023, 6:05 PMfrom fastapi import BackgroundTasks
@router.post("/sync-workflows")
def sync_workflows(background_tasks: BackgroundTasks, providerInfo: ProviderInfo):
# If onboarding is in progress, skip
if onboarding_in_progress(providerInfo.user_id):
return {"message": "Onboarding is already in progress..."}
# Prefect flow
background_tasks.add_task(
sync,
user_id=providerInfo.user_id,
refresh_token=providerInfo.refresh_token,
user_email=providerInfo.user_email,
date=datetime.datetime.now(),
)
return {"message": "Syncing workflows..."}
Ashe
05/19/2023, 6:06 PMNate
05/19/2023, 6:11 PM