Tom Klein

08/17/2023, 12:52 PM
When firing subflows asynchronously as deployments using run_deployment, it seems like the parent flow can succeed even if the subflows fail. Is that expected behavior? Is there a way to make it behave more like the sync way?
Example:
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow(
    name="Process responses incrementally",
    flow_run_name="process-responses-incrementally",
)
async def process_responses_incrementally_flow(workers: int = 500):
    # In an async flow, each run_deployment call returns an awaitable.
    futures = []
    for chunk_mod in range(workers):
        futures.append(
            run_deployment(
                name="Process responses chunk/process-responses-chunk",
                flow_run_name=f"process-response-chunk-{chunk_mod}",
                parameters={"chunk_mod": chunk_mod, "total_workers": workers},
            )
        )
    # The results are discarded, so failed subflows do not affect the parent.
    await asyncio.gather(*futures)


if __name__ == "__main__":
    asyncio.run(process_responses_incrementally_flow())
For posterity’s sake, this is the workaround we found:
    res = await asyncio.gather(*futures)
    states = [run.state for run in res]
    return states
This is necessary because, to deduce the final state from a list of states correctly, Prefect expects a list of State objects and NOT FlowRun result objects (if you ask me, that could have been handled at the Prefect level, but whatever).
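If you would rather have the parent fail loudly than rely on the returned states, something like the following might also work. It is only a sketch: unfinished_runs and raise_if_any_failed are hypothetical helper names (not part of Prefect), and it assumes each object returned by run_deployment exposes .name and .state, with .state.is_completed() available when the state is set.

```python
from typing import Any, Iterable, List


def unfinished_runs(flow_runs: Iterable[Any]) -> List[Any]:
    """Return the runs whose final state is missing or not completed.

    Assumes each run has a `.state` attribute that is either None or an
    object with an `is_completed()` method.
    """
    return [
        run
        for run in flow_runs
        if run.state is None or not run.state.is_completed()
    ]


def raise_if_any_failed(flow_runs: Iterable[Any]) -> None:
    """Raise an exception if any run did not complete.

    An unhandled exception inside the parent flow marks the parent as failed.
    """
    bad = unfinished_runs(flow_runs)
    if bad:
        names = ", ".join(str(run.name) for run in bad)
        raise RuntimeError(f"{len(bad)} subflow run(s) did not complete: {names}")
```

Inside the parent flow this would be called as raise_if_any_failed(await asyncio.gather(*futures)) in place of returning the states. The trade-off is that you get one exception naming the bad runs, instead of letting Prefect derive the parent's state from the list.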