Syméon del Marmol
10/13/2025, 2:54 PM
With future = my_task.submit() I get a PrefectConcurrentFuture, which is not compatible with asyncio.
I tried to run the wait method in a separate executor like this:
future = my_task.submit()
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, future.wait)
This generally worked, but occasionally the process kept waiting forever even though the task had completed.
Running self-hosted Prefect 3.4.23 on Kubernetes.
Marvin
10/13/2025, 2:54 PM
Nate
10/13/2025, 2:56 PM
from prefect.futures import wait
futures = foo.map(iterable)
# do other things
wait(futures)  # resolve all futures concurrently
Marvin
10/13/2025, 2:58 PM
future.wait() in a thread. In Prefect 3.x, use the future’s async APIs directly.
Recommended patterns
- If you want the work to run on task workers (distributed/background):
  - Use .delay() to submit, then await with .result_async() or .wait_async() later.
  - Example:
# submit background tasks to workers
futs = [
    my_task.with_options(name=f"job-{i}").delay(i)
    for i in range(10)
]
# await results later, without blocking the thread
results = await asyncio.gather(*(f.result_async() for f in futs))
# or just ensure completion
await asyncio.gather(*(f.wait_async() for f in futs))
- If you’re keeping execution in the flow process (local concurrency):
  - Use .submit() to get a PrefectConcurrentFuture and still await via .result_async() / .wait_async():
f1 = my_task.submit(a=1)
f2 = my_task.submit(a=2)
# elsewhere in your async code
r1 = await f1.result_async()
r2 = await f2.result_async()
# or collectively
r1, r2 = await asyncio.gather(f1.result_async(), f2.result_async())
Why your run_in_executor(..., future.wait) can hang
- future.wait() is a blocking call designed for sync code. Running it in a thread can deadlock or stall due to how Prefect’s engine and AnyIO manage threading and synchronization. The async methods are designed to integrate cleanly with asyncio and avoid these pitfalls.
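One way to keep a blocking wait from stalling indefinitely is to bound it. The following is a minimal standard-library sketch, not Prefect code: BlockingFuture, is_done() and wait_without_blocking() are hypothetical stand-ins for a future that only offers a blocking wait(timeout). It waits in short slices on a worker thread and gives up after a deadline, so a missed wake-up costs at most one polling interval. Whether this resolves the hang reported in this thread is not confirmed.

```python
import asyncio
import threading
from typing import Optional, Tuple


class BlockingFuture:
    """Hypothetical stand-in for a future that only offers a blocking wait()."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def set_done(self) -> None:
        self._event.set()

    def is_done(self) -> bool:
        return self._event.is_set()

    def wait(self, timeout: Optional[float] = None) -> None:
        # Blocks the calling thread until done or until the timeout elapses.
        self._event.wait(timeout)


async def wait_without_blocking(
    future: BlockingFuture, poll: float = 0.05, deadline: float = 5.0
) -> bool:
    """Wait in short, bounded slices off the event loop; return True if done."""
    loop = asyncio.get_running_loop()
    stop_at = loop.time() + deadline
    while not future.is_done():
        if loop.time() >= stop_at:
            return False
        # Each slice blocks a worker thread for at most `poll` seconds.
        await asyncio.to_thread(future.wait, poll)
    return True


async def demo() -> Tuple[bool, bool]:
    fast, stuck = BlockingFuture(), BlockingFuture()
    # Complete `fast` from another thread shortly after waiting starts.
    threading.Timer(0.1, fast.set_done).start()
    finished = await wait_without_blocking(fast)
    # `stuck` never completes, so the bounded wait reports a timeout.
    timed_out = await wait_without_blocking(stuck, deadline=0.2)
    return finished, timed_out
```

Running asyncio.run(demo()) exercises both paths: one future that completes and one that never does. The trade-off is a small polling delay in exchange for a wait that always returns.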
Notes
- Both .result_async() and .wait_async() are awaitable and can be used anywhere in your async flow. If you only need to know that the task has finished (no result), use wait_async(). If you need the return value, or want task errors raised, use result_async().
- If your goal is to fan out work across multiple task workers on Kubernetes, prefer .delay() over .submit().
If you’re still seeing hangs when using `wait_async()`/`result_async()`, please share a minimal snippet and the flow/task logs so we can dig in.
References
- Prefect async utilities (gather, task groups)
- Background tasks in Prefect (overview)
- Futures implementation (source)
Syméon del Marmol
10/13/2025, 3:04 PM
STEPS = {
    "step1": task_step1,  # these are Prefect tasks
    "step2": task_step2,
    "step3": task_step3,
}

@flow
async def my_flow():
    await asyncio.gather(*[process_doc(id) for id in doc_ids])

async def process_doc(id: str):
    doc_step = db.get_doc_step(id)
    while doc_step in STEPS:
        task = STEPS[doc_step]
        future = task.submit()
        # NEED TO WAIT WITHOUT BLOCKING HERE
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, future.wait)
        doc_step = db.get_doc_step(id)
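The shape of this loop can be sketched with the standard library alone, assuming each submitted step is backed by a concurrent.futures.Future. In that case asyncio.wrap_future makes the future awaitable, so each document's loop waits for its own step while the other documents keep progressing. Everything here is a hypothetical stand-in: DOC_STEPS replaces db.get_doc_step, run_step replaces the Prefect tasks, and a ThreadPoolExecutor replaces task.submit(). Whether a Prefect future exposes such an underlying future is an assumption to verify against the installed version.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List

# Hypothetical in-memory stand-in for db.get_doc_step: doc id -> current step.
DOC_STEPS = {"a": "step1", "b": "step2", "c": "step3"}
NEXT_STEP = {"step1": "step2", "step2": "step3", "step3": "done"}


def run_step(doc_id: str) -> None:
    """Stand-in for a task: advance the document to its next step."""
    DOC_STEPS[doc_id] = NEXT_STEP[DOC_STEPS[doc_id]]


# Plain functions here; in the thread these are Prefect tasks.
STEPS = {"step1": run_step, "step2": run_step, "step3": run_step}


async def process_doc(doc_id: str, pool: ThreadPoolExecutor) -> int:
    """Run steps for one document until none applies; return how many ran."""
    ran = 0
    doc_step = DOC_STEPS[doc_id]
    while doc_step in STEPS:
        # pool.submit returns a concurrent.futures.Future.
        future = pool.submit(STEPS[doc_step], doc_id)
        # wrap_future makes it awaitable, so the event loop stays free.
        await asyncio.wrap_future(future)
        ran += 1
        doc_step = DOC_STEPS[doc_id]
    return ran


async def main() -> List[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        doc_ids = list(DOC_STEPS)
        return await asyncio.gather(*(process_doc(d, pool) for d in doc_ids))
```

Calling asyncio.run(main()) drives all three documents concurrently and returns the number of steps each one ran.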
It's hard to make this work with the wait(tasks) API here. 😕
Syméon del Marmol
10/13/2025, 3:57 PM