Avinash Santhanagopalan
02/29/2024, 11:47 AM
run_deployment coroutines that are all flows. When I do an asyncio.gather, sometimes one of the flows goes missing. Has anybody experienced something like this? Prefect support were less than useful when I reached out.
Manoj Ravi
02/29/2024, 12:53 PM
Avinash Santhanagopalan
02/29/2024, 12:59 PM
Nate
02/29/2024, 2:05 PM
Avinash Santhanagopalan
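02/29/2024, 2:11 PM
import asyncio

from prefect.deployments import run_deployment

# env, args and kwargs come from the enclosing async function
deployments = [
    f"run-extraction/run_{env}_extraction",
    f"run-extraction/run_{env}_extraction",
    f"run-extraction/run_{env}_extraction",
    f"run-extraction/run_{env}_extraction",
    f"run-extraction/run_{env}_extraction",
]
# Inside an async function, run_deployment returns a coroutine
jobs = [
    run_deployment(deployment_name, *args, **kwargs)
    for deployment_name in deployments
]
await asyncio.gather(*jobs, return_exceptions=True)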
But sometimes one of the run_deployment flows doesn’t get kicked off (or even registered). When we reached out to Prefect support, they mentioned that the process/thread could have been killed, but there were no logs indicating that. We have enabled debug logs, but the issue is sporadic, so we are not able to reproduce it.
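A minimal sketch of one thing worth ruling out in the snippet above, assuming nothing about Prefect internals: with return_exceptions=True, asyncio.gather does not raise when one of the awaitables fails. The exception object is placed in the results list instead, and because that list is discarded here, a failing run_deployment call would leave no trace unless something else logs it. start_run below is a hypothetical stand-in for run_deployment (not a Prefect API) that fails for one name, so the check can run without a Prefect server.

```python
import asyncio


async def start_run(name: str) -> str:
    """Hypothetical stand-in for run_deployment(); fails for one name on purpose."""
    await asyncio.sleep(0)
    if name == "run_c":
        raise RuntimeError(f"could not create flow run for {name}")
    return f"flow-run-for-{name}"


async def start_all(names: list[str]) -> tuple[list[str], list[tuple[str, BaseException]]]:
    # return_exceptions=True stops gather from raising on the first failure,
    # but the exceptions then sit in the results list and are easy to ignore.
    results = await asyncio.gather(
        *(start_run(name) for name in names), return_exceptions=True
    )
    started: list[str] = []
    failed: list[tuple[str, BaseException]] = []
    # gather keeps input order, so each result lines up with its name.
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            failed.append((name, result))
        else:
            started.append(result)
    return started, failed


if __name__ == "__main__":
    started, failed = asyncio.run(start_all(["run_a", "run_b", "run_c"]))
    print(started)  # ['flow-run-for-run_a', 'flow-run-for-run_b']
    for name, exc in failed:
        print(f"{name} failed: {exc!r}")
```

Because gather returns results in the same order as its inputs, zipping them with the deployment names shows exactly which call failed; logging or re-raising those entries would at least turn a silent gap into a visible error.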
Some additional context:
Prefect version: 2.14.17
We use agents, and flows are kicked off as processes on the agent container itself. The flows are stored locally on the agent container. The agent runs on ECS.
Nate
02/29/2024, 2:15 PM
Nate
02/29/2024, 2:16 PM
Avinash Santhanagopalan
02/29/2024, 2:18 PM
Avinash Santhanagopalan
02/29/2024, 2:20 PM
Nate
02/29/2024, 2:22 PM
Avinash Santhanagopalan
02/29/2024, 2:25 PM
wait_for_flow_run. I will look into using that instead of our custom logic.
> Anyways, your point makes sense about the sporadic failure. However, if you’re not getting the return values, then why await the calls at all? Can you just pass timeout=0? That will make all the run_deployment calls non-blocking and just kick off the flow runs.
We have downstream logic in the DAG that needs to wait until those runs are finished. So if I don’t gather them, the downstream steps will get kicked off immediately, if I’m not mistaken.
Avinash Santhanagopalan
02/29/2024, 2:27 PM
from prefect import get_client
from prefect.flow_runs import wait_for_flow_run

# deployment_name, args, kwargs, timeout, polling_interval and logger
# come from the enclosing async function
async with get_client() as prefect_client:
    deployment = await prefect_client.read_deployment_by_name(deployment_name)
    logger.info(
        f"Finished reading deployment information for the provided deployment "
        f"name: {deployment_name}. Got deployment: {deployment}"
    )
    logger.debug("Going to create flow run for deployment")
    deployment_flow_run = await prefect_client.create_flow_run_from_deployment(
        deployment.id, *args, **kwargs
    )
    logger.info(f"Created flow run: {deployment_flow_run} for deployment: {deployment_name}")
    # Poll the API until the flow run reaches a final state or the timeout expires
    flow_run_result = await wait_for_flow_run(
        flow_run_id=deployment_flow_run.id,
        timeout=timeout,
        poll_interval=polling_interval,
        client=prefect_client,
    )
    logger.info(f"Flow run finished with: {flow_run_result}")
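The quoted timeout=0 suggestion and the need to hold the downstream DAG steps are not in conflict: the runs can be created without blocking and then awaited together before moving on. A minimal stdlib sketch of that two-phase shape, assuming hypothetical stand-ins rather than Prefect APIs: start_run plays the role of run_deployment(..., timeout=0), and wait_for_run imitates polling a flow run with a timeout and poll interval, in the spirit of the wait_for_flow_run call in the snippet above. It is not Prefect's implementation.

```python
import asyncio
import time

# Hypothetical in-memory "API": run id -> monotonic time at which the run finishes.
_FINISH_AT: dict[str, float] = {}


async def start_run(name: str, duration: float) -> str:
    """Stand-in for run_deployment(..., timeout=0): returns as soon as the run exists."""
    run_id = f"{name}-run"
    _FINISH_AT[run_id] = time.monotonic() + duration
    return run_id


async def read_state(run_id: str) -> str:
    """Stand-in for reading a flow run's current state from the API."""
    return "COMPLETED" if time.monotonic() >= _FINISH_AT[run_id] else "RUNNING"


async def wait_for_run(run_id: str, timeout: float, poll_interval: float) -> str:
    """Poll until the run completes, raising TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        state = await read_state(run_id)
        if state == "COMPLETED":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{run_id} did not finish within {timeout}s")
        await asyncio.sleep(poll_interval)


async def run_all(jobs: dict[str, float]) -> dict[str, str]:
    # Phase 1: create every run; none of these calls waits for completion.
    run_ids = [await start_run(name, duration) for name, duration in jobs.items()]
    # Phase 2: wait for all runs concurrently; downstream steps go after this.
    states = await asyncio.gather(
        *(wait_for_run(run_id, timeout=2.0, poll_interval=0.01) for run_id in run_ids)
    )
    return dict(zip(run_ids, states))


if __name__ == "__main__":
    print(asyncio.run(run_all({"extract_a": 0.05, "extract_b": 0.1})))
    # {'extract_a-run': 'COMPLETED', 'extract_b-run': 'COMPLETED'}
```

Creating the runs one at a time in phase 1 is a deliberate choice in this sketch: a failed creation raises immediately instead of ending up in a gathered results list, while the waiting, which is where most of the time is spent, still happens concurrently.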
Nate
02/29/2024, 2:33 PM