lucian
10/13/2025, 6:34 AM
Marvin
10/13/2025, 6:41 AM
… `requests.get()`, CPU loops, large I/O) without yielding back to the loop, SIGINT is queued but not acted on until the code hits an `await` or another checkpoint.
- Fix: Offload sync work to a thread and await it so the awaiting coroutine can observe cancellation. The thread itself is not interrupted; its result is simply discarded.
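import requests
from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread

@task
async def fetch(url):
    # requests.get blocks, so run it in a worker thread; awaiting it keeps the loop free to notice cancellation
    return await run_sync_in_worker_thread(requests.get, url, timeout=10)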
Or yield checkpoints in long loops:
import anyio

async def crunch(items):
    for i, item in enumerate(items):
        # do a bit of work
        if i % 1000 == 0:
            await anyio.sleep(0)  # cooperative checkpoint
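Both fixes can be tried outside Prefect with only the standard library. This sketch assumes Python 3.9+ and uses `asyncio.to_thread` as the stdlib counterpart of `run_sync_in_worker_thread`, and `asyncio.sleep(0)` in place of `anyio.sleep(0)`; `blocking_io`, `crunch` and the timings are illustrative. It checks that the awaiting coroutine gives up on an offloaded blocking call promptly, and that a checkpointed loop stops at a checkpoint once cancelled.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    # Stand-in for requests.get() or another blocking call (illustrative only).
    time.sleep(seconds)
    return "done"


async def crunch(items, progress: list) -> None:
    # CPU-style loop with a cooperative checkpoint every 1000 items.
    for i, item in enumerate(items):
        progress.append(item)  # "a bit of work"
        if i % 1000 == 0:
            await asyncio.sleep(0)  # cancellation is delivered here


async def demo() -> tuple[float, int]:
    # 1) Offloaded blocking call: the awaiting coroutine times out quickly,
    #    even though the worker thread keeps sleeping in the background.
    start = time.monotonic()
    try:
        await asyncio.wait_for(asyncio.to_thread(blocking_io, 0.5), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    waited = time.monotonic() - start

    # 2) Checkpointed loop: cancel it right after it first yields.
    progress: list = []
    job = asyncio.create_task(crunch(range(1_000_000), progress))
    await asyncio.sleep(0)  # let the job start and reach its first checkpoint
    job.cancel()
    await asyncio.wait({job})  # wait for it to stop without re-raising
    return waited, len(progress)


waited, processed = asyncio.run(demo())
print(f"gave up waiting after {waited:.2f}s; processed {processed} items before cancel")
```

The worker thread in part 1 still runs to completion; only the await is abandoned, which is why `asyncio.run` takes about half a second to exit (it waits for the default executor to shut down).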
- Cancellation is being swallowed
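- Catching `BaseException` (or using a bare `except:`) without re-raising `asyncio.CancelledError` will prevent proper shutdown. Since Python 3.8, `CancelledError` subclasses `BaseException` rather than `Exception`, so `except Exception` alone does not catch it.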
import asyncio

async def run_work():  # hypothetical wrapper around your own coroutine do_work()
    try:
        await do_work()
    except asyncio.CancelledError:
        raise  # let cancellation propagate
    except Exception:
        ...  # handle real errors
- Shields (e.g., anyio's `CancelScope(shield=True)`) block cancellation by design. Use them sparingly.
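To see why re-raising `CancelledError` matters, here is a stdlib-only sketch (Python 3.8+; `do_work` and the other function names are hypothetical) that cancels two tasks and checks whether the cancellation actually sticks. `except Exception` lets it through, while a broad `except BaseException` hides it, so the task finishes as if nothing happened.

```python
import asyncio


async def do_work() -> None:
    # Hypothetical long-running work; sleeping makes it easy to cancel mid-flight.
    await asyncio.sleep(10)


async def catches_exception() -> None:
    try:
        await do_work()
    except Exception:
        pass  # CancelledError is not an Exception (Python 3.8+), so it still propagates


async def swallows_base_exception() -> None:
    try:
        await do_work()
    except BaseException:
        pass  # BUG: also catches CancelledError, so the cancellation is lost


async def was_cancelled(coro_fn) -> bool:
    task = asyncio.create_task(coro_fn())
    await asyncio.sleep(0)  # let the task start and block inside do_work()
    task.cancel()
    await asyncio.wait({task})  # wait for it to finish without re-raising anything
    return task.cancelled()


async def main() -> dict:
    return {
        "except Exception": await was_cancelled(catches_exception),
        "except BaseException": await was_cancelled(swallows_base_exception),
    }


outcome = asyncio.run(main())
print(outcome)
```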
- You’re pressing Ctrl+C in a different process than the flow run
- When running via `prefect worker start`, Ctrl+C reaches the worker process first. Prefect attempts a graceful shutdown and forwards signals to the child flow-run process, but details vary by OS/infrastructure.
- In Docker/Kubernetes, Ctrl+C in your local terminal does not reach the container/job unless you’re attached and signals are properly forwarded (e.g., Docker with `--init`/tini).
- In these cases, cancel from the Prefect UI or CLI:
prefect flow-run ls
prefect flow-run cancel <flow-run-id>
(`prefect flow-run cancel` is available in Prefect 3.x.)
- OS/terminal differences
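- On Windows, signal support is more limited (for example, asyncio's `loop.add_signal_handler()` is only available on Unix), and on every platform Python runs signal handlers only in the main thread. Some terminals (e.g., VS Code, Git Bash) handle Ctrl+C differently. Prefer UI/CLI cancellation in those environments.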
- Async concurrency semantics
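- With `asyncio.gather(..., return_exceptions=True)`, a cancelled child's `CancelledError` is returned in the results list instead of being raised, so code that treats every entry as an ordinary result can mask cancellation. Let cancellations bubble up, or explicitly cancel the remaining tasks.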
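A stdlib-only sketch of the `gather` pitfall (function names are illustrative): one child is cancelled, yet `gather` returns normally with a `CancelledError` object sitting in the result list. The hypothetical `reraise_cancellation` helper shows one way to surface it again.

```python
import asyncio


async def child(delay: float) -> str:
    await asyncio.sleep(delay)
    return f"slept {delay}"


async def main() -> list:
    slow = asyncio.create_task(child(10))
    fast = asyncio.create_task(child(0))
    gathered = asyncio.gather(slow, fast, return_exceptions=True)
    await asyncio.sleep(0.01)  # let `fast` finish
    slow.cancel()  # something else cancels one child
    # Does not raise: the child's CancelledError comes back as an ordinary list entry.
    return await gathered


def reraise_cancellation(results: list) -> list:
    # Hypothetical helper: surface cancellation instead of treating it as data.
    for r in results:
        if isinstance(r, asyncio.CancelledError):
            raise r
    return results


results = asyncio.run(main())
print(results)
```

Inside a flow or task you would call `reraise_cancellation(await asyncio.gather(..., return_exceptions=True))`, so cancellation still propagates while ordinary exceptions stay available as values.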
Practical checklist
- Offload sync I/O from async tasks with `await run_sync_in_worker_thread(...)` so the loop remains cancellable.
- Add periodic `await anyio.sleep(0)` in long-running loops to create cancellation checkpoints.
- Don’t swallow `CancelledError`.
- If running under a worker, Docker, or Kubernetes, use the UI or:
prefect flow-run cancel <flow-run-id>
- For Docker, make sure signals are forwarded (run the container with an init such as tini, e.g. `docker run --init`).
- For Windows, rely on the UI/CLI for cancellation if Ctrl+C is unreliable.
A couple of helpful references:
- AnyIO cancellation semantics
- asyncio task cancellation
If you can share a bit more, I can pinpoint the root cause:
- How are you running the flow when Ctrl+C doesn’t stop it? (local script, `prefect worker start`, Docker, Kubernetes)
- OS and terminal?
- What does the async task do — any long sync I/O or CPU-bound work?
- A minimal code snippet where Ctrl+C doesn’t stop it would be perfect.
lucian
10/13/2025, 7:54 AM
… `asyncio.TaskGroup`. With Prefect concurrency workers?
Marvin
10/13/2025, 7:58 AM
import asyncio
from prefect import flow, get_run_logger
from prefect.concurrency.asyncio import concurrency

async def fetch_one(i):
    # This acquires a slot from the Prefect concurrency limit named "external-api"
    async with concurrency("external-api", occupy=1, timeout_seconds=60):
        # do your I/O here (httpx, aioboto3, etc.)
        return i

@flow
async def main(n: int = 20):
    logger = get_run_logger()
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one(i)) for i in range(n)]
    # All tasks have finished once the TaskGroup block exits; collect results from the tracked tasks
    results = [t.result() for t in tasks]
    logger.info("Done")
    return results
- Create the concurrency limit named “external-api” in the Prefect UI (Administration > Concurrency limits) with your desired slot count.
- Each coroutine that enters `async with concurrency("external-api")` will wait until a slot is available. The TaskGroup itself does not cap concurrency: it starts all n coroutines at once and waits for them to finish. The Prefect limit gates how many hold a slot at the same time across all runs.
2) Prefect tasks + Prefect concurrency limits (recommended for task-level work)
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def fetch_task(i: int):
    async with concurrency("external-api", occupy=1, timeout_seconds=60):
        # do async I/O work here
        return i

@flow
async def main(n: int = 20):
    # submit a bunch of tasks; the flow's task runner executes them concurrently
    futures = [fetch_task.submit(i) for i in range(n)]
    # PrefectFuture.result() is synchronous in Prefect 3 (it blocks until the task finishes),
    # so collect results directly rather than passing them to asyncio.gather
    results = [f.result() for f in futures]
    return results
- Put the `async with concurrency(...)` inside the task so every task acquires/releases a slot correctly.
- Prefer this for Prefect tasks instead of running tasks directly in a TaskGroup. Submit tasks, then collect the futures' results.
3) Combine local caps with global Prefect limits
If you want to cap local concurrency as well as global limits:
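import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency

async def work(i, sema: asyncio.Semaphore):
    async with sema:  # local cap
        async with concurrency("external-api", occupy=1):  # global cap
            # do work
            ...

@flow
async def main(n: int = 100):
    # Create the semaphore inside the running event loop; a module-level asyncio
    # primitive can end up bound to a different loop if the process runs more than one.
    sema = asyncio.Semaphore(10)  # local cap
    async with asyncio.TaskGroup() as tg:
        for i in range(n):
            tg.create_task(work(i, sema))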
Notes and gotchas
- Don’t swallow cancellation: if a flow is cancelled, TaskGroup will cancel children; avoid catching `CancelledError` without re-raising.
- If your “async” code does long synchronous work, offload it so cancellation and fairness work as expected (e.g., `await prefect.utilities.asyncutils.run_sync_in_worker_thread(...)`).
- Avoid using TaskGroup to run Prefect tasks directly. Submit tasks and collect their futures' results instead, or keep TaskGroup for non-Prefect helper coroutines.
- To rate limit instead of slot limit, there’s also `prefect.concurrency.asyncio.rate_limit(...)`. It is awaited as a call (`await rate_limit("external-api")`) rather than used as a context manager, and the limit needs slot decay configured.
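The difference between a slot limit and a rate limit can be shown with a purely local, hypothetical `StartSpacer` class. This is not how Prefect implements `rate_limit` (Prefect's global limits are enforced by the server); it only illustrates that a rate limit spaces out when work may start, whereas a semaphore-style slot limit caps how much runs at once.

```python
import asyncio
import time


class StartSpacer:
    """Hypothetical in-process rate limiter: allows one start every 1/rate seconds.

    A semaphore (slot limit) caps how many callers are inside at once; this only
    controls when each caller may begin, regardless of how long its work runs.
    """

    def __init__(self, rate_per_second: float) -> None:
        self._interval = 1.0 / rate_per_second
        self._next_start = 0.0
        self._lock = asyncio.Lock()  # serializes callers so starts stay evenly spaced

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            wait = self._next_start - now
            if wait > 0:
                await asyncio.sleep(wait)
            self._next_start = max(now, self._next_start) + self._interval


async def main(n: int = 5, rate: float = 20.0) -> list[float]:
    spacer = StartSpacer(rate)
    starts: list[float] = []

    async def job() -> None:
        await spacer.acquire()
        starts.append(time.monotonic())  # the rate-limited work would start here

    await asyncio.gather(*(job() for _ in range(n)))
    return starts


starts = asyncio.run(main())
print(f"{len(starts)} starts spread over {starts[-1] - starts[0]:.2f}s")
```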
If you can share:
- whether you want to parallelize native coroutines, Prefect tasks, or both
- your target global limit and whether you also want a local cap
…I can tailor the example to your exact use case. For Python 3.11 TaskGroup semantics, see the Python docs: asyncio.TaskGroup. For Prefect concurrency limits, see the Prefect docs (Concurrency concept page).
lucian
10/13/2025, 8:09 AM
Marvin
10/13/2025, 8:09 AM
import asyncio
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.asyncio import concurrency

async def fetch_one(i: int):
    # Acquire a global slot before touching the shared resource
    async with concurrency("external-api", occupy=1, timeout_seconds=60):
        # do async I/O (e.g., httpx)
        return i

@task
async def fetch_batch(batch: list[int]):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one(i)) for i in batch]
    # If any child fails, TaskGroup cancels the rest and raises an ExceptionGroup.
    # Otherwise every task has finished here, so results can be read directly.
    return [t.result() for t in tasks]

@flow(task_runner=ThreadPoolTaskRunner(max_workers=15))
def main(items: list[int]):
    # Submit multiple batches as Prefect tasks (up to 15 at a time)
    batch_size = 50
    futures = [
        fetch_batch.submit(items[i:i + batch_size])
        for i in range(0, len(items), batch_size)
    ]
    return [f.result() for f in futures]
- ThreadPoolTaskRunner caps concurrent Prefect tasks at 15.
- Inside each task, TaskGroup gives fast, local async fan-out.
- The Prefect concurrency context `concurrency("external-api")` enforces your global limit across all flows/runs.
B) If your tasks are sync, use the sync concurrency context
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import concurrency

@task
def sync_task(i: int):
    with concurrency("external-api", occupy=1, timeout_seconds=60):
        # blocking I/O here (requests, boto3, etc.)
        return i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=15))
def main(items: list[int]):
    futures = [sync_task.submit(i) for i in items]
    return [f.result() for f in futures]
- Use `prefect.concurrency.sync.concurrency` for blocking code running in a thread.
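For the sync variant, a stdlib analogue (no Prefect) shows how the two layers compose: a `ThreadPoolExecutor(max_workers=15)` plays the role of the 15-worker task runner, and a `threading.BoundedSemaphore(5)` stands in for a hypothetical 5-slot "external-api" limit acquired with a blocking `with`. `PeakCounter` is hypothetical instrumentation.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class PeakCounter:
    """Thread-safe counter that remembers the highest concurrent value."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __enter__(self):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self.active -= 1
        return False


runner_level = PeakCounter()
slot_level = PeakCounter()
slots = threading.BoundedSemaphore(5)  # stand-in for a 5-slot global limit


def sync_task(i: int) -> int:
    with runner_level:  # counts tasks running in the pool
        with slots, slot_level:  # blocking acquire, like the sync concurrency context
            time.sleep(0.01)  # blocking I/O stand-in
        return i


with ThreadPoolExecutor(max_workers=15) as pool:  # analogous to max_workers=15
    results = list(pool.map(sync_task, range(40)))

print(runner_level.peak, slot_level.peak)
```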
C) If you want TaskGroup at the flow level
- Make the flow async and run coroutines in a TaskGroup. Be aware that those coroutines are not Prefect tasks (so they won’t appear as separate task runs in the UI).
- Still wrap the critical section with `prefect.concurrency.asyncio.concurrency(...)`.
- Keep using ThreadPoolTaskRunner for any Prefect tasks you submit from that flow — it won’t affect your flow-level TaskGroup.
Gotchas
- Don’t use TaskGroup to run Prefect tasks themselves; submit tasks with `.submit()`, collect their `.result()`, and let the task runner handle concurrency.
- Each TaskGroup inside a task can fan out many coroutines; be mindful of oversubscription. Use a local Semaphore to cap per-task fan-out if needed.
- Use the async concurrency context for async code and the sync context inside blocking/threaded code.
- Create the “external-api” (or your chosen name) concurrency limit in the Prefect UI with the desired slot count so the contexts can acquire it.
If you share whether your inner work is async (httpx/aioboto3/etc.) or sync (requests/boto3), I can tailor the code to your exact case (including optional rate limiting). Python TaskGroup docs: asyncio.TaskGroup.