Max Jackson
04/06/2023, 5:35 PMmy_flow_run = prefect.deployments.run_deployment(name='my_deployment')
, then throwing an error if my_flow_run.state_name=='failed'
. I'd like to coordinate them asyncronously, with something like
my_flow_run_1, my_flow_run_2 = await asyncio.gather(
run_deployment(
name=f"my_deployment_1"),
run_deployment(
name=f"my_deployment_2")
)
throw_error_if_flow_run_failed(my_flow_run_1)
throw_error_if_flow_run_failed(my_flow_run_2)
then I'd like to continue the rest of the main flow sequentially.
Am I going about this the right way? is there a built-in solution or will I need to hand-roll something?Sahil Rangwala
04/06/2023, 5:54 PMNelson Griffiths
04/06/2023, 7:16 PMasync
keywords. https://docs.prefect.io/latest/tutorials/execution/#asynchronous-executionNelson Griffiths
04/06/2023, 7:18 PM@flow
async def my_flow():
flow_runs = await asyncio.gather(
run_deployment(
name=f"my_deployment_1"),
run_deployment(
name=f"my_deployment_2")
)
if not all([flow_run.state_type == StateType.COMPLETED for flow_run in flow_runs]):
... # Throw whatever error
# Keep running other tasks sequentially
Max Jackson
04/07/2023, 12:12 PMasync
definition of run_deployment
. namely, I keep getting AttributeError: 'coroutine' object has no attribute 'state_type'
Nelson Griffiths
04/07/2023, 12:48 PM