lucian
10/07/2025, 6:20 AMMarvin
10/07/2025, 6:20 AMMarvin
10/07/2025, 6:26 AM.result() on a large mapped list, you end up blocking until each finishes unless you actively cancel the futures.
What to do instead
1) Don’t immediately block on the whole mapped list
- Launch the mapped tasks and keep the futures.
- Use a try/except KeyboardInterrupt to cancel all futures when you hit Ctrl+C.
Example (Prefect 3.x)
from prefect import flow, task
from prefect.futures import as_completed
@task
def do_work(x):
# do something long-running but interruptible if possible
return x * 2
@flow
def main(xs):
futures = do_work.map(xs)
try:
# Optionally consume results as they complete
for fut in as_completed(futures):
_ = fut.result()
except KeyboardInterrupt:
# Cancel outstanding work
for fut in futures:
try:
fut.cancel()
except Exception:
pass
# Re-raise so your process exits promptly
raise
if __name__ == "__main__":
main(list(range(1000)))
Notes:
- as_completed lets you break out immediately on Ctrl+C, then cancel remaining futures.
- Calling .result() on the mapped list itself will wait for all to finish; avoid that pattern if you want responsive cancellation.
2) If you are running as a deployment, cancel from the CLI/UI
- CLI:
prefect flow-run cancel <flow_run_id>
This transitions the flow run to Cancelling and instructs tasks/subflows to stop. We verified this command exists via prefect flow-run cancel --help.
3) Make tasks more cancellation-friendly
- Long loops: split into smaller chunks and return early between checkpoints.
- Use I/O or short sleeps inside loops so the task can be interrupted quickly.
- Add timeouts on tasks (@task(timeout_seconds=...)) for an upper bound on stuck work.
- Note: ThreadPool/ProcessPool cannot forcibly kill a currently running Python function mid-execution. Cancellation will stop queued tasks and prevent retries, but running ones may finish their current chunk of work.
4) Consider concurrency limits so you don’t have 1000 in-flight tasks
- Fewer simultaneous tasks means fewer stragglers to wait on during cancellation.
- Set the task runner’s max_workers or use concurrency limits.
If you share:
- Prefect version
- Task runner (thread/process/Dask/Ray)
- How you start the flow (python script vs deployment)
I can tailor the exact snippet or configuration to your setup.lucian
10/07/2025, 7:15 AMMarvin
10/07/2025, 7:15 AMMarvin
10/07/2025, 7:16 AMtask_name.map(items).result(), your driver blocks until all futures resolve, even after Ctrl+C, unless you handle cancellation yourself.
Patterns for responsive shutdown
- Don’t block on the whole mapped list; keep the futures and cancel them on Ctrl+C.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.futures import as_completed
@task
def do_work(x):
# make this cooperative if possible (small chunks/sleeps)
return x * 2
@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def main(xs):
futures = do_work.map(xs)
try:
for fut in as_completed(futures):
_ = fut.result()
except KeyboardInterrupt:
for fut in futures:
try:
fut.cancel()
except Exception:
pass
raise
if __name__ == "__main__":
main(list(range(1000)))
- Make tasks cancellation-friendly:
- Break long CPU loops into smaller chunks and return early between iterations.
- Include short sleeps or I/O boundaries so Ctrl+C can be processed.
- Use per-task timeouts (@task(timeout_seconds=...)) as an upper bound.
- Reduce the number of in-flight tasks:
- Keep max_workers=10 or lower, so fewer threads need to drain during shutdown.
If you’re running as a deployment
- Cancel from the UI/CLI to request graceful shutdown:
prefect flow-run cancel <flow_run_id>
- This stops new work and cancels queued tasks. Running tasks still need to reach a safe point to exit, per Python’s threading semantics.
If you share whether you’re running locally (Python script) or via a deployment/worker, I can tailor the exact cancellation pattern to your setup.