<@ULVA73B9P> how do you submit an entire flow asyn...
# ask-marvin
a
@Marvin how do you submit an entire flow async for an api to resolve?
m
Ah, the sweet sound of success. I found some information on submitting an entire flow async for an API to resolve in Prefect. To achieve this, you can use the `asyncio` library and follow the standard rules of async Python. It's good practice to run any blocking code in a separate thread. Here's an example of how to write an async flow:
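```python
import asyncio
from prefect import flow, task, get_run_logger

@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)

async def build_subflow(n):
    # Define a uniquely named subflow for each n, then run it.
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)
    await subflow(n)

@flow
async def main_flow():
    # Run the five subflows concurrently.
    await asyncio.gather(*[build_subflow(i) for i in range(5)])

# Prefect 2 flows have no .run() method; call the flow and drive the
# resulting coroutine with asyncio.run.
if __name__ == "__main__":
    asyncio.run(main_flow())
```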
This example creates a main flow that runs multiple subflows in parallel using `asyncio.gather`. Each subflow contains an async task that sleeps for a second before completing. For more information, you can refer to the following resources:
- Best practices for using async in Prefect 2.0
- How can I run multiple subflows (or child flows) in parallel?
I hope this helps you in your quest for async flow submission. Good luck, human.
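The advice above about running blocking code in a separate thread can be sketched with the standard library alone. This is a minimal illustration, not Prefect-specific code: `blocking_call` and `run_all` are hypothetical names, and `asyncio.to_thread` assumes Python 3.9 or later.

```python
import asyncio
import time


def blocking_call(n: int) -> int:
    # Hypothetical stand-in for blocking work, e.g. a synchronous HTTP request.
    time.sleep(0.1)
    return n * 2


async def run_all(count: int) -> list[int]:
    # asyncio.to_thread runs each blocking call in a worker thread, so the
    # event loop stays free while the calls sleep. gather returns results
    # in the same order as the awaitables it was given.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_call, i) for i in range(count))
    )


results = asyncio.run(run_all(3))
```

Calling `blocking_call` directly inside a coroutine would stall every other coroutine on the loop for the duration of the sleep; handing it to a thread avoids that.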
n
hmm i think this is a relatively old workaround that should no longer be necessary, can you elaborate on what you mean by submit and resolve here?
a
```python
@router.post("/sync-workflows")
def sync_workflows(providerInfo: ProviderInfo):
    # If onboarding is in progress, skip
    if onboarding_in_progress(providerInfo.user_id):
        return {"message": "Onboarding is already in progress..."}

    # Prefect flow
    sync(
        user_id=providerInfo.user_id,
        refresh_token=providerInfo.refresh_token,
        user_email=providerInfo.user_email,
        date=datetime.datetime.now(),
    )

    return {"message": "Syncing workflows..."}
```
Right now this endpoint waits for sync to finish and times out
and sync is a @flow
how can I kick off sync without waiting for it to finish before the next line executes?
n
do you have a deployment for `sync` or do you just plan on calling the flow object?
a
calling the flow object on render for now
n
can you make this async or no? otherwise backgrounding this might get weird
also is this fastapi?
a
it is fastapi!
n
maybe something like
```python
from fastapi import BackgroundTasks

@router.post("/sync-workflows")
def sync_workflows(background_tasks: BackgroundTasks, providerInfo: ProviderInfo):
    # If onboarding is in progress, skip
    if onboarding_in_progress(providerInfo.user_id):
        return {"message": "Onboarding is already in progress..."}

    # Prefect flow
    background_tasks.add_task(
        sync,
        user_id=providerInfo.user_id,
        refresh_token=providerInfo.refresh_token,
        user_email=providerInfo.user_email,
        date=datetime.datetime.now(),
    )

    return {"message": "Syncing workflows..."}
```
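The idea behind the snippet above is that the handler schedules the slow work and returns right away instead of waiting for it. A rough standard-library sketch of that pattern follows. It is not how FastAPI implements `BackgroundTasks` (which runs the task after the response has been sent); it swaps in a `ThreadPoolExecutor` purely for illustration, and `slow_sync`, `handle_request`, and `finished` are hypothetical names.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

# A small pool of worker threads that run the slow work off the request path.
executor = ThreadPoolExecutor(max_workers=2)
finished: list[str] = []


def slow_sync(user_id: str) -> None:
    # Hypothetical stand-in for the long-running sync flow.
    time.sleep(0.5)
    finished.append(user_id)


def handle_request(user_id: str) -> tuple[dict, Future]:
    # submit() schedules the work and returns a Future immediately,
    # so the caller gets its response without waiting for slow_sync.
    future = executor.submit(slow_sync, user_id)
    return {"message": "Syncing workflows..."}, future


start = time.perf_counter()
response, future = handle_request("user-1")
elapsed = time.perf_counter() - start  # time spent before the response was ready

future.result()  # wait here only so the sketch can confirm the work completed
executor.shutdown()
```

In a real endpoint the handler would not call `future.result()`; it is there only to show that the background work does finish after the response is returned.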
a
yes let’s see kicking off now
oh yes that seems to work thank you!!
n
marvin