Tom Klein
08/17/2023, 12:52 PMrun_deployment
it seems like the parent flow can be a success even if the subflows fail - is that expected behavior? is there a way to make sure it behaves more like the sync way?Tom Klein
08/17/2023, 12:53 PM@flow(
name="Process responses incrementally",
flow_run_name="process-responses-incrementally",
)
async def proces_responses_incrementally_flow(workers: int = 500):
futures = []
for chunk_mod in range(0, workers):
futures.append(
run_deployment(
name=f"Process responses chunk/process-responses-chunk",
flow_run_name=f"process-response-chunk-{chunk_mod}",
parameters={"chunk_mod": chunk_mod, "total_workers": workers},
)
)
await asyncio.gather(*futures)
if __name__ == "__main__":
asyncio.run(process_first_responses_incrementally_flow())
Tom Klein
08/19/2023, 12:29 PMres = await asyncio.gather(*futures)
states = [run.state for run in res]
return states
this is necessary because in order to deduce the final state from a list of states corrrectly, Prefect expect a list of State objects and NOT FlowRun result objects
(if you ask me, that could have been implemented at the Prefect level but whatever)Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by