
Santhosh Solomon (Fluffy)

01/01/2023, 12:51 PM
Hi, I am working on integrating async APIs in my workflow. Can someone help me with best practices? #prefect2 #asyncapi

Kelvin DeCosta

01/02/2023, 8:56 AM
Hey there! I spent last week refactoring sync mapped tasks to work asynchronously. Some caveats before you start asyncing:
• Weirdly enough, an `async` flow cannot call both `async` and `sync` tasks without crashing.
• I ran into an error where an `async` flow (a flow defined with `async def`) would crash when consuming the result of an `async` subflow. I fixed this by making the parent flow `sync` and not using `await` when consuming the result of the subflow.
For mapped `async` tasks, `await async_task.map(return_state=True)` nicely returns all the task states. You then need to consume them in one of two ways:
• `await asyncio.gather(*[state.result(fetch=True) for state in task_states])`
• `[state.result() for state in task_states]`
AFAIK, the first option requires result persistence.
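To make the two consumption patterns concrete without needing a Prefect server, here is a minimal `asyncio`-only sketch. `FakeState` is a hypothetical stand-in for Prefect's `State`, not Prefect's class. It assumes the behaviour described above: in async code, `result(fetch=True)` returns an awaitable, so the awaitables must be gathered and awaited, while `result()` hands the value back directly. In a real flow, `task_states` would be the list returned by the mapped call.

```python
import asyncio


class FakeState:
    """Hypothetical stand-in for a Prefect State; NOT Prefect's class.

    Assumes the async behaviour discussed above: result(fetch=True) returns an
    awaitable that loads the result, while result() returns it directly.
    """

    def __init__(self, value):
        self._value = value

    def result(self, fetch=False):
        if fetch:
            return self._load()  # coroutine: the caller must await it
        return self._value

    async def _load(self):
        await asyncio.sleep(0.01)  # simulate reading a persisted result from storage
        return self._value


async def consume(task_states):
    # Option 1: await all fetches concurrently; gather preserves input order
    fetched = await asyncio.gather(*[state.result(fetch=True) for state in task_states])
    # Option 2: plain list comprehension, nothing to await
    direct = [state.result() for state in task_states]
    return fetched, direct


# In a real flow, task_states would come from the mapped async task call
task_states = [FakeState(n * n) for n in range(4)]
fetched, direct = asyncio.run(consume(task_states))
print(fetched, direct)
```

Note that `asyncio.gather` on its own only schedules the awaitables. Without the `await`, the flow would hold a future rather than the results.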
As in most cases, the kind of `async` features and practices you adopt really depends on the context. I'm in no way an expert with `async`, just sharing my thoughts.

Santhosh Solomon (Fluffy)

01/02/2023, 12:49 PM
Thanks for your input. My journey with Prefect so far has been finding out what doesn’t work. Hoping for the light at the end of the tunnel. 🤓
This actually made me take a deep dive into asynchronous programming, but because of a time crunch I am trying different approaches.

Zanie

01/03/2023, 5:39 PM
I’m happy to help with any specific questions.
You should turn on the `PREFECT_ASYNC_FETCH_STATE_RESULT` setting so you don’t need to pass `fetch=True` to `state.result()` calls. It is only off by default for backwards compatibility.
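Here is a minimal sketch of turning that setting on from Python. It assumes the usual Prefect convention that a setting can be supplied as an environment variable of the same name, set before Prefect loads its settings. With it enabled, `state.result()` in async code should fetch the result without an explicit `fetch=True`.

```python
import os

# Assumption: Prefect reads PREFECT_* environment variables when it loads its
# settings, so set this before importing prefect (or export it in the shell).
# To persist it in the active profile instead, run:
#   prefect config set PREFECT_ASYNC_FETCH_STATE_RESULT=true
os.environ["PREFECT_ASYNC_FETCH_STATE_RESULT"] = "true"

# import prefect  # import Prefect only after the setting is in place
```

Using `prefect config set` is usually the more durable choice, because it applies to every process that uses that profile.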

Lawrence Lee

01/06/2023, 8:58 AM
Hi @Kelvin DeCosta! I was looking for an example of calling async subflows, but based on your observation here:
> • I ran into an error where an `async` flow (a flow defined with `async def`) would crash when consuming the result of an `async` subflow. I fixed this by making the flow `sync` and not using `await` when consuming the result of the subflow.
Do subflows have to be synchronous? Is there no way to run subflows in parallel?

Santhosh Solomon (Fluffy)

01/06/2023, 10:36 AM
@Lawrence Lee In my experience with Prefect, subflows only run sequentially. There is no parallel execution of subflows so far.

Kelvin DeCosta

01/06/2023, 2:45 PM
The only way I can think of to run them in parallel is to invoke them as deployments, but then they’re not technically subflows.

Zanie

01/06/2023, 4:06 PM
Subflows should be able to be asynchronous (we even explicitly test for this). The only difficulty is running the same flow concurrently since its task runner instance is “reused” and we don’t support that yet.

Santhosh Solomon (Fluffy)

01/06/2023, 4:44 PM
@Zanie Can you share a code snippet for the implementation and testing?

Zanie

01/06/2023, 4:50 PM
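```python
import asyncio

import anyio
from prefect import flow


@flow
async def foo():
    return "foo"


@flow
async def bar():
    return "bar"


@flow
async def asyncio_concurrency():
    # Calling an async flow returns a coroutine, so gather() runs both subflows concurrently
    results = await asyncio.gather(foo(), bar())
    print("".join(results))


@flow
async def anyio_concurrency():
    results = {}

    async def run_and_capture_result(subflow):
        # Task group tasks can't return values directly, so store each result in a dict
        results[subflow] = await subflow()

    # The task group waits for every started task before leaving the `async with` block
    async with anyio.create_task_group() as tg:
        tg.start_soon(run_and_capture_result, foo)
        tg.start_soon(run_and_capture_result, bar)

    print("".join([results[foo], results[bar]]))


if __name__ == "__main__":
    asyncio.run(asyncio_concurrency())
    anyio.run(anyio_concurrency)
```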

Lawrence Lee

01/06/2023, 4:53 PM
Thanks @Zanie. What is the expected behavior if this is run as a deployment with an ECSTask infrastructure block? Will all the subflows run as separate ECS tasks?

Zanie

01/06/2023, 4:53 PM
Subflows run in-process.
You can use `run_deployment` if you want them to run as separate ECS tasks.
(They’ll still be linked as subflows! Perhaps worth clarifying that flow calls are run in-process.)

Lawrence Lee

01/06/2023, 5:01 PM
And since `run_deployment` is async, we can use the same asyncio or anyio syntax to run them in parallel? https://docs.prefect.io/api-ref/prefect/deployments/#prefect.deployments.run_deployment

Zanie

01/06/2023, 5:12 PM
Yeah, or you can start them with a timeout of zero (`timeout=0`) so we don’t wait for completion.
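Here is a rough, stdlib-only sketch contrasting "wait for every run" with "start and don't wait" when several deployment runs are launched concurrently. `fake_run_deployment` is a hypothetical stand-in, not Prefect's API. It mimics the documented `timeout` semantics of `run_deployment`: `timeout=0` returns as soon as the run is created, while `timeout=None` keeps polling until the run finishes. With Prefect itself, you would call `run_deployment` from `prefect.deployments` inside the same `asyncio.gather`. The deployment name `my-flow/ecs` is hypothetical.

```python
import asyncio
import time


async def fake_run_deployment(name, parameters=None, timeout=None):
    """Hypothetical stand-in for prefect.deployments.run_deployment (NOT Prefect's API).

    Mimics its timeout semantics: timeout=0 returns as soon as the run is created,
    timeout=None waits until the run finishes.
    """
    run = {"deployment": name, "parameters": parameters or {}, "state": "SCHEDULED"}
    if timeout == 0:
        return run  # fire-and-forget: don't wait for completion
    await asyncio.sleep(0.2)  # simulate polling while the remote (e.g. ECS) run executes
    run["state"] = "COMPLETED"
    return run


async def launch_all(wait):
    timeout = None if wait else 0
    # The real Prefect call would look like (deployment name is hypothetical):
    #   run_deployment(name="my-flow/ecs", parameters={"n": n}, timeout=timeout)
    return await asyncio.gather(
        *[fake_run_deployment("my-flow/ecs", {"n": n}, timeout=timeout) for n in range(3)]
    )


start = time.perf_counter()
waited = asyncio.run(launch_all(wait=True))
waited_secs = time.perf_counter() - start  # the three runs overlap instead of running back to back

started = asyncio.run(launch_all(wait=False))

print([run["state"] for run in waited], round(waited_secs, 1))
print([run["state"] for run in started])
```

With `timeout=0`, the parent gets back run metadata almost immediately. You would then need to check the runs' final states some other way, for example in the Prefect UI.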