Hey all, I have a list of `run_deployment` corout...
# ask-community
a
Hey all, I have a list of `run_deployment` coroutines that are all flows. When I do an `asyncio.gather`, sometimes one of the flows goes missing. Has anybody experienced something like this? Prefect support were less than useful when I reached out.
m
Hello Avinash, I'm also trying to do something similar. Can we get in touch to discuss?
a
Hey Manoj, Happy to help. Please DM me
n
hey avinash, any chance you can give an MRE here to show what you mean by one of the flows going missing? that sounds really strange and i’d like to understand what you’re seeing
a
Hey Nate, The code looks something like this
deployments = [
    "run-extraction/run_{env}_extraction",
    "run-extraction/run_{env}_extraction",
    "run-extraction/run_{env}_extraction",
    "run-extraction/run_{env}_extraction",
    "run-extraction/run_{env}_extraction",
]
jobs = [
    run_deployment(deployment_name, *args, **kwargs)
    for deployment_name in deployments
]
await asyncio.gather(*jobs, return_exceptions=True)
But sometimes one of the `run_deployment` flows doesn’t get kicked off (or even registered). When we reached out to Prefect support, they mentioned that the process/thread could have been killed, but there were no logs indicating that. We have enabled debug logs, but the issue is sporadic, so we are not able to reproduce it.
Some additional context: Prefect version is 2.14.17. We use agents, and flows are kicked off as processes on the agent container itself. The flows are stored locally on the agent container. The agent runs on ECS.
n
ahh, so if you use return_exceptions like that, you’re telling asyncio to not raise any exceptions and return them in place of the return value. i’d be curious what you’d see if you printed the return of your gather
if for example, you provided a non existent deployment name to one of them and got a 404 from us, you might end up with a value error instead of getting a flow run from that
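To illustrate the point about `return_exceptions=True`, here is a minimal sketch using only `asyncio`. `fake_run_deployment` and the deployment names are hypothetical stand-ins, not Prefect calls; the only thing shown is that a failed call comes back as an exception object in the result list rather than being raised.

```python
import asyncio


async def fake_run_deployment(name: str) -> str:
    """Hypothetical stand-in for run_deployment; fails for one name."""
    await asyncio.sleep(0)
    if name == "missing/deployment":
        raise ValueError(f"deployment {name!r} not found")
    return f"flow-run-for-{name}"


async def main() -> list:
    names = ["etl/a", "etl/b", "missing/deployment"]
    results = await asyncio.gather(
        *(fake_run_deployment(n) for n in names),
        return_exceptions=True,
    )
    # gather preserves input order, so zip pairs each name with its outcome.
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            print(f"{name}: FAILED with {result!r}")
        else:
            print(f"{name}: ok -> {result}")
    return results


results = asyncio.run(main())
```

The result list always has one entry per awaited call, so a call that failed shows up as an exception in its slot instead of vanishing from the list.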
a
I would think that a `run_deployment` flow needs to have run in the first place to return something. I am not seeing the 5th run deployment. Since the flows themselves return None or fail in this case, I think we would see `[None, None, None, None]` when we expect 5 Nones, if that makes sense.
The main issue is that it happens sporadically (once a week on random days) so I don’t believe there is an issue with the code itself
n
`run_deployment` returns a `FlowRun` object, not the return value of the flow run directly. you’d need to do `flow_run.state.result()` to get the value. anyways, your point makes sense about the sporadic failure. however, if you’re not getting the return values, then why await the calls at all? can you just pass `timeout=0`? that will make all the `run_deployment` calls non-blocking and just kick off the flow runs
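A rough sketch of the `timeout=0` suggestion, assuming Prefect 2.x's `prefect.deployments.run_deployment` called from async code. The helper name `kick_off_all` is made up for illustration, and the function needs a reachable Prefect API to actually run.

```python
import asyncio


async def kick_off_all(deployment_names: list[str]) -> list:
    # Imported lazily so this module loads even where Prefect is not installed.
    from prefect.deployments import run_deployment

    # timeout=0 asks run_deployment to return right after the flow run is
    # created, without polling for completion.
    flow_runs = await asyncio.gather(
        *(run_deployment(name=name, timeout=0) for name in deployment_names)
    )
    for flow_run in flow_runs:
        # Each item is a FlowRun object (metadata), not the flow's return value.
        print(flow_run.id, flow_run.name, flow_run.state)
    return flow_runs
```

Because nothing waits for completion here, this only fits cases where no downstream step depends on the runs having finished.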
a
We have a custom `run_deployment` that does a couple of other things (we wrote it before `run_deployment` was added to the repo), but basically it waits for the flow run to finish using `wait_for_flow_run`. I will look into using that instead of our custom logic.
> anyways, your point makes sense about the sporadic failure. however if you’re not getting the return values, then why await the calls at all? can you just pass timeout=0? that will make all the run_deployment calls non blocking and just kick off the flow runs
We have downstream logic in the DAG that needs to wait until those runs are finished. So if I don’t gather them, the downstream steps will get kicked off immediately, if I’m not mistaken.
Our custom run_deployment
async with get_client() as prefect_client:
    deployment = await prefect_client.read_deployment_by_name(deployment_name)
    logger.info(
        f"""Finished reading deployment information for the provided deployment
        name: {deployment_name}. Got deployment: {deployment}"""
    )

    logger.debug("Going to create flow run for deployment")
    deployment_flow_run = await prefect_client.create_flow_run_from_deployment(
        deployment.id, *args, **kwargs
    )
    logger.info(f"Created flow run: {deployment_flow_run} for deployment: {deployment_name}")

    flow_run_result = await wait_for_flow_run(
        flow_run_id=deployment_flow_run.id,
        timeout=timeout,
        poll_interval=polling_interval,
        client=prefect_client,
    )
    logger.info(f"Flow run finished with: {flow_run_result}")
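Since the failure is sporadic and leaves no logs, one option is to wrap each gathered coroutine so that every call logs its start, finish, and any failure under its own label. This is only a sketch: `tracked`, `stub_job`, and the `job-N` labels are hypothetical, and `stub_job` stands in for the custom `run_deployment` coroutine above.

```python
import asyncio
import logging

logger = logging.getLogger("dispatch")


async def tracked(label: str, coro):
    """Await coro, logging start, finish, and any failure under `label`."""
    logger.info("%s: starting", label)
    try:
        result = await coro
    except BaseException:
        # Also catches asyncio.CancelledError, which is not an Exception
        # subclass on Python 3.8+.
        logger.exception("%s: did not finish", label)
        raise
    logger.info("%s: finished", label)
    return result


async def stub_job(i: int) -> int:
    # Hypothetical stand-in for the custom run_deployment coroutine.
    await asyncio.sleep(0)
    if i == 3:
        raise RuntimeError("simulated failure")
    return i


async def main() -> list:
    jobs = [tracked(f"job-{i}", stub_job(i)) for i in range(5)]
    return await asyncio.gather(*jobs, return_exceptions=True)


outcomes = asyncio.run(main())
```

With distinct labels, a call that starts but never logs a finish or a failure stands out, which narrows down whether the coroutine was ever scheduled at all.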
n
yeah you’re right, it would just continue down to your next logic. as far as the disappearing flow run, i still suspect that there’s a failure in one of the invoked flow runs. something “disappearing” is not something i’ve seen before (but you never know 👻). i would encourage you to check out events (if you’re on cloud), as i find they’re a good way to tie disparate deployment flow runs together without big parent flows whose only job is to dispatch / wait