Santhosh Solomon (Fluffy)
01/01/2023, 12:51 PMKelvin DeCosta
01/02/2023, 8:56 AMasync flow cannot call both async and sync tasks without crashing
• I ran into an error where an async flow (flow marked with async def) would crash when consuming the result of a an async subflow. Fixed this by making the flow sync and not using await when consuming the result of the subflow
For mapped async tasks, await async_task.map(return_state=True) nicely returns all the task states. You would then need to consume them in either two ways:
• asyncio.gather(*[state.result(fetch=True) for state in task_states])
• [state.result() for state in task_states]
AFAIK, the first option requires result persistanceKelvin DeCosta
01/02/2023, 8:57 AMasync features / practices you adopt really depends on the context.
I'm in no way an expert with async , just sharing my thoughtsSanthosh Solomon (Fluffy)
01/02/2023, 12:49 PMSanthosh Solomon (Fluffy)
01/02/2023, 12:50 PMZanie
Zanie
PREFECT_ASYNC_FETCH_STATE_RESULT setting so you don’t need to set fetch on state.result() calls — this is only not the default behavior for backwards compatibility.Lawrence Lee
01/06/2023, 8:58 AM• I ran into an error where ansubflows have to be syncronous? Is there no way to run subflows in parallel?flow (flow marked withasync) would crash when consuming the result of a anasync defsubflow. Fixed this by making the flowasyncand not usingsyncwhen consuming the result of the subflowawait
Santhosh Solomon (Fluffy)
01/06/2023, 10:36 AMKelvin DeCosta
01/06/2023, 2:45 PMZanie
Santhosh Solomon (Fluffy)
01/06/2023, 4:44 PMZanie
import anyio
import asyncio
from prefect import flow
@flow
async def foo():
return "foo"
@flow
async def bar():
return "bar"
@flow
async def asyncio_concurrency():
results = await asyncio.gather(foo(), bar())
print("".join(results))
asyncio.run(asyncio_concurrency())
@flow
async def anyio_concurrency():
results = {}
async def run_and_capture_result(flow):
results[flow] = await flow()
async with anyio.create_task_group() as tg:
tg.start_soon(run_and_capture_result, foo)
tg.start_soon(run_and_capture_result, bar)
print("".join([results[foo], results[bar]]))
anyio.run(anyio_concurrency)Zanie
Lawrence Lee
01/06/2023, 4:53 PMZanie
Zanie
run_deployment if you want them to run as separate ECSTasksZanie
Lawrence Lee
01/06/2023, 5:01 PMrun_deployment is async we can use the same asyncio or anyio syntax to run them in parallel?
https://docs.prefect.io/api-ref/prefect/deployments/#prefect.deployments.run_deploymentZanie