# ask-marvin
@Marvin I am self-hosting Prefect and we spawn Docker containers to execute our Python code. When I click Cancel in the UI on a running flow, how should I change my Prefect flow to handle cancellation? What I am seeing is that my pipeline sometimes cancels, but other times it remains in Cancelling and runs to completion.
thought for 389.7 seconds
Great question, and you're bumping into the exact edge of how cancellations work in Prefect 3.x. Short version: cancellations are cooperative. The UI sets the run to Cancelling and the worker attempts to stop the job, but your code must yield/return control periodically for Prefect to deliver the cancel. If the flow/task is stuck in a blocking call (a CPU-bound loop, `time.sleep`, a long DB call without timeouts, etc.), it may ignore the cancel and run to completion.

What happens on Cancel
- The server marks the run Cancelling; the worker (Docker in your case) is instructed to stop the job.
- For Docker workers, the container is sent a stop signal (SIGTERM; Docker may follow with SIGKILL after a timeout). Inside the container, Prefect's engine uses AnyIO cancel scopes, and cancellation propagates through the flow/tasks.
- If your code does not yield to the event loop or ignores the signal (e.g., blocking C extensions, an indefinite `time.sleep`, shell wrappers that swallow signals), the run can remain in Cancelling and finish anyway.
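For illustration, this is roughly the shape of code that gets stuck that way (a minimal sketch of the failure mode described above, not taken from your pipeline):

```python
import time

from prefect import flow, task

@task
def blocking_work():
    # A single long, blocking, synchronous call with no await points:
    # there is nowhere for cancellation to be delivered until it returns.
    time.sleep(3600)

@flow
def stuck_flow():
    blocking_work()
```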
Make your flow "cancellation-friendly"
Use these patterns so your code cooperates with cancellation:

1) Prefer async and await something periodically
- Cancellation is delivered at await points.
```python
from prefect import flow, task
import anyio

@task
async def long_task():
    try:
        for _ in range(1000):
            # Do a chunk of work...
            await anyio.sleep(0)  # yield control so cancellation can be delivered
    finally:
        # cleanup goes here; runs on cancel, failure, or success
        ...
        
@flow
async def my_flow():
    await long_task()
```

2) Avoid long blocking sync calls in tasks
- If you must run sync/blocking code, run it off the event loop so Prefect can still deliver cancellation:
```python
from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread

def do_blocking_work():
    # Avoid single monolithic calls; break into chunks if possible
    # and check between chunks
    for _ in range(1000):
        # small unit of work
        ...

@task
async def wrapper():
    try:
        await run_sync_in_worker_thread(do_blocking_work)
    finally:
        # cleanup
        ...
```

Note: cancellation of a blocking thread only takes effect at cooperative points (e.g., between chunks). Don't rely on interrupting a single long blocking call.

3) Replace sleeps and blocking waits with cooperative equivalents
- Avoid `time.sleep(...)` inside tasks. In async code, use `await anyio.sleep(...)`.
- For polling loops, add an await or a short sleep to yield, as in the sketch below.
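For example, a minimal polling sketch (assumptions: `check_remote_status` is a hypothetical stand-in for your own status check, and the blocking check is pushed to a worker thread via `anyio.to_thread.run_sync`):

```python
import anyio

from prefect import task

def check_remote_status() -> bool:
    """Hypothetical placeholder: return True once the external job has finished."""
    ...

@task
async def poll_until_done(poll_interval: float = 5.0) -> None:
    # Run the blocking check off the event loop, then sleep cooperatively;
    # each `await` is a point where cancellation can be delivered.
    while not await anyio.to_thread.run_sync(check_remote_status):
        await anyio.sleep(poll_interval)
```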
4) Add timeouts where appropriate
- Timeouts won't make a run "cancel on click," but they prevent unbounded blocking and make cancellation observable:
  - Task-level: `@task(timeout_seconds=...)`
  - Context managers: `prefect.utilities.timeout.timeout(...)` or `timeout_async(...)`
- Links: Prefect timeout utilities
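As a minimal sketch of the task-level option (the slow work here is simulated with a sleep; for bounding individual sections of a task, check the context managers in prefect.utilities.timeout for your Prefect version):

```python
import anyio

from prefect import task

@task(timeout_seconds=600)  # fail the task if it runs longer than 10 minutes
async def bounded_task():
    # Simulated slow work; in practice this might be a DB query or API call.
    # For finer-grained bounds inside a task, see prefect.utilities.timeout
    # (`timeout` for sync code, `timeout_async` for async code).
    await anyio.sleep(300)
```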
5) Use Prefect's process helpers for subprocesses
- If you shell out or spawn processes, prefer Prefect's helpers, which forward signals and clean up on cancel:
```python
from prefect import task
from prefect.utilities.processutils import run_process

@task
async def run_cmd():
    # Streams output and cooperates with cancellation
    await run_process(["bash", "-lc", "your_long_command"], stream_output=True)
```
Code ref: processutils.py

6) Clean up in finally blocks
- When cancellation occurs, your task body will unwind; put cleanup in `finally:` so it always runs.

7) For Docker specifically
- Ensure your container's entrypoint forwards signals to the Python process. If you wrap Prefect in a shell script, use `exec python ...` so PID 1 is your Python process, or use an init like `tini`.
- If your process ignores SIGTERM, Docker will send SIGKILL after a timeout; until then you'll see "Cancelling." You can tune stop behavior via work pool job variables, e.g. set `container_create_kwargs` (stop_signal/stop_timeout) if needed; see the sketch after this list.
- Keep your worker and `prefect-docker` packages up to date.
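If you do need to adjust stop behavior, here is a hedged sketch of one way to pass those job variables from a deployment, assuming a Docker work pool whose base job template exposes `container_create_kwargs` (the pool name and image below are placeholders):

```python
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="cancellation-tuned",
        work_pool_name="my-docker-pool",            # placeholder work pool name
        image="registry.example.com/app:latest",    # placeholder, pre-built image
        build=False,
        push=False,
        job_variables={
            "container_create_kwargs": {
                "stop_signal": "SIGTERM",
                "stop_timeout": 30,  # seconds before Docker escalates to SIGKILL
            }
        },
    )
```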
Useful references
- AnyIO/async architecture: How AnyIO powers Prefect
- Async utilities: prefect.utilities.asyncutils
- Timeout utilities: prefect.utilities.timeout

Why you're seeing inconsistent behavior
- If a task happens to be between operations (yielding/awaiting), cancellation is honored quickly.
- If it's inside a long, blocking call (a DB query without a timeout, a large CPU loop, `time.sleep`, a third-party C extension), the cancel won't take effect until that call returns, which can look like "Cancelling" forever before the run completes.

If you can share a bit more, I can suggest targeted changes:
- Prefect version and worker type/version (assuming Prefect 3.x + a Docker worker?)
- Your container base image/entrypoint (shell script or direct Python/Prefect?)
- Where the run spends most of its time when it won't cancel (network I/O, CPU loop, subprocess, sleep)?
- Any long-running libraries involved (database drivers, SDKs, etc.) and whether timeouts are configured.

Meanwhile, try the patterns above (especially adding small await points, using `run_process` for external commands, and avoiding monolithic blocking calls). Setting `PREFECT_LOGGING_LEVEL=DEBUG` in the container can also help you confirm when cancellation is being received.