<@ULVA73B9P> how to cancel 1000 tasks raised by t...
# marvin-ai
l
@Marvin how to cancel 1000 tasks raised by task_name.map(list).result(), when i press ctrl+c ? its not finished until all are done
m
thought for 355.3 seconds
Quick q to make sure I give the right fix: which Prefect version and task runner are you using (ThreadPool, ProcessPool, Dask/Ray), and are you running the flow locally from a Python script or as a deployment? High-level behavior - Pressing Ctrl+C raises KeyboardInterrupt in your driver process. Prefect will try to transition the flow to Cancelling/Cancelled, but Python won’t forcibly kill CPU-bound work inside threads/processes. If you immediately call
.result()
on a large mapped list, you end up blocking until each finishes unless you actively cancel the futures. What to do instead 1) Don’t immediately block on the whole mapped list - Launch the mapped tasks and keep the futures. - Use a try/except KeyboardInterrupt to cancel all futures when you hit Ctrl+C. Example (Prefect 3.x)
Copy code
from prefect import flow, task
from prefect.futures import as_completed

@task
def do_work(x):
    # do something long-running but interruptible if possible
    return x * 2

@flow
def main(xs):
    futures = do_work.map(xs)

    try:
        # Optionally consume results as they complete
        for fut in as_completed(futures):
            _ = fut.result()
    except KeyboardInterrupt:
        # Cancel outstanding work
        for fut in futures:
            try:
                fut.cancel()
            except Exception:
                pass
        # Re-raise so your process exits promptly
        raise

if __name__ == "__main__":
    main(list(range(1000)))
Notes: -
as_completed
lets you break out immediately on Ctrl+C, then cancel remaining futures. - Calling
.result()
on the mapped list itself will wait for all to finish; avoid that pattern if you want responsive cancellation. 2) If you are running as a deployment, cancel from the CLI/UI - CLI:
Copy code
prefect flow-run cancel <flow_run_id>
This transitions the flow run to Cancelling and instructs tasks/subflows to stop. We verified this command exists via
prefect flow-run cancel --help
. 3) Make tasks more cancellation-friendly - Long loops: split into smaller chunks and return early between checkpoints. - Use I/O or short sleeps inside loops so the task can be interrupted quickly. - Add timeouts on tasks (
@task(timeout_seconds=...)
) for an upper bound on stuck work. - Note: ThreadPool/ProcessPool cannot forcibly kill a currently running Python function mid-execution. Cancellation will stop queued tasks and prevent retries, but running ones may finish their current chunk of work. 4) Consider concurrency limits so you don’t have 1000 in-flight tasks - Fewer simultaneous tasks means fewer stragglers to wait on during cancellation. - Set the task runner’s
max_workers
or use concurrency limits. If you share: - Prefect version - Task runner (thread/process/Dask/Ray) - How you start the flow (python script vs deployment) I can tailor the exact snippet or configuration to your setup.
l
@Marvin how prefect is handling gracefull shutdown, when running tasks on 10 threads ?
m
thought for 62.6 seconds
Short answer: with ThreadPoolTaskRunner (10 threads), Prefect does a best-effort graceful shutdown. On Ctrl+C/SIGTERM it transitions the flow to Cancelling, stops scheduling new tasks, and attempts to cancel any not-yet-started futures. Python cannot preempt a running thread, so tasks already executing will generally finish their current work unless your code cooperatively exits early. That’s why it can feel like “it doesn’t finish until all are done.” What that means in practice - Flow-level: Prefect registers signal handlers and moves the flow run to Cancelling/Cancelled. The engine stops submitting new tasks. - Thread pool: Queued futures can be cancelled; already-running tasks in the 10 worker threads keep running until they return or raise. - Result collection: If you immediately do
task_name.map(items).result()
, your driver blocks until all futures resolve, even after Ctrl+C, unless you handle cancellation yourself. Patterns for responsive shutdown - Don’t block on the whole mapped list; keep the futures and cancel them on Ctrl+C.
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed

@task
def do_work(x):
    # make this cooperative if possible (small chunks/sleeps)
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def main(xs):
    futures = do_work.map(xs)
    try:
        for fut in as_completed(futures):
            _ = fut.result()
    except KeyboardInterrupt:
        for fut in futures:
            try:
                fut.cancel()
            except Exception:
                pass
        raise

if __name__ == "__main__":
    main(list(range(1000)))
- Make tasks cancellation-friendly: - Break long CPU loops into smaller chunks and return early between iterations. - Include short sleeps or I/O boundaries so Ctrl+C can be processed. - Use per-task timeouts (
@task(timeout_seconds=...)
) as an upper bound. - Reduce the number of in-flight tasks: - Keep
max_workers=10
or lower, so fewer threads need to drain during shutdown. If you’re running as a deployment - Cancel from the UI/CLI to request graceful shutdown:
Copy code
prefect flow-run cancel <flow_run_id>
- This stops new work and cancels queued tasks. Running tasks still need to reach a safe point to exit, per Python’s threading semantics. If you share whether you’re running locally (Python script) or via a deployment/worker, I can tailor the exact cancellation pattern to your setup.