Giacomo Chiarella
02/26/2025, 1:41 PM
Jenny
02/26/2025, 5:22 PM
Giacomo Chiarella
02/26/2025, 5:42 PM
I ran python -m pip install prefect==3.2.7. Once that was done, I deployed this flow:
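from prefect import flow, task

# DAG_NAME is defined elsewhere in the original deployment (not shown here).

@task(name="simple_task", retries=1, retry_delay_seconds=30, retry_jitter_factor=1)
def simple_task():
    # Always fails, so both the task and the flow end up failed.
    raise Exception("Test")

@flow(name=DAG_NAME, retries=1, retry_delay_seconds=30, description="Test")
def flow_entrypoint():
    a = simple_task.submit()
    a.wait()
    # Fail the flow run unless the task finished in the Completed state.
    if str.lower(a.state.name) != "completed":
        raise Exception("Flow error")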
If I just start a brand new flow run, everything works as expected and the flow run fails. If, once it has failed, I use the UI button to retry, the flow run gets stuck in AwaitingRetry. The flow run concurrency is set to 1 and there are no other flow runs at all in the whole Prefect instance. It stays stuck indefinitely; I had to cancel it after it had been in AwaitingRetry for 2 hours. The work pool is of type Process. I think there is something off with the retry button submission, because everything else works as expected.
Jenny
02/26/2025, 6:12 PM
Giacomo Chiarella
02/26/2025, 6:13 PM
Jenny
02/26/2025, 6:27 PM
Giacomo Chiarella
02/27/2025, 8:03 AM
a.wait()
logger.info(a.task_run_id)
task_run_name = asyncio.run(get_task_run_name_by_id(prefect_future.task_run_id))
where the function get_task_run_name_by_id is simply this
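from uuid import UUID

from prefect.client.orchestration import get_client

async def get_task_run_name_by_id(task_run_id: UUID):
    # Look the task run up through the API and return its name.
    async with get_client() as client:
        task_run = await client.read_task_run(task_run_id)
        return task_run.name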
The reason I’m doing that is that I check the state a.state.name in order to make the flow run fail if the task is not in the Completed state. The task run name is needed to log each task run with its state. Sometimes, randomly, I get
Encountered exception during execution: ObjectNotFound(None)
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration/__init__.py", line 843, in read_task_run
    response = await self._client.get(f"/task_runs/{task_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1768, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1540, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 354, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 162, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'http://prefect_orion:4200/api/task_runs/8317922a-ebd4-4eaa-85ed-7c374c07e69c'
Response: {'detail': 'Task not found'}
when I call task_run = await client.read_task_run(task_run_id), and the strange thing is that I get this error at the very beginning of the flow, when the task has not started yet. Basically, the wait method is not waiting. Is it possible that it is called too early? Why does this happen? Is there a better way to wait for a task to finish? Can I mitigate this behaviour with something better than a time.sleep() before the wait? The logger call after the wait (which does not wait) prints the task run id. It looks like the task run has not been created in the database yet, and the wait does not block until execution finishes.
Giacomo Chiarella
02/27/2025, 3:43 PM
Jenny
02/27/2025, 8:04 PM
Giacomo Chiarella
02/28/2025, 7:28 AM
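For the 404 from read_task_run described earlier in the thread, one possible mitigation that avoids a fixed time.sleep() is to retry the lookup with a short, growing delay. The sketch below is only an illustration and does not explain why the wait returns early. It uses just the standard library: retry_lookup, FakeNotFound, fake_read_task_run_name and the name "simple_task-0" are all hypothetical stand-ins, not Prefect APIs. It also assumes the task run does become readable shortly after submission, which the thread does not confirm.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_lookup(
    lookup: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[Exception], ...],
    attempts: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Await lookup() until it succeeds or attempts run out.

    Sleeps base_delay, 2 * base_delay, 4 * base_delay, ... between tries.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await lookup()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Hypothetical stand-in for the API lookup: "not found" twice, then a name.
class FakeNotFound(Exception):
    pass


calls = {"n": 0}


async def fake_read_task_run_name() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeNotFound("Task not found")
    return "simple_task-0"  # made-up task run name for the demo


name = asyncio.run(
    retry_lookup(fake_read_task_run_name, retry_on=(FakeNotFound,), base_delay=0.01)
)
print(name)
```

In the flow from the thread, the same helper could presumably wrap the existing lookup, for example retry_lookup(lambda: get_task_run_name_by_id(a.task_run_id), retry_on=(ObjectNotFound,)), with ObjectNotFound imported from prefect.exceptions as named in the traceback. Whether a handful of attempts is enough depends on how long the task run takes to appear, which would need to be measured.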