<@ULVA73B9P> why is ctrl+c stopping a flow sometim...
# marvin-ai
l
@Marvin why is ctrl+c stopping a flow sometimes when running async work but other times it does not
m
Great question. Ctrl+C (SIGINT) will only stop a Prefect flow if the signal reaches the right process and the event loop has a chance to process it. With async workloads, a few common patterns make this feel inconsistent.

Why it sometimes stops and sometimes doesn’t

- The event loop is blocked by sync/CPU work
  - Cancellation in asyncio/anyio happens at “checkpoints” (awaits). If your async task is doing long synchronous work (e.g., `requests.get()`, CPU loops, large I/O) without yielding back to the loop, SIGINT is queued but not acted on until the code hits an await/checkpoint.
  - Fix: Offload sync work to a thread and await it so cancellation can be observed.
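import requests
from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread

@task
async def fetch(url):
    # Run the blocking HTTP call in a worker thread so the event loop stays responsive
    return await run_sync_in_worker_thread(requests.get, url, timeout=10)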
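To see why offloading helps, here is a minimal stdlib-only sketch. It uses `asyncio.to_thread` as a stand-in for `run_sync_in_worker_thread`, and `blocking_io` and `ticker` are hypothetical names for illustration. While the blocking call runs in a worker thread, the event loop keeps servicing other coroutines, which is also what lets it notice a cancellation.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    # Stand-in for a blocking call such as requests.get()
    time.sleep(seconds)
    return "done"


async def main() -> tuple[str, int]:
    ticks = 0

    async def ticker() -> None:
        # Counts how often the event loop gets a turn
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    # The blocking work runs in a thread, so the loop stays free meanwhile
    result = await asyncio.to_thread(blocking_io, 0.2)
    ticker_task.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
print(result, "ticks while blocked:", ticks)
```

If `blocking_io` were called directly inside `main` instead, the ticker would not advance at all until the call returned.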
Or yield checkpoints in long loops:
import anyio

async def crunch(items):
    for i, item in enumerate(items):
        # do a bit of work
        if i % 1000 == 0:
            await anyio.sleep(0)  # cooperative checkpoint
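As a rough illustration of what a checkpoint buys you, the sketch below cancels a long loop part-way through. It uses plain `asyncio.sleep(0)` rather than `anyio.sleep(0)` so it runs with the standard library only; the `progress` list and the `demo` helper are hypothetical names added for the example.

```python
import asyncio


async def crunch(n: int, progress: list[int]) -> None:
    for i in range(n):
        progress.append(i)  # stand-in for a bit of real work
        if i % 100 == 0:
            await asyncio.sleep(0)  # cooperative checkpoint


async def demo() -> tuple[bool, int]:
    progress: list[int] = []
    task = asyncio.create_task(crunch(1_000_000, progress))
    await asyncio.sleep(0)  # let the task start and reach a checkpoint
    task.cancel()  # takes effect at the task's next checkpoint
    try:
        await task
        was_cancelled = False
    except asyncio.CancelledError:
        was_cancelled = True
    return was_cancelled, len(progress)


was_cancelled, steps = asyncio.run(demo())
print(was_cancelled, steps)
```

Without the `await` inside the loop, `crunch` would run all one million iterations before the cancellation request could be delivered.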
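- Cancellation is being swallowed
  - Catching `BaseException` (or using a bare `except:`) without re-raising `asyncio.CancelledError` will prevent proper shutdown. Since Python 3.8, `CancelledError` derives from `BaseException`, so a plain `except Exception` no longer catches it; on older Python versions it does.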
import asyncio

# inside an async function; do_work() is your own coroutine
try:
    await do_work()
except asyncio.CancelledError:
    raise  # let cancellation propagate
except Exception:
    ...  # handle real errors
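A small stdlib-only sketch of the difference, assuming Python 3.8 or newer. The names `swallow`, `propagate`, and `cancel_and_report` are hypothetical helpers for this example: one coroutine swallows the cancellation with a broad `except BaseException`, the other lets it propagate.

```python
import asyncio


async def swallow() -> str:
    try:
        await asyncio.sleep(10)
    except BaseException:
        # Too broad: this also catches CancelledError and hides it
        return "swallowed"
    return "finished"


async def propagate() -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        raise  # let cancellation propagate
    except Exception:
        return "handled"
    return "finished"


async def cancel_and_report(coro) -> str:
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)  # let the task start and block on its sleep
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"


async def main() -> tuple[str, str]:
    return (
        await cancel_and_report(swallow()),
        await cancel_and_report(propagate()),
    )


outcomes = asyncio.run(main())
print(outcomes)
```

The first task completes normally even though it was cancelled, which is the behaviour that makes Ctrl+C look like it was ignored.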
  - Using shields (e.g., anyio `CancelScope(shield=True)`) will block cancellation by design. Use sparingly.
- You’re pressing Ctrl+C in a different process than the flow run
  - When running via `prefect worker start`, Ctrl+C reaches the worker process first. Prefect attempts a graceful shutdown and forwards signals to the child flow-run process, but details vary by OS/infrastructure.
  - In Docker/Kubernetes, Ctrl+C in your local terminal does not reach the container/job unless you’re attached and signals are properly forwarded (e.g., Docker with `--init`/tini).
  - In these cases, cancel from the Prefect UI or CLI:
prefect flow-run ls
prefect flow-run cancel <flow-run-id>
(Verified: `prefect flow-run cancel` exists in 3.x.)
- OS/terminal differences
  - On Windows, signal delivery is more limited and only the main thread can receive SIGINT. Some terminals (e.g., VS Code, Git Bash) handle Ctrl+C differently. Prefer UI/CLI cancellation in those environments.
- Async concurrency semantics
  - `asyncio.gather(..., return_exceptions=True)` plus imperfect handling of `CancelledError` can mask cancellation. Ensure you let cancellations bubble up or explicitly cancel remaining tasks.

Practical checklist

- Offload sync I/O from async tasks with `await run_sync_in_worker_thread(...)` so the loop remains cancellable.
- Add periodic `await anyio.sleep(0)` in long-running loops to create cancellation checkpoints.
- Don’t swallow `CancelledError`.
- If running under a worker, Docker, or Kubernetes, use the UI or:
prefect flow-run cancel <flow-run-id>
- For Docker, make sure signals are forwarded (run the container with an init like tini or `--init`).
- For Windows, rely on the UI/CLI for cancellation if Ctrl+C is unreliable.

A couple of helpful references:
- AnyIO cancellation semantics
- asyncio task cancellation

If you can share a bit more, I can pinpoint the root cause:
- How are you running the flow when Ctrl+C doesn’t stop it? (local script, `prefect worker start`, Docker, Kubernetes)
- OS and terminal?
- What does the async task do? Any long sync I/O or CPU-bound work?
- A minimal code snippet where Ctrl+C doesn’t stop it would be perfect.
l
@Marvin how to use `asyncio.TaskGroup` with Prefect concurrency workers?
m
Short answer: you can use `asyncio.TaskGroup` for native async concurrency inside your flow, and use Prefect’s concurrency limits by wrapping the “critical section” of each task with the Prefect concurrency context. The TaskGroup gives you local concurrency; Prefect’s concurrency “workers” (backend service) enforce global limits across flows/runs.

Key patterns

1) Plain async coroutines with a TaskGroup + Prefect concurrency limit
import asyncio
from prefect import flow, get_run_logger
from prefect.concurrency.asyncio import concurrency

async def fetch_one(i):
    # This acquires a slot from the Prefect concurrency limit named "external-api"
    async with concurrency("external-api", occupy=1, timeout_seconds=60):
        # do your I/O here (httpx, aioboto3, etc.)
        return i

@flow
async def main(n: int = 20):
    logger = get_run_logger()
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one(i)) for i in range(n)]
    # The group has exited, so every task is finished; collect results from the tracked tasks
    results = [t.result() for t in tasks]
    logger.info("Done")
    return results
- Create the concurrency limit named “external-api” in the Prefect UI (Administration > Concurrency limits) with your desired slot count.
- Each coroutine that enters `async with concurrency("external-api")` will wait until a slot is available. TaskGroup itself imposes no cap: it runs every coroutine you schedule locally, while the Prefect limit gates the global maximum across runs.

2) Prefect tasks + Prefect concurrency limits (recommended for task-level work)
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def fetch_task(i: int):
    async with concurrency("external-api", occupy=1, timeout_seconds=60):
        # do async I/O work here
        return i

@flow
async def main(n: int = 20):
    # submit a bunch of tasks; they run concurrently on the flow's task runner
    futures = [fetch_task.submit(i) for i in range(n)]
    # In Prefect 3.x, .result() is a blocking call that returns the value,
    # not an awaitable, so it cannot be passed to asyncio.gather
    results = [f.result() for f in futures]
    return results
- Put the `async with concurrency(...)` inside the task so every task acquires/releases a slot correctly.
- Prefer this for Prefect tasks instead of running tasks directly in a TaskGroup. Submit tasks, then collect results from their futures.

3) Combine local caps with global Prefect limits

If you want to cap local concurrency as well as global limits:
import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency

sema = asyncio.Semaphore(10)  # local cap

async def work(i):
    async with sema:
        async with concurrency("external-api", occupy=1):
            # do work
            ...

@flow
async def main(n=100):
    async with asyncio.TaskGroup() as tg:
        for i in range(n):
            tg.create_task(work(i))
Notes and gotchas

- Don’t swallow cancellation: if a flow is cancelled, `TaskGroup` will cancel children; avoid catching `CancelledError` without re-raising.
- If your “async” code does long synchronous work, offload it so cancellation and fairness work as expected (e.g., `await prefect.utilities.asyncutils.run_sync_in_worker_thread(...)`).
- Avoid using TaskGroup to run Prefect tasks directly. Submit tasks and collect results from their futures instead, or keep TaskGroup for non-Prefect helper coroutines.
- To rate limit instead of slot limit, there’s also `prefect.concurrency.asyncio.rate_limit(...)` with a similar pattern.

If you can share:
- whether you want to parallelize native coroutines, Prefect tasks, or both
- your target global limit and whether you also want a local cap

I can tailor the example to your exact use case. For Python 3.11 TaskGroup semantics, see the Python docs: asyncio.TaskGroup. For Prefect concurrency limits, see the Prefect docs (Concurrency concept page).
l
@Marvin using task_runner=ThreadPoolTaskRunner(max_workers=15),
m
Got it: you’re using `ThreadPoolTaskRunner(max_workers=15)`. Here’s how to combine that with `asyncio.TaskGroup` and Prefect’s global concurrency limits.

What ThreadPoolTaskRunner does

- It limits how many Prefect tasks can run concurrently in this flow run (up to 15 tasks at once).
- It does not limit coroutines you spawn inside a single task; that’s where asyncio.TaskGroup (local concurrency) and Prefect concurrency limits (global slots) come in.

Patterns that work well

A) Use TaskGroup inside an async Prefect task, and gate external resources with Prefect’s async concurrency
import asyncio
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.asyncio import concurrency

async def fetch_one(i: int):
    # Acquire a global slot before touching the shared resource
    async with concurrency("external-api", occupy=1, timeout_seconds=60):
        # do async I/O (e.g., httpx)
        return i

@task
async def fetch_batch(batch: list[int]):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_one(i)) for i in batch]
    # Leaving the block waits for all tasks; if any fails, the rest are
    # cancelled and an ExceptionGroup is raised before this line is reached
    return [t.result() for t in tasks]

@flow(task_runner=ThreadPoolTaskRunner(max_workers=15))
def main(items: list[int]):
    # Submit multiple batches as Prefect tasks (up to 15 at a time)
    batch_size = 50
    futures = [fetch_batch.submit(items[i:i+batch_size])
               for i in range(0, len(items), batch_size)]
    return [f.result() for f in futures]
- ThreadPoolTaskRunner caps concurrent Prefect tasks at 15.
- Inside each task, TaskGroup gives fast, local async fan-out.
- The Prefect concurrency context `concurrency("external-api")` enforces your global limit across all flows/runs.

B) If your tasks are sync, use the sync concurrency context
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import concurrency

@task
def sync_task(i: int):
    with concurrency("external-api", occupy=1, timeout_seconds=60):
        # blocking I/O here (requests, boto3, etc.)
        return i

@flow(task_runner=ThreadPoolTaskRunner(max_workers=15))
def main(items: list[int]):
    futures = [sync_task.submit(i) for i in items]
    return [f.result() for f in futures]
- Use `prefect.concurrency.sync.concurrency` for blocking code running in a thread.

C) If you want TaskGroup at the flow level

- Make the flow async and run coroutines in a TaskGroup. Be aware that those coroutines are not Prefect tasks (so they won’t appear as separate task runs in the UI).
- Still wrap the critical section with `prefect.concurrency.asyncio.concurrency(...)`.
- Keep using ThreadPoolTaskRunner for any Prefect tasks you submit from that flow; it won’t affect your flow-level TaskGroup.

Gotchas

- Don’t use TaskGroup to run Prefect tasks themselves; submit tasks with `.submit` and call their `.result()`, and let the task runner handle concurrency.
- Each TaskGroup inside a task can fan out many coroutines; be mindful of oversubscription. Use a local Semaphore to cap per-task fan-out if needed.
- Use the async concurrency context for async code and the sync context inside blocking/threaded code.
- Create the “external-api” (or your chosen name) concurrency limit in the Prefect UI with the desired slot count so the contexts can acquire it.

If you share whether your inner work is async (httpx/aioboto3/etc.) or sync (requests/boto3), I can tailor the code to your exact case (including optional rate limiting). Python TaskGroup docs: asyncio.TaskGroup.