Santhosh Solomon (Fluffy)
01/01/2023, 12:51 PMKelvin DeCosta
01/02/2023, 8:56 AMasync
flow cannot call both async
and sync
tasks without crashing
• I ran into an error where an async
flow (flow marked with async def
) would crash when consuming the result of a an async
subflow. Fixed this by making the flow sync
and not using await
when consuming the result of the subflow
For mapped async
tasks, await async_task.map(return_state=True)
nicely returns all the task states. You would then need to consume them in either two ways:
• asyncio.gather(*[state.result(fetch=True) for state in task_states])
• [state.result() for state in task_states]
AFAIK, the first option requires result persistanceasync
features / practices you adopt really depends on the context.
I'm in no way an expert with async
, just sharing my thoughtsSanthosh Solomon (Fluffy)
01/02/2023, 12:49 PMZanie
01/03/2023, 5:39 PMPREFECT_ASYNC_FETCH_STATE_RESULT
setting so you don’t need to set fetch
on state.result()
calls — this is only not the default behavior for backwards compatibility.Lawrence Lee
01/06/2023, 8:58 AM• I ran into an error where ansubflows have to be syncronous? Is there no way to run subflows in parallel?flow (flow marked withasync
) would crash when consuming the result of a anasync def
subflow. Fixed this by making the flowasync
and not usingsync
when consuming the result of the subflowawait
Santhosh Solomon (Fluffy)
01/06/2023, 10:36 AMKelvin DeCosta
01/06/2023, 2:45 PMZanie
01/06/2023, 4:06 PMSanthosh Solomon (Fluffy)
01/06/2023, 4:44 PMZanie
01/06/2023, 4:50 PMimport anyio
import asyncio
from prefect import flow
@flow
async def foo():
return "foo"
@flow
async def bar():
return "bar"
@flow
async def asyncio_concurrency():
results = await asyncio.gather(foo(), bar())
print("".join(results))
asyncio.run(asyncio_concurrency())
@flow
async def anyio_concurrency():
results = {}
async def run_and_capture_result(flow):
results[flow] = await flow()
async with anyio.create_task_group() as tg:
tg.start_soon(run_and_capture_result, foo)
tg.start_soon(run_and_capture_result, bar)
print("".join([results[foo], results[bar]]))
anyio.run(anyio_concurrency)
Lawrence Lee
01/06/2023, 4:53 PMZanie
01/06/2023, 4:53 PMrun_deployment
if you want them to run as separate ECSTasksLawrence Lee
01/06/2023, 5:01 PMrun_deployment
is async we can use the same asyncio or anyio syntax to run them in parallel?
https://docs.prefect.io/api-ref/prefect/deployments/#prefect.deployments.run_deploymentZanie
01/06/2023, 5:12 PM