<@ULVA73B9P> why i cannot stop a flow when i press...
# marvin-ai
l
@Marvin why i cannot stop a flow when i press ctrl+c, sigterm , if the flow is running concurrent tasks using .map ?
m
thought for 315.0 seconds
Great question — cancellation behavior depends on your Prefect version and how you’re doing concurrency. A couple quick clarifiers to help me give you the exact fix: - Which Prefect version are you using? The
.map
API suggests Prefect 1.x. In Prefect 3.x, we use
task.submit(...)
instead of
.map
. - Are you running the flow locally (e.g.,
python script.py
) or as a deployment via a worker (Docker/K8s/etc.)? - Which executor/task runner are you using (Dask, threads, processes)? Why this happens (short version) - Prefect 3.x: Cancellation is cooperative. Ctrl+C/SIGTERM asks tasks to stop, but synchronous tasks running in a thread pool can’t be force-stopped by Python — they only stop when they return to the event loop. Long blocking calls (e.g.,
time.sleep
,
requests
without timeouts) will ignore cancellation until they finish. - Prefect 1.x with
.map
(usually backed by Dask): Ctrl+C often won’t propagate cleanly to worker threads/processes. Dask workers running mapped tasks may not stop immediately; the parent process sees the signal but the running mapped tasks continue. Recommended approaches If you’re on Prefect 3.x: - Use
task.submit
instead of
.map
. - Add timeouts to tasks so they can be marked as cancelled and stop faster:
Copy code
from prefect import task

  @task(timeout_seconds=60)
  def work(x):
      ...
- Prefer a process-based runner for CPU-bound or long blocking work; processes can be terminated by the OS:
Copy code
from prefect import flow, task
  from prefect.task_runners import ProcessPoolTaskRunner

  @task(timeout_seconds=60)
  def work(x): ...

  @flow(task_runner=ProcessPoolTaskRunner())
  def my_flow(items):
      futures = [work.submit(i) for i in items]
      return [f.result() for f in futures]
- Make I/O operations interruptible: always set timeouts on network calls (e.g.,
requests.get(..., timeout=10)
), avoid
time.sleep
in sync tasks (use
anyio.sleep
in async tasks), and periodically yield or check for cancellation. - If the run is orchestrated, cancel from the UI or CLI. We verified the command is available:
Copy code
prefect flow-run cancel <flow-run-id>
If you’re on Prefect 1.x with `.map`: - This is a known limitation with mapped tasks and Dask signal handling — Ctrl+C is not reliably propagated to all workers running mapped tasks. - Workarounds: - Consider upgrading to Prefect 3.x where cancellation is much more predictable with structured concurrency powered by AnyIO. See How AnyIO powers Prefect’s async architecture. - If you must stay on 1.x, ensure you can cancel Dask futures explicitly (e.g., by keeping references to futures and calling
client.cancel(futures)
), and prefer process-based workers for killability. If you can share a small snippet of how you’re launching the concurrent tasks and your Prefect version, I can point you to the most precise fix for your setup.