Øyvind Monsen
11/05/2025, 3:34 PM
Marvin
11/05/2025, 3:35 PM
Marvin
11/05/2025, 3:45 PM
PREFECT_RUNNER_HEARTBEAT_FREQUENCY controls runner heartbeats for observability. Concurrency lease renewal is a separate background activity; it isn’t controlled by that setting.
- Lease TTL: In Prefect 3.x the TTL and renewal cadence aren’t user-configurable. There isn’t a PREFECT_* setting to extend the lease TTL. You can adjust the general API timeout via PREFECT_API_REQUEST_TIMEOUT.
- 1 CPU in ACI: Any CPU-bound work executed in the event loop (e.g., inside an async def task) can starve the loop and delay renewal calls beyond the lease TTL.
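To make the starvation point concrete, here is a minimal sketch using only the standard library. It is not Prefect's renewal code: `renew_forever` is a hypothetical stand-in for any background coroutine that must tick regularly, and `busy` is a hypothetical CPU-bound function. It measures the longest gap between ticks when the CPU work runs on the loop versus in a thread.

```python
import asyncio
import time


def busy(seconds: float) -> None:
    """Hypothetical CPU-bound stand-in: spin until the deadline passes."""
    deadline = time.perf_counter() + seconds
    while time.perf_counter() < deadline:
        pass


async def max_renewal_gap(offload: bool) -> float:
    """Return the longest gap between ticks of a fake 'renewal' coroutine."""
    gaps = []

    async def renew_forever():
        last = time.perf_counter()
        while True:
            await asyncio.sleep(0.01)  # pretend to renew every 10 ms
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    renewer = asyncio.create_task(renew_forever())
    await asyncio.sleep(0.05)  # let a few ticks happen first
    if offload:
        await asyncio.to_thread(busy, 0.3)  # loop stays free to tick
    else:
        busy(0.3)  # runs on the loop; no tick can happen meanwhile
    await asyncio.sleep(0.05)
    renewer.cancel()
    return max(gaps)


if __name__ == "__main__":
    print("blocked on loop :", round(asyncio.run(max_renewal_gap(False)), 3))
    print("offloaded       :", round(asyncio.run(max_renewal_gap(True)), 3))
```

When the work runs on the loop, the longest gap is at least as long as the CPU burst; offloaded, the ticks keep arriving, although a pure-Python thread still competes for the GIL.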
What usually fixes it
1) Keep CPU-bound work off the event loop
- If your task is async def but does CPU-heavy work inside, offload the heavy part:
from prefect import task
import anyio
from more_itertools import chunked  # assumption: any helper that yields fixed-size chunks works

@task
async def cpu_heavy_async(items):
    def crunch(chunk):
        # CPU-bound logic
        return sum(x * x for x in chunk)

    results = []
    for chunk in chunked(items, 1000):
        # Offload CPU to a thread
        res = await anyio.to_thread.run_sync(crunch, chunk)
        results.append(res)
        # Yield to the loop periodically so renewals/heartbeats run
        await anyio.sleep(0)
    return sum(results)
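- Alternatively, make the task synchronous and submit it so the flow’s task runner (ThreadPoolTaskRunner by default) runs it in a worker thread. In Prefect 3.x, a direct call runs the task in the caller’s thread:
from prefect import task

@task
def cpu_heavy_sync(items):
    # same CPU-bound logic here
    ...

# In the flow: cpu_heavy_sync.submit(items).result() runs it in a worker thread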
- For truly CPU-bound workloads, consider a process pool for those tasks to bypass the GIL. Even with 1 CPU, this prevents the event loop from being blocked:
from prefect import flow
from prefect.task_runners import ProcessPoolTaskRunner

@flow(task_runner=ProcessPoolTaskRunner(max_workers=1))
def my_flow():
    ...
2) Make sure your IO is truly async
- If you’re using asyncio.gather(...) but calling sync libraries inside those coroutines, you’re still blocking the loop. Use async clients or wrap the sync calls with anyio.to_thread.run_sync(...).
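As a rough illustration of the difference, the sketch below uses only the standard library. `sync_fetch` is a hypothetical stand-in for a blocking client call, and `asyncio.to_thread` plays the role that `anyio.to_thread.run_sync` plays above. The blocking version serializes the whole gather on the loop; the offloaded version lets the calls overlap.

```python
import asyncio
import time


def sync_fetch(i: int) -> int:
    """Hypothetical blocking call (stand-in for a sync HTTP/DB library)."""
    time.sleep(0.1)
    return i * 2


async def blocking_worker(i: int) -> int:
    # Looks async, but the sync call holds the loop for its full duration.
    return sync_fetch(i)


async def offloaded_worker(i: int) -> int:
    # The sync call runs in a worker thread; the loop stays responsive.
    return await asyncio.to_thread(sync_fetch, i)


async def timed_gather(worker, n: int = 5):
    """Run n workers with gather and return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = await asyncio.gather(*(worker(i) for i in range(n)))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    for worker in (blocking_worker, offloaded_worker):
        results, elapsed = asyncio.run(timed_gather(worker))
        print(worker.__name__, results, round(elapsed, 2))
```

The blocking variant takes roughly the sum of the individual call times, and during that whole period no other coroutine on the loop gets to run.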
3) Yield explicitly during long loops
- In long-running async tasks, add cooperative yield points:
await anyio.sleep(0) # or a tiny value like 0.01
- If you’re doing a big asyncio.gather over many items, impose a concurrency limit with a semaphore so the loop can service background tasks:
import asyncio
sem = asyncio.Semaphore(10)

async def limited(coro):
    async with sem:
        return await coro

# Inside an async function; worker(x) is your own coroutine function
await asyncio.gather(*(limited(worker(x)) for x in items))
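A self-contained variant of the same idea, as a sketch: here the semaphore is acquired inside the worker rather than in a wrapper, and the worker body is a hypothetical stand-in for async I/O. It also records the peak number of workers running at once, which is a simple way to check that the limit holds.

```python
import asyncio


async def bounded_gather(items, limit: int):
    """Gather over items with at most `limit` workers inside the semaphore."""
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def worker(x):
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real async I/O
            running -= 1
            return x * x

    results = await asyncio.gather(*(worker(x) for x in items))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(bounded_gather(range(20), limit=3))
    print(results[:5], "peak concurrency:", peak)
```

All 20 coroutines are still scheduled up front, but only `limit` of them do work at any moment, so the loop has fewer ready callbacks to process between background tasks.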
4) Reduce intra-flow parallelism on 1 CPU
- If you have many Prefect tasks running at once on a 1-CPU container, the event loop/renewal may get less time. You can lower task-level parallelism:
- Use ThreadPoolTaskRunner(max_workers=1) to serialize tasks at the Prefect level, while still allowing your inner async IO to run:
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=1))
async def my_flow():
    ...
- Or keep the default runner and limit your own gather concurrency as above.
5) Increase the API request timeout
- If the network is occasionally slow from ACI, give renewal calls more headroom:
export PREFECT_API_REQUEST_TIMEOUT=120
This won’t fix event-loop starvation, but it helps with transient network slowness.
Marvin
11/05/2025, 3:45 PM
PREFECT_API_REQUEST_TIMEOUT)
Next steps / questions
- Can you share:
- The exact log line(s) for the “error renewing concurrency lease” (including which limit name it mentions)?
- Rough duration of the long task.
- Whether the long task is defined as async def and whether it contains CPU-heavy code (e.g., pandas/numpy loops).
- Whether you’re also using tag-based concurrency on the tasks.
- If you can try one change first, I’d start by offloading CPU-heavy sections from async tasks using anyio.to_thread.run_sync(...) and adding await anyio.sleep(0) in long loops, then set PREFECT_API_REQUEST_TIMEOUT=120. If the issue persists, we can look at dialing down task-level parallelism or using a process pool for the CPU-heavy pieces.
If you can share a minimal snippet of the long task and how it’s invoked, I can tailor the exact refactor.
Øyvind Monsen
11/05/2025, 3:50 PM
Øyvind Monsen
11/05/2025, 3:56 PM
Marvin
11/05/2025, 4:04 PM
Marvin
11/05/2025, 4:06 PM
from prefect import flow, task
import asyncio

@task
def run_heavy_pipeline_in_thread():
    async def main():
        # IMPORTANT: create your async SQLAlchemy engine HERE so it’s bound to this loop
        # engine = create_async_engine(db_url, ...)
        # Limit gather concurrency to keep the loop responsive
        sem = asyncio.Semaphore(10)

        async def worker(item):
            async with sem:
                # ... async IO with SQLAlchemy here ...
                # run CPU-heavy transforms off the loop
                # (cpu_heavy_transform is your own function):
                df = await asyncio.to_thread(cpu_heavy_transform, item.df)
                # tiny yield to keep loop healthy
                await asyncio.sleep(0)
                return df

        items = [...]  # your workload
        results = await asyncio.gather(*(worker(x) for x in items))
        # await engine.dispose()
        return results

    # Runs a new event loop in this worker thread and returns its results
    return asyncio.run(main())

@flow
def my_flow():
    # .submit() hands the task to the default runner (ThreadPool), which runs it
    # in a worker thread; .result() waits for it to finish
    run_heavy_pipeline_in_thread.submit().result()
Notes:
- Do not reuse an async engine created at module import time or in the outer flow; create it inside main() instead.
- You can still mix async IO (SQLAlchemy) with CPU work by offloading the CPU bits to asyncio.to_thread (or a process pool if you want to bypass the GIL).
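The loop-ownership idea behind this pattern can be sketched without Prefect or SQLAlchemy. In the example below, `LoopBoundResource` is a hypothetical stand-in for an async engine that only works on the loop it was created on, and `outer` stands in for the flow's own loop handing the pipeline to a worker thread. Every `gather` inside the thread runs on the one loop created by `asyncio.run`.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class LoopBoundResource:
    """Hypothetical stand-in for an async engine: remembers its creating loop."""

    def __init__(self):
        self.loop = asyncio.get_running_loop()

    async def query(self, x):
        # Refuse to run on any loop other than the one it was created on.
        if asyncio.get_running_loop() is not self.loop:
            raise RuntimeError("resource used on a different event loop")
        await asyncio.sleep(0)
        return x + 1


def run_pipeline_in_thread(items):
    async def main():
        resource = LoopBoundResource()  # created inside this thread's loop
        results = await asyncio.gather(*(resource.query(x) for x in items))
        return results, threading.current_thread().name

    return asyncio.run(main())  # fresh loop owned by the calling thread


async def outer():
    # Stand-in for the flow's loop: it only waits while the worker thread
    # runs its own loop, so it stays free for background work.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="pipeline") as pool:
        return await loop.run_in_executor(pool, run_pipeline_in_thread, [1, 2, 3])


if __name__ == "__main__":
    print(asyncio.run(outer()))
```

If the resource were created in `outer` and passed into the thread, `query` would raise, which mirrors why the engine should be created inside `main()`.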
Pattern 2: Keep everything on the flow loop but offload CPU and add yield points
If you prefer to stay on the same loop (i.e., inside an async def task):
from prefect import task
import asyncio
import inspect
from functools import partial
import anyio

async def maybe_async(fn, *args, **kwargs):
    # Run sync functions in a thread to avoid blocking the loop
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    # run_sync forwards only positional args, so bind kwargs with partial
    return await anyio.to_thread.run_sync(partial(fn, *args, **kwargs))

@task
async def orchestrate():
    sem = asyncio.Semaphore(10)

    async def worker(item):
        async with sem:
            # async SQLAlchemy I/O (do_async_db_work is your own coroutine)
            await do_async_db_work(item)
            # user-defined transforms may be sync or async
            df = item.df
            for transform in user_transforms:
                df = await maybe_async(transform, df)
                # encourage the loop to schedule renewals/heartbeats
                await asyncio.sleep(0)
            return df

    items = [...]
    return await asyncio.gather(*(worker(x) for x in items))
This keeps your async I/O on the same loop and moves any synchronous CPU work into threads. If transforms are very CPU-heavy, consider using a ProcessPool for those specific steps:
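import asyncio
from concurrent.futures import ProcessPoolExecutor

async def run_in_process(fn, *args):
    # fn and args must be picklable (e.g., a module-level function), so pass
    # them directly instead of wrapping them in a lambda
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(pool, fn, *args)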
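Returning to the maybe_async helper from Pattern 2, here is a small standard-library sketch of how it dispatches. It uses `asyncio.to_thread` in place of anyio, and `scale` and `shift` are hypothetical transforms, one sync and one async, each called with a keyword argument.

```python
import asyncio
import inspect
from functools import partial


async def maybe_async(fn, *args, **kwargs):
    """Await coroutine functions directly; run plain functions in a thread."""
    if inspect.iscoroutinefunction(fn):
        return await fn(*args, **kwargs)
    # Bind all arguments up front so the thread call takes a zero-arg callable.
    return await asyncio.to_thread(partial(fn, *args, **kwargs))


def scale(values, factor=2):
    """Hypothetical sync transform."""
    return [v * factor for v in values]


async def shift(values, offset=1):
    """Hypothetical async transform."""
    await asyncio.sleep(0)
    return [v + offset for v in values]


async def apply_all(values):
    # Apply a mixed list of transforms without caring which kind each one is.
    for transform, kwargs in ((scale, {"factor": 3}), (shift, {"offset": 10})):
        values = await maybe_async(transform, values, **kwargs)
    return values


if __name__ == "__main__":
    print(asyncio.run(apply_all([1, 2, 3])))
```

Binding arguments with `partial` keeps the helper working with thread-offload APIs that forward only positional arguments.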
Small but important tweaks
- Add cooperative yields in long loops: await asyncio.sleep(0) in async tasks.
Marvin
11/05/2025, 4:06 PM
gather concurrency with a semaphore, not an unbounded gather.
- Increase the API timeout in case the ACI network is occasionally slow:
export PREFECT_API_REQUEST_TIMEOUT=120
- If feasible, bump the ACI container to 2 vCPU; even a little headroom makes lease renewal far more reliable under mixed CPU/IO loads.
To your specific question “I need all the gathers on the same event loop for async SQLAlchemy to be happy.”
- Pattern 1 satisfies this by creating a new event loop inside the task’s worker thread and creating the engine inside that loop. Everything for that task uses one loop.
- Pattern 2 keeps everything on the flow’s loop but requires you to offload CPU work and yield frequently. This is the least invasive refactor if you don’t want to move the engine.
If you can share how you currently construct your async SQLAlchemy engine (global vs per-task) and whether the heavy pipeline is in an async def Prefect task, I can suggest the lowest-risk refactor for your case.