Max Jackson04/06/2023, 5:35 PM
, then throwing an error if
my_flow_run = prefect.deployments.run_deployment(name='my_deployment')
. I'd like to coordinate them asyncronously, with something like
then I'd like to continue the rest of the main flow sequentially. Am I going about this the right way? is there a built-in solution or will I need to hand-roll something?
my_flow_run_1, my_flow_run_2 = await asyncio.gather( run_deployment( name=f"my_deployment_1"), run_deployment( name=f"my_deployment_2") ) throw_error_if_flow_run_failed(my_flow_run_1) throw_error_if_flow_run_failed(my_flow_run_2)
Sahil Rangwala04/06/2023, 5:54 PM
Nelson Griffiths04/06/2023, 7:16 PM
@flow async def my_flow(): flow_runs = await asyncio.gather( run_deployment( name=f"my_deployment_1"), run_deployment( name=f"my_deployment_2") ) if not all([flow_run.state_type == StateType.COMPLETED for flow_run in flow_runs]): ... # Throw whatever error # Keep running other tasks sequentially
Max Jackson04/07/2023, 12:12 PM
. namely, I keep getting
AttributeError: 'coroutine' object has no attribute 'state_type'
Nelson Griffiths04/07/2023, 12:48 PM