Giacomo Chiarella
02/28/2025, 7:32 AM
task_future.wait()
logger.info(task_future.task_run_id)
task_run_name = asyncio.run(get_task_run_name_by_id(task_future.task_run_id))
where the function get_task_run_name_by_id is simply this
async def get_task_run_name_by_id(task_run_id: UUID):
    async with get_client() as client:
        task_run = await client.read_task_run(task_run_id)
        return task_run.name
The reason I'm doing this is that I check the state via task_future.state.name in order to make the flow run fail if the task is not in a Completed state. The task run name is needed to log each task run with its state. Sometimes, randomly, I get
Encountered exception during execution: ObjectNotFound(None)
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/__init__.py", line 843, in read_task_run
    response = await self._client.get(f"/task_runs/{task_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1768, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1540, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 354, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'http://prefect_orion:4200/api/task_runs/8317922a-ebd4-4eaa-85ed-7c374c07e69c'
Response: {'detail': 'Task not found'}
when I call task_run = await client.read_task_run(task_run_id).
The strange thing is that I get this error at the very beginning of the flow, before the task has started. Basically, the wait method is not waiting: it looks like the wait call is reached even before the task run is created. Why does this happen? Is there a better way to wait for a task to finish than checking each task's future? Can I mitigate this behaviour with something better than a time.sleep() before the wait? The logger call after the wait (which does not wait) prints the task run ID, so it looks like the task run has not been created in the database yet and the wait does not block until execution finishes.
At the moment I have mitigated this by using a time.sleep() before every future.wait(), but I wonder if there is a Prefect way to do that.
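To make the "something better than time.sleep()" idea concrete, here is a minimal sketch of a bounded retry with backoff around the lookup. It is not a Prefect feature: read_with_retry, NotFoundYet, make_fake_reader and demo are hypothetical names introduced only for illustration, and the fake reader stands in for client.read_task_run so the snippet runs without a Prefect server.

```python
import asyncio
from typing import Awaitable, Callable, Type, TypeVar

T = TypeVar("T")


class NotFoundYet(Exception):
    """Hypothetical stand-in for the 'not found' error raised by the real client."""


async def read_with_retry(
    read: Callable[[], Awaitable[T]],
    retry_on: Type[Exception],
    attempts: int = 5,
    base_delay: float = 0.05,
) -> T:
    """Call read() until it succeeds, retrying only on the retry_on exception."""
    for attempt in range(attempts):
        try:
            return await read()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2**attempt)
    raise ValueError("attempts must be at least 1")


def make_fake_reader(fail_times: int, name: str) -> Callable[[], Awaitable[str]]:
    """Build a reader that fails fail_times times, then returns name."""
    calls = {"n": 0}

    async def read() -> str:
        calls["n"] += 1
        if calls["n"] <= fail_times:
            raise NotFoundYet("task run not visible yet")
        return name

    return read


def demo() -> str:
    # Simulates a task run that only becomes readable on the third lookup.
    reader = make_fake_reader(fail_times=2, name="my-task-0")
    return asyncio.run(read_with_retry(reader, NotFoundYet, base_delay=0.01))
```

In the flow above, the read argument would presumably be something like lambda: client.read_task_run(task_run_id), and retry_on would be the ObjectNotFound exception named in the error message. That wiring is an assumption and is untested here. Unlike a fixed sleep, the retry only costs time when the lookup actually fails, and it still raises once the attempts are exhausted.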