https://prefect.io logo
Title
a

Ashe

05/19/2023, 5:56 PM
@Marvin how do you submit an entire flow async for an api to resolve?
m

Marvin

05/19/2023, 5:58 PM
Ah, the sweet sound of success. I found some information on submitting an entire flow async for an API to resolve in Prefect. To achieve this, you can use the
asyncio
library and follow the standard rules of async Python. It's a good practice to run any blocking code in a separate thread. Here's an example of how to write an async flow:
python
import asyncio
from prefect import flow, task, get_run_logger

@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)

async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)
    await subflow(n)

@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(i) for i in range(5)])

main_flow.run()
This example creates a main flow that runs multiple subflows in parallel using
asyncio.gather
. Each subflow contains an async task that sleeps for a second before completing. For more information, you can refer to the following resources: - Best practices for using async in Prefect 2.0 - How can I run multiple subflows (or child flows) in parallel? I hope this helps you in your quest for async flow submission. Good luck, human.
n

Nate

05/19/2023, 6:00 PM
hmm i think this is a relatively old workaround that should no longer be necessary, can you elaborate on what you mean by submit and resolve here?
a

Ashe

05/19/2023, 6:00 PM
@router.post("/sync-workflows")
def sync_workflows(providerInfo: ProviderInfo):
    # If onboarding is in progress, skip
    if onboarding_in_progress(providerInfo.user_id):
        return {"message": "Onboarding is already in progress..."}

    # Prefect flow
    sync(
        user_id=providerInfo.user_id,
        refresh_token=providerInfo.refresh_token,
        user_email=providerInfo.user_email,
        date=datetime.datetime.now(),
    )

    return {"message": "Syncing workflows..."}
Right now this endpoint waits for sync to run and times out
and sync is a @ flow
how can I kick off sync without waiting on run to execute next line?
n

Nate

05/19/2023, 6:01 PM
do you have a deployment for
sync
or do you just plan on calling the flow object?
a

Ashe

05/19/2023, 6:01 PM
calling the flow object on
render
for now
n

Nate

05/19/2023, 6:03 PM
can you make this async or no? otherwise backgrounding this might get weird
also is this fastapi?
a

Ashe

05/19/2023, 6:04 PM
it is fastapi!
n

Nate

05/19/2023, 6:05 PM
maybe something like
from fastapi import BackgroundTasks

@router.post("/sync-workflows")
def sync_workflows(background_tasks: BackgroundTasks, providerInfo: ProviderInfo):
    # If onboarding is in progress, skip
    if onboarding_in_progress(providerInfo.user_id):
        return {"message": "Onboarding is already in progress..."}

    # Prefect flow
    background_tasks.add_task(
        sync,
        user_id=providerInfo.user_id,
        refresh_token=providerInfo.refresh_token,
        user_email=providerInfo.user_email,
        date=datetime.datetime.now(),
    )

    return {"message": "Syncing workflows..."}
a

Ashe

05/19/2023, 6:06 PM
yes let’s see kicking off now
oh yes that seems to work thank you!!
👍 1
n

Nate

05/19/2023, 6:11 PM
:marvin: