# ask-community
@Marvin we are struggling a lot with "error renewing concurrency lease" for longer-running flows. It typically occurs after a somewhat heavy task involving both some CPU-bound stuff and lots of async IO. We are using the default task runner, and mostly just awaiting an asyncio gather for several tasks. We updated PREFECT_RUNNER_HEARTBEAT_FREQUENCY=30 and I can confirm that even during the longer task it is sending frequent heartbeats. However, after the long task, we consistently get the error renewing the lease when it switches to a new task. For context, we are on Prefect 3.4.25 now (have tested several versions), and we have a global limit on the deployment (to avoid hitting resource limits on the number of containers). We run deployments on Azure Container Instances; they have 1 CPU available each.
thought for 613.3 seconds
Thanks for the detailed context — that helps. Short version: what you're seeing is usually the concurrency lease expiring while a long, CPU-heavy/IO-heavy task is running and starving the event loop. Heartbeats are separate from lease renewal; you can see frequent heartbeats and still have the lease renewal miss its window. On a single-CPU ACI container, a CPU-bound section inside an async task can prevent the background renewal from running in time; when your code returns to the loop (e.g., when the long task finishes and the next task starts), Prefect tries to renew and you get "error renewing concurrency lease".

Key points
- Heartbeats vs lease renewals: `PREFECT_RUNNER_HEARTBEAT_FREQUENCY` controls runner heartbeats for observability. Concurrency lease renewal is a separate background activity; it isn't controlled by that setting.
- Lease TTL: in Prefect 3.x the TTL and renewal cadence aren't user-configurable. There isn't a `PREFECT_*` setting to extend the lease TTL. You can adjust the general API timeout via `PREFECT_API_REQUEST_TIMEOUT`.
- 1 CPU in ACI: any CPU-bound work executed in the event loop (e.g., inside an `async def` task) can starve the loop and delay renewal calls beyond the lease TTL.

What usually fixes it

1) Keep CPU-bound work off the event loop
- If your task is `async def` but does CPU-heavy work inside, offload the heavy part:
```python
from prefect import task
import anyio

def chunked(seq, size):
    # simple chunking helper (defined here; not a stdlib builtin)
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

@task
async def cpu_heavy_async(items):
    def crunch(chunk):
        # CPU-bound logic
        return sum(x * x for x in chunk)

    results = []
    for chunk in chunked(items, 1000):
        # Offload CPU to a thread
        res = await anyio.to_thread.run_sync(crunch, chunk)
        results.append(res)
        # Yield to the loop periodically so renewals/heartbeats run
        await anyio.sleep(0)
    return sum(results)
```
- Alternatively, make the task synchronous so Prefect runs it in the threadpool by default:
```python
from prefect import task

@task
def cpu_heavy_sync(items):
    # same CPU-bound logic here, runs in a worker thread
    ...
```
- For truly CPU-bound workloads, consider a process pool for those tasks to bypass the GIL. Even with 1 CPU, this prevents the event loop from being blocked:
```python
from prefect import flow
from prefect.task_runners import ProcessPoolTaskRunner

# Note: with a process pool, task inputs/outputs must be picklable
@flow(task_runner=ProcessPoolTaskRunner(max_workers=1))
def my_flow():
    ...
```
2) Make sure your IO is truly async
- If you're using `asyncio.gather(...)` but calling sync libraries inside those coroutines, you're still blocking the loop. Use async clients or wrap the sync calls with `anyio.to_thread.run_sync(...)`, as in the sketch below.
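For example, a minimal sketch of wrapping a blocking client call so it runs in a worker thread (`fetch_rows` and `query` are hypothetical stand-ins for your sync library call):

```python
import anyio

def fetch_rows(query: str):
    # blocking call into a sync library (placeholder body)
    ...

async def fetch_rows_async(query: str):
    # run the blocking call in a worker thread so the event loop stays free
    return await anyio.to_thread.run_sync(fetch_rows, query)
```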
3) Yield explicitly during long loops
- In long-running async tasks, add cooperative yield points:
```python
await anyio.sleep(0)  # or a tiny value like 0.01
```
- If you're doing a big `asyncio.gather` over many items, impose a concurrency limit with a semaphore so the loop can service background tasks:
```python
import asyncio

sem = asyncio.Semaphore(10)

async def limited(coro):
    async with sem:
        return await coro

# inside your async task:
results = await asyncio.gather(*(limited(worker(x)) for x in items))
```
4) Reduce intra-flow parallelism on 1 CPU
- If you have many Prefect tasks running at once on a 1-CPU container, the event loop/renewal may get less time. You can lower task-level parallelism:
  - Use `ThreadPoolTaskRunner(max_workers=1)` to serialize tasks at the Prefect level, while still allowing your inner async IO to run:
```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
async def my_flow():
    ...
```
  - Or keep the default runner and limit your own `gather` concurrency as above.

5) Increase the API request timeout
- If the network is occasionally slow from ACI, give renewal calls more headroom:
```bash
export PREFECT_API_REQUEST_TIMEOUT=120
```
This won’t fix event-loop starvation, but it helps with transient network slowness.
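If you manage settings with Prefect profiles instead of environment variables, the same value can be persisted via the CLI:

```bash
# equivalent to the env var, stored in the active Prefect profile
prefect config set PREFECT_API_REQUEST_TIMEOUT=120
```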
6) As a diagnostic, temporarily disable the deployment's global concurrency
- If you can safely do so for a test, remove or increase the deployment's global concurrency limit and re-run; a sketch follows below. If the error disappears, it confirms the failure is specifically lease renewal for that limit and not something else.
- Docs: Global concurrency limits, Tag-based limits.
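A hedged sketch of that test, assuming you create the deployment with `flow.deploy()` (Prefect 3.x exposes a `concurrency_limit` parameter there; if you deploy via `prefect.yaml` or another path, adjust the equivalent field). Names are placeholders:

```python
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="my-deployment",          # hypothetical deployment name
        work_pool_name="my-aci-pool",  # hypothetical ACI work pool
        image="myregistry.azurecr.io/my-flow:latest",  # hypothetical image
        # raise the limit (or set it to None) for the diagnostic run only
        concurrency_limit=100,
    )
```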
Helpful references
- Concurrency APIs: prefect.concurrency.asyncio, Concurrency services
- Task runners: Task runners
- Settings: Settings reference (see `PREFECT_API_REQUEST_TIMEOUT`)

Next steps / questions
- Can you share:
  - The exact log line(s) for the "error renewing concurrency lease" (including which limit name it mentions)?
  - The rough duration of the long task.
  - Whether the long task is defined as `async def` and whether it contains CPU-heavy code (e.g., pandas/numpy loops).
  - Whether you're also using tag-based concurrency on the tasks.
- If you can try one change first, I'd start by offloading CPU-heavy sections from async tasks using `anyio.to_thread.run_sync(...)` and adding `await anyio.sleep(0)` in long loops, then set `PREFECT_API_REQUEST_TIMEOUT=120`. If the issue persists, we can look at dialing down task-level parallelism or using a process pool for the CPU-heavy pieces.

If you can share a minimal snippet of the long task and how it's invoked, I can tailor the exact refactor.
@Marvin OK so the problem is that lease renewal and flow code execution use the same async event loop? Could I wrap the actual work in a task that is submitted to a threadpool runner, then async gather inside there? I need all the gathers on the same event loop for async SQLAlchemy to be happy. And the code is a bit hard to separate into the heavier CPU-bound and IO-bound parts: we apply a series of user-defined transforms to pandas DataFrames, which might involve async IO operations or just big CPU transformations.
thought for 78.3 seconds
Great questions — you've got the right mental model.

- Yes: the lease renewal runs on the same async event loop that's orchestrating your flow/task. If a long CPU-heavy section runs inside an async task and starves that loop, the renewal can miss its TTL even though heartbeats keep showing up.

About your idea
- "Wrap the actual work in a task submitted to a threadpool runner, then inside there async gather?" This can work well as long as you run the async part inside its own event loop in that worker thread and you do not reuse async resources (like an async SQLAlchemy engine) that were created on the original loop.

Why this helps
- The flow's main event loop stays free to service lease renewals while the heavy work happens in a worker thread. Your async SQLAlchemy usage still runs on one coherent event loop — just not the flow's loop — which is fine if you create the engine within that loop.

Pattern 1: Wrap the heavy section in a sync Prefect task that runs its own loop
```python
from prefect import flow, task
import asyncio

@task
def run_heavy_pipeline_in_thread():
    async def main():
        # IMPORTANT: create your async SQLAlchemy engine HERE so it's bound to this loop
        # engine = create_async_engine(db_url, ...)

        # Limit gather concurrency to keep the loop responsive
        sem = asyncio.Semaphore(10)

        async def worker(item):
            async with sem:
                # ... async IO with SQLAlchemy here ...
                # run CPU-heavy transforms off the loop:
                df = await asyncio.to_thread(cpu_heavy_transform, item.df)
                # tiny yield to keep loop healthy
                await asyncio.sleep(0)
                return df

        items = [...]  # your workload
        results = await asyncio.gather(*(worker(x) for x in items))

        # await engine.dispose()
        return results

    # Runs a new event loop in this worker thread; return its results
    return asyncio.run(main())

@flow
def my_flow():
    # Default runner (ThreadPool) will put this task in a worker thread
    run_heavy_pipeline_in_thread()
```
Notes:
- Do not reuse an async engine created at module import time or in the outer flow; create it inside `main()` instead. A minimal engine-lifecycle sketch follows these notes.
- You can still mix async IO (SQLAlchemy) with CPU work by offloading the CPU bits to `asyncio.to_thread` (or a process pool if you want to bypass the GIL).
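For concreteness, a sketch of that engine lifecycle inside the per-thread loop, assuming SQLAlchemy's asyncio extension (`db_url` is a placeholder):

```python
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine

def run_pipeline(db_url: str):
    async def main():
        # engine is created on THIS loop, used, and disposed before the loop exits
        engine = create_async_engine(db_url)
        try:
            async with engine.connect() as conn:
                ...  # run async queries on `conn` here
        finally:
            await engine.dispose()

    return asyncio.run(main())
```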
Pattern 2: Keep everything on the flow loop but offload CPU and add yield points

If you prefer to stay on the same loop (i.e., inside an `async def` task):
```python
from prefect import task
import asyncio
import functools
import inspect
import anyio

async def maybe_async(fn, *args, **kwargs):
    # Run sync functions in a thread to avoid blocking the loop
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    # anyio.to_thread.run_sync only forwards positional args, so bind kwargs first
    return await anyio.to_thread.run_sync(functools.partial(fn, *args, **kwargs))

@task
async def orchestrate():
    sem = asyncio.Semaphore(10)

    async def worker(item):
        async with sem:
            # async SQLAlchemy I/O
            await do_async_db_work(item)

            # user-defined transforms may be sync or async
            df = item.df
            for transform in user_transforms:
                df = await maybe_async(transform, df)

            # encourage the loop to schedule renewals/heartbeats
            await asyncio.sleep(0)
            return df

    items = [...]
    return await asyncio.gather(*(worker(x) for x in items))
```
This keeps your async I/O on the same loop and moves any synchronous CPU work into threads. If transforms are very CPU-heavy, consider using a ProcessPool for those specific steps:
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

# Reuse one pool; spawning a pool per call is expensive
_pool = ProcessPoolExecutor(max_workers=1)

async def run_in_process(fn, *args):
    # fn must be picklable (a module-level function), so pass it directly
    # rather than wrapping it in a (non-picklable) lambda
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, fn, *args)
```
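Usage from inside the Pattern 2 worker would then look like this (assuming `cpu_heavy_transform` is a module-level, picklable function):

```python
df = await run_in_process(cpu_heavy_transform, df)
```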
Small but important tweaks
- Add cooperative yields in long loops: `await asyncio.sleep(0)` in async tasks.
- Bound your `gather` concurrency with a semaphore, not an unbounded `gather`.
- Increase the API timeout in case the ACI network is occasionally slow:
```bash
export PREFECT_API_REQUEST_TIMEOUT=120
```
- If feasible, bump the ACI container to 2 vCPU; even a little headroom makes lease renewal far more reliable under mixed CPU/IO loads (see the sketch below).
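If you provision containers with the Azure CLI, a hedged example of requesting 2 vCPUs at creation (resource names and image are placeholders; if an ACI work pool creates your containers, set the CPU in its job template instead):

```bash
az container create \
  --resource-group my-rg \
  --name prefect-runner \
  --image myregistry.azurecr.io/my-flow:latest \
  --cpu 2 \
  --memory 4
```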
To your specific question
"I need all the gathers on the same event loop for async SQLAlchemy to be happy."
- Pattern 1 satisfies this by creating a new event loop inside the task's worker thread and creating the engine inside that loop. Everything for that task uses one loop.
- Pattern 2 keeps everything on the flow's loop but requires you to offload CPU work and yield frequently. This is the least invasive refactor if you don't want to move the engine.

If you can share how you currently construct your async SQLAlchemy engine (global vs per-task) and whether the heavy pipeline is in an `async def` Prefect task, I can suggest the lowest-risk refactor for your case.