Jean Du
10/15/2025, 10:30 AM
15:49:14.892 | ERROR | asyncio - Exception in callback FlowRunCancellingObserver._start_polling_task.<locals>.<lambda>(<Task cancell...rvices.py:21>>) at /usr/local/lib/python3.11/site-packages/prefect/runner/_observers.py:99
handle: <Handle FlowRunCancellingObserver._start_polling_task.<locals>.<lambda>(<Task cancell...rvices.py:21>>) at /usr/local/lib/python3.11/site-packages/prefect/runner/_observers.py:99>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "/usr/local/lib/python3.11/site-packages/prefect/runner/_observers.py", line 103, in <lambda>
    if task.exception()
       ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/services.py", line 64, in critical_service_loop
    await workload()
  File "/usr/local/lib/python3.11/site-packages/prefect/runner/_observers.py", line 131, in _check_for_cancelled_flow_runs
    typed_cancelling_flow_runs = await self._client.read_flow_runs(
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration/_flow_runs/client.py", line 813, in read_flow_runs
    response = await self.request("POST", "/flow_runs/filter", json=body)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/orchestration/base.py", line 53, in request
    return await self._client.send(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 348, in send
    response = await self._send_with_retry(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/base.py", line 268, in _send_with_retry
    response = await send(request, *send_args, **send_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1629, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1657, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1694, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1730, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 394, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request
    raise exc from None
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 236, in handle_async_request
    response = await connection.handle_async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 103, in handle_async_request
    return await self._connection.handle_async_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 136, in handle_async_request
    raise exc
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 106, in handle_async_request
    ) = await self._receive_response_headers(**kwargs)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 177, in _receive_response_headers
    event = await self._receive_event(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 217, in _receive_event
    data = await self._network_stream.read(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 35, in read
    return await self._stream.receive(max_bytes=max_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 1263, in receive
    await self._protocol.read_event.wait()
  File "/usr/local/lib/python3.11/asyncio/locks.py", line 213, in wait
    await fut
asyncio.exceptions.CancelledError
Marvin
10/15/2025, 10:30 AM
Marvin
10/15/2025, 10:32 AM
prefect version)
- Where you see this (during Ctrl+C, container shutdown, end of run, or randomly mid-run)
- Whether cancellations are working as expected
…I can confirm whether it’s purely cosmetic in your case or help troubleshoot further.
Jean Du
10/15/2025, 10:44 AM
Version: 3.4.20
API version: 0.8.4
Python version: 3.11.13
Git commit: 5d7d5eb6
Built: Thu, Sep 25, 2025 09:04 PM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: server
Pydantic version: 2.11.9
Server:
Database: sqlite
SQLite version: 3.46.1
Jean Du
10/15/2025, 11:03 AM
Marvin
10/15/2025, 11:03 AM
Marvin
10/15/2025, 11:07 AM
force=True via the client:
import asyncio, uuid
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def force_cancel(flow_run_id: str):
    async with get_client() as client:
        await client.set_flow_run_state(uuid.UUID(flow_run_id), Cancelled(name="Force-cancelled"), force=True)

asyncio.run(force_cancel("<FLOW_RUN_ID>"))
- To quickly find the run IDs stuck in Cancelling:
prefect flow-run ls --state CANCELLING --limit 50
Stabilize going forward
- Upgrade to the latest Prefect 3.x — there have been improvements to cancellation handling and the FlowRunCancellingObserver (see PR above).
- Check the worker logs around the time you clicked Cancel. If you see websocket errors or disconnects, the polling fallback in newer versions helps.
- If your worker concurrency is 1, a stuck Cancelling run will block the next start; after upgrading, also consider temporarily restarting workers when you see cancellations not taking effect.
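For background on the traceback at the top of this thread: it ends inside a done-callback that calls `task.exception()`, and that call re-raises `CancelledError` when the task was cancelled rather than failed, which asyncio then logs as "Exception in callback". Below is a minimal stdlib-only sketch of that pattern. It is not Prefect's actual code; `poll_forever`, `on_done_unsafe`, and `on_done_safe` are hypothetical names used only for illustration.

```python
import asyncio


async def poll_forever():
    # Stand-in for a long-running polling loop.
    while True:
        await asyncio.sleep(3600)


def on_done_unsafe(task: asyncio.Task):
    # task.exception() raises CancelledError if the task was cancelled.
    return task.exception()


def on_done_safe(task: asyncio.Task):
    # Check for cancellation first, then inspect the exception.
    if task.cancelled():
        return None
    return task.exception()


async def main():
    task = asyncio.create_task(poll_forever())
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose

    try:
        on_done_unsafe(task)
        unsafe = "no error"
    except asyncio.CancelledError:
        unsafe = "CancelledError"
    return unsafe, on_done_safe(task)


result = asyncio.run(main())
print(result)
```

If the log line only shows up at shutdown or right after a cancel, this pattern is consistent with it being noise rather than a failed cancellation, but the worker logs are still the way to confirm that.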
Help me reproduce
Could you share:
- prefect version
- Worker type and where it’s running (process/Docker/K8s/ECS; Prefect Cloud or Server)
- How you issued the cancel (UI vs CLI vs code)
- A snippet of worker logs around the cancellation time
With that, I can pinpoint whether you’re hitting the known observer issue or something environment-specific and suggest a tighter fix.
Jean Du
10/15/2025, 12:38 PM
Marvin
10/15/2025, 12:38 PM
Marvin
10/15/2025, 12:39 PM
import asyncio, uuid
from prefect.client.orchestration import get_client
from prefect.states import Cancelled

async def force_cancel(flow_run_id: str):
    async with get_client() as client:
        await client.set_flow_run_state(
            uuid.UUID(flow_run_id),
            Cancelled(name="Force-cancelled"),
            force=True,
        )

asyncio.run(force_cancel("<FLOW_RUN_ID>"))
To list stuck runs:
prefect flow-run ls --state CANCELLING --limit 50
Hardening going forward
- Upgrade Prefect to the latest 3.4.x. The observer/polling fallback and other cancellation fixes landed after 3.4.20, and they specifically target cases where cancellations don’t complete.
- Add a flow-level timeout so the engine stops the run from inside (often more reliable than an external cancel alone; note that a timed-out run ends in a failed TimedOut state rather than Cancelled):
from prefect import flow

@flow(timeout_seconds=5*60*60)
def my_flow():
    ...
- Consider a safety net for Cancelling-too-long:
- If possible in your setup, add a follow-up Automation that detects “state=Cancelling for > N minutes” and escalates (e.g., ChangeFlowRunState to Cancelled). If you need a true force, a small service/script using the client with force=True can be triggered by a webhook action from Automations.
- If your work pool concurrency is 1, a stuck Cancelling run will block the next. As a mitigation, you can temporarily allow concurrency >1 so one stuck run doesn’t delay the schedule (while still fixing the underlying issue).
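To make the "Cancelling for > N minutes" safety net concrete, here is a minimal sketch of just the selection logic, in plain Python. It makes several assumptions: `RunInfo` is a hypothetical record (not a Prefect type), the 15-minute threshold is arbitrary, and the sample data is made up. In a real script the records would come from the Prefect client, and the selected IDs could be passed to `force_cancel` from the snippet earlier in this message.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class RunInfo:
    """Hypothetical summary of a flow run; not a Prefect type."""
    run_id: str
    state_name: str
    state_since: datetime  # when the run entered its current state


def stuck_cancelling(runs, now, max_age=timedelta(minutes=15)):
    """Return IDs of runs that have been in Cancelling longer than max_age."""
    return [
        r.run_id
        for r in runs
        if r.state_name == "Cancelling" and now - r.state_since > max_age
    ]


# Made-up sample data for illustration.
now = datetime(2025, 10, 15, 12, 0, tzinfo=timezone.utc)
runs = [
    RunInfo("a", "Cancelling", now - timedelta(minutes=40)),  # stuck
    RunInfo("b", "Cancelling", now - timedelta(minutes=2)),   # still within grace period
    RunInfo("c", "Running", now - timedelta(hours=3)),        # not cancelling at all
]
stuck = stuck_cancelling(runs, now)
print(stuck)
```

Keeping a grace period avoids force-cancelling runs that are still shutting down cleanly.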
A couple of questions to pinpoint the root cause
- Which worker/infrastructure are you using (process, Docker, K8s, ECS)? This affects how the cancel signal ultimately terminates the run.
- Do the worker logs around the automation’s cancel timestamp show websocket errors or disconnects? The polling fallback in newer versions addresses that case.
If you can share the worker type and we confirm upgrading is feasible, I can suggest any infra-specific tweaks to make cancellation immediate in your environment.
Jean Du
10/15/2025, 2:18 PM
/usr/local/lib/python3.11/site-packages/pydantic/_internal/_generate_schema.py:2249: UnsupportedFieldAttributeWarning: The 'default' attribute with value 'UTC' was provided to the `Field()` function, which has no effect in the context it was used. 'default' is field-specific metadata, and can only be attached to a model field using `Annotated` metadata or by assignment. This may have happened because an `Annotated` type alias using the `type` statement was used, or if the `Field()` function was attached to a single member of a union type.
  warnings.warn(
Marvin
10/15/2025, 2:19 PM
Marvin
10/15/2025, 2:20 PM
export PYTHONWARNINGS="ignore:The 'default' attribute with value:UserWarning"
PowerShell:
$env:PYTHONWARNINGS="ignore:The 'default' attribute with value:UserWarning"
- Or at the very top of your entrypoint (before importing Prefect or your flow code). The message pattern is matched against the start of the warning text, not the class name:
import warnings
warnings.filterwarnings(
    "ignore",
    message=r"The 'default' attribute with value",
    category=UserWarning,
)
- If you prefer not to suppress: you can safely ignore it until we release a patch that removes the offending Field(default=...) usage.
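As a quick sanity check on how `warnings.filterwarnings` matches: the `message` pattern is a regex applied to the start of the warning text, and the class name is not part of that text, while `category` also matches subclasses. Here is a minimal stdlib-only sketch. The local `UnsupportedFieldAttributeWarning` class is a stand-in defined just for this demo (assumed here to derive from `UserWarning`), and `emitted_with` is a hypothetical helper.

```python
import warnings


# Stand-in for the real warning class, defined locally for the demo only.
class UnsupportedFieldAttributeWarning(UserWarning):
    pass


TEXT = (
    "The 'default' attribute with value 'UTC' was provided to the "
    "`Field()` function, which has no effect in the context it was used."
)


def emitted_with(message_pattern: str) -> int:
    """Count warnings that still get through an 'ignore' filter with this pattern."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")  # baseline: show everything
        # Added last, so this filter is consulted first.
        warnings.filterwarnings("ignore", message=message_pattern, category=UserWarning)
        warnings.warn(TEXT, UnsupportedFieldAttributeWarning)
    return len(caught)


by_class_name = emitted_with(r".*UnsupportedFieldAttributeWarning.*")
by_text = emitted_with(r"The 'default' attribute with value")
print(by_class_name, by_text)
```

A pattern built from the class name leaves the warning visible, while a pattern built from the opening words of the message suppresses it.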
A couple follow-ups so I can track this and get it fixed upstream:
- What exact Prefect version are you on now (e.g., 3.4.23)?
- What Pydantic version is installed (pip show pydantic)?
- If you can, run once with default warning formatting so we get the full stack (helps pinpoint the exact field):
PYTHONWARNINGS=default <your command>
and paste the stack that points into Prefect.
Separately, did upgrading resolve your Cancelling-stuck issue from the automation? If not, I can help dig deeper.