# prefect-cloud
v
Hello! I have a few questions about the code sample below:
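import asyncio
import random

import prefect.runtime
from prefect import flow
from prefect.client.schemas.objects import StateType
from prefect.states import Failed


@flow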
async def my_sub_flow():
    await asyncio.sleep(5)
    if random.choice([True, False]):
        raise Exception(f"Flow run: {prefect.runtime.flow_run.name} Failed")


@flow
async def my_main_flow():
    semaphore = asyncio.Semaphore(5)

    async def run_with_semaphore():
        async with semaphore:
            return await my_sub_flow(return_state=True)

    all_sub_flows = [run_with_semaphore() for _ in range(20)]
    states = await asyncio.gather(*all_sub_flows)

    if any(state.type == StateType.FAILED for state in states):
        return Failed(message=f"Flow run: {prefect.runtime.flow_run.name} Failed")
    
if __name__ == "__main__":
    asyncio.run(my_main_flow())
1. Is there a more elegant, more Prefect-idiomatic way to run flow runs in parallel while capping how many run at once? (In the documentation and in this channel I saw a neat way to do this for tasks, but I specifically need flow runs.)
2. Is there a more elegant, more Prefect-idiomatic way to fail the main flow if at least one of the subflows fails? In v2, the last two lines of the code:
if any(state.type == StateType.FAILED for state in states):
    return Failed(message=f"Flow run: {prefect.runtime.flow_run.name} Failed")
were not needed. In v3, however, even if all 20 subflow runs fail, the main flow still ends in the Completed state. Thank you!
n
hi @Vadym Shkarbul - I would point out 1. global concurrency limits 2. these docs