Vadym Shkarbul
09/16/2024, 1:56 PM@flow
async def my_sub_flow():
await asyncio.sleep(5)
if random.choice([True, False]):
raise Exception(f"Flow run: {prefect.runtime.flow_run.name} Failed")
@flow
async def my_main_flow():
semaphore = asyncio.Semaphore(5)
async def run_with_semaphore():
async with semaphore:
return await my_sub_flow(return_state=True)
all_sub_flows = [run_with_semaphore() for _ in range(20)]
states = await asyncio.gather(*all_sub_flows)
if any(state.type == StateType.FAILED for state in states):
return Failed(message=f"Flow run: {prefect.runtime.flow_run.name} Failed")
if __name__ == "__main__":
asyncio.run(my_main_flow())
1. Is there a more elegant and more prefect way to run flow runs in parallel but in a limited number (in the documentation and in this channel I saw a cool way to run tasks, but I need exactly flow runs)?
2. Is there a more elegant and more prefect way to fail the main flow if at least one of the subflows fails? In the v 2, the last two lines of the code:
if any(state.type == StateType.FAILED for state in states):
return Failed(message=f"Flow run: {prefect.runtime.flow_run.name} Failed")
were not needed, but now in v3 even if all 20 flow runs fail - the main flow has the Completed status.
Thank you!Nate
09/16/2024, 3:43 PM