lucian 10/10/2025, 7:31 AM
Marvin 10/10/2025, 7:37 AM
The .map API suggests Prefect 1.x. In Prefect 3.x, use task.submit(...) instead of .map. To narrow this down:
- Are you running the flow locally (e.g., python script.py) or as a deployment via a worker (Docker/K8s/etc.)?
- Which executor/task runner are you using (Dask, threads, processes)?
Why this happens (short version)
- Prefect 3.x: Cancellation is cooperative. Ctrl+C/SIGTERM asks tasks to stop, but synchronous tasks running in a thread pool can’t be force-stopped by Python; they only stop when they return to the event loop. Long blocking calls (e.g., time.sleep, requests without timeouts) will ignore cancellation until they finish (see the sketch after this list).
- Prefect 1.x with .map (usually backed by Dask): Ctrl+C often won’t propagate cleanly to worker threads/processes. Dask workers running mapped tasks may not stop immediately; the parent process sees the signal but the running mapped tasks continue.
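To make the Prefect 3.x case concrete, here is a minimal sketch of the failure mode (the task and flow names are illustrative): Ctrl+C during the sleep requests cancellation, but the blocking call runs to completion before the task can stop.
from prefect import flow, task
import time

@task
def blocking_work(x):
    # time.sleep blocks the worker thread; cancellation is only
    # observed once this call returns on its own
    time.sleep(300)
    return x

@flow
def stuck_flow():
    futures = [blocking_work.submit(i) for i in range(4)]
    return [f.result() for f in futures]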
Recommended approaches
If you’re on Prefect 3.x:
- Use task.submit instead of .map.
- Add timeouts to tasks so they can be marked as cancelled and stop faster:
from prefect import task
@task(timeout_seconds=60)
def work(x):
    ...
- Prefer a process-based runner for CPU-bound or long blocking work; processes can be terminated by the OS:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task(timeout_seconds=60)
def work(x): ...

@flow(task_runner=ProcessPoolTaskRunner())
def my_flow(items):
    # submit schedules each task; result() gathers the outcomes
    futures = [work.submit(i) for i in items]
    return [f.result() for f in futures]
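A note on the design choice here: a thread-based runner (ThreadPoolTaskRunner, the 3.x default) shares the parent process, so a hung synchronous task can only be abandoned, not killed; a process-based runner isolates each task in a worker process the OS can terminate, which is why it pairs well with timeout_seconds.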
- Make I/O operations interruptible: always set timeouts on network calls (e.g., requests.get(..., timeout=10)), avoid time.sleep in sync tasks (use anyio.sleep in async tasks), and periodically yield or check for cancellation; see the sketch after this list.
- If the run is orchestrated, you can cancel it from the UI or with the CLI:
prefect flow-run cancel <flow-run-id>
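Here is the promised sketch of interruptible task I/O, assuming the requests and anyio packages are installed (fetch, poll, and the url parameter are illustrative names):
import anyio
import requests
from prefect import task

@task(timeout_seconds=30)
def fetch(url: str) -> int:
    # A bounded timeout keeps a hung connection from blocking forever
    resp = requests.get(url, timeout=10)
    return resp.status_code

@task(timeout_seconds=60)
async def poll():
    for _ in range(10):
        # await anyio.sleep(...) is a cancellation point,
        # unlike the uninterruptible time.sleep(...)
        await anyio.sleep(5)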
If you’re on Prefect 1.x with `.map`:
- This is a known limitation with mapped tasks and Dask signal handling — Ctrl+C is not reliably propagated to all workers running mapped tasks.
- Workarounds:
- Consider upgrading to Prefect 3.x where cancellation is much more predictable with structured concurrency powered by AnyIO. See How AnyIO powers Prefect’s async architecture.
- If you must stay on 1.x, ensure you can cancel Dask futures explicitly (e.g., by keeping references to futures and calling client.cancel(futures)), and prefer process-based workers for killability.
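For the explicit-cancellation route, a minimal sketch using the dask.distributed Client directly (how you obtain the client will depend on your 1.x setup; the doubling function is just a placeholder):
from dask.distributed import Client

client = Client()  # or Client(address) to attach to an existing cluster

# Keep references to the futures so they can be cancelled later
futures = client.map(lambda x: x * 2, range(100))

try:
    results = client.gather(futures)
except KeyboardInterrupt:
    # On Ctrl+C, explicitly cancel work still pending on the workers
    client.cancel(futures)
    raise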
If you can share a small snippet of how you’re launching the concurrent tasks and your Prefect version, I can point you to the most precise fix for your setup.