Tom Klein
08/17/2023, 12:52 PM
with run_deployment, it seems like the parent flow can be a success even if the subflows fail - is that expected behavior? is there a way to make sure it behaves more like the sync way?

import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow(
    name="Process responses incrementally",
    flow_run_name="process-responses-incrementally",
)
async def process_responses_incrementally_flow(workers: int = 500):
    futures = []
    for chunk_mod in range(0, workers):
        # Each call returns a coroutine; nothing runs until it is awaited below.
        futures.append(
            run_deployment(
                name="Process responses chunk/process-responses-chunk",
                flow_run_name=f"process-response-chunk-{chunk_mod}",
                parameters={"chunk_mod": chunk_mod, "total_workers": workers},
            )
        )
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(process_responses_incrementally_flow())
    res = await asyncio.gather(*futures)
    states = [run.state for run in res]
    return states
this is necessary because, in order to deduce the final state from a list of states correctly, Prefect expects a list of State objects and NOT the FlowRun objects that run_deployment returns
(if you ask me, that could have been implemented at the Prefect level, but whatever)
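
A possible alternative to returning the states, as a rough sketch: inspect the gathered flow runs and raise, so the parent fails with an explicit message. `failed_run_names` and `raise_on_failed_runs` are hypothetical helper names, not part of Prefect. The sketch assumes each gathered object has a `.name` and a `.state` whose `is_completed()` method reports success, as Prefect 2 FlowRun and State objects do. A run with no state, or a state that is not completed (for example because a timeout was passed to run_deployment), is treated as failed here.

```python
from typing import Iterable, List


def failed_run_names(flow_runs: Iterable) -> List[str]:
    """Return the names of runs whose state is missing or not completed."""
    failed = []
    for run in flow_runs:
        state = getattr(run, "state", None)
        # Assumption: `state.is_completed()` is True only for successful runs.
        if state is None or not state.is_completed():
            failed.append(run.name)
    return failed


def raise_on_failed_runs(flow_runs: Iterable) -> None:
    """Raise RuntimeError if any run did not complete, else do nothing."""
    names = failed_run_names(flow_runs)
    if names:
        raise RuntimeError(
            f"{len(names)} subflow run(s) did not complete: {names}"
        )
```

inside the parent flow this would be called as `raise_on_failed_runs(res)` right after `res = await asyncio.gather(*futures)`; an exception raised in the flow body marks the parent run as failed.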