<@ULVA73B9P> What is the recommended way to wait f...
# ask-marvin
s
@Marvin What is the recommended way to wait for a submitted task to finish without blocking the thread? I need to run tasks concurrently and cannot wait for them all in the same place. When I call `future = my_task.submit()` I get a `PrefectConcurrentFuture`, which is not compatible with asyncio. I tried running the `wait` method in a separate executor like this:
future = my_task.submit()
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, future.wait)
This generally worked, but occasionally the process kept waiting forever even though the task had completed. I'm running self-hosted Prefect 3.4.23 on Kubernetes.
n
You have to block somewhere to resolve the future. The usual recommendation is:
from prefect.futures import wait

futures = foo.map(iterable)
# do other things

wait(futures)  # resolve all futures concurrently
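The fan-out-then-wait shape described here can be sketched with the standard library alone. This is an analogy rather than Prefect code: `concurrent.futures.wait` stands in for Prefect's `wait`, and `square` and `fan_out` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def square(x: int) -> int:
    """Hypothetical stand-in for a task body."""
    return x * x


def fan_out(values: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Submit everything up front; submitting does not block.
        futures = [pool.submit(square, v) for v in values]
        # ... other work could happen here while the pool is busy ...
        # Block once, in a single place, until every future is done.
        done, not_done = wait(futures)
        assert not not_done
        # Read the results back in submission order.
        return [f.result() for f in futures]
```

The point of the pattern is that blocking happens exactly once, after all work has been submitted, which is why it fits poorly when each submission depends on the previous result.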
m
Short answer: don’t wrap `future.wait()` in a thread. In Prefect 3.x, use the future’s async APIs directly.
Recommended patterns
- If you want the work to run on task workers (distributed/background): use `.delay()` to submit, then await with `.result_async()` or `.wait_async()` later. Example:
# submit background tasks to workers
futs = [
    my_task.with_options(name=f"job-{i}").delay(i)
    for i in range(10)
]

# await results later, without blocking the thread
results = await asyncio.gather(*(f.result_async() for f in futs))
# or just ensure completion
await asyncio.gather(*(f.wait_async() for f in futs))
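- If you’re keeping execution in the flow process (local concurrency): use `.submit()` to get a `PrefectConcurrentFuture` and await it via `.result_async()` / `.wait_async()`, provided your Prefect version exposes those methods on that future type (they are defined on the `PrefectDistributedFuture` returned by `.delay()`):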
f1 = my_task.submit(a=1)
f2 = my_task.submit(a=2)

# elsewhere in your async code
r1 = await f1.result_async()
r2 = await f2.result_async()
# or collectively
r1, r2 = await asyncio.gather(f1.result_async(), f2.result_async())
Why your `run_in_executor(..., future.wait)` can hang
- `future.wait()` is a blocking call designed for sync code. Running it in a thread can deadlock or stall due to how Prefect’s engine and AnyIO manage threading and synchronization. The async methods are designed to integrate cleanly with asyncio and avoid these pitfalls.
Notes
- Both `.result_async()` and `.wait_async()` are awaitable and can be used anywhere in your async flow. If you only need to know the task has finished (no result), use `wait_async()`. If you need the value and errors raised, use `result_async()`.
- If your goal is to fan out work across multiple task workers on Kubernetes, prefer `.delay()` over `.submit()`.
If you’re still seeing hangs when using `wait_async()`/`result_async()`, please share a minimal snippet and the flow/task logs so we can dig in.
References
- Prefect async utilities (gather, task groups)
- Background tasks in Prefect (overview)
- Futures implementation (source)
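The distinction in the notes between "only know it finished" and "get the value, with errors" can be illustrated in plain asyncio. This is a sketch of the general idea, not of Prefect's methods: `asyncio.wait` plays the completion-only role, `asyncio.gather` the result-collecting role, and `job` is a hypothetical coroutine in which one run fails on purpose.

```python
import asyncio


async def job(i: int) -> int:
    """Hypothetical unit of work; job 2 fails on purpose."""
    await asyncio.sleep(0.01)
    if i == 2:
        raise ValueError("job 2 failed")
    return i * 10


async def main() -> tuple[int, int, list]:
    tasks = [asyncio.create_task(job(i)) for i in range(4)]
    # Completion only: returns once every task is done and does not
    # re-raise exceptions from the jobs themselves.
    done, pending = await asyncio.wait(tasks)
    # Values and errors: with return_exceptions=True, each failure is
    # returned in place instead of aborting the whole gather.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return len(done), len(pending), results
```

Without `return_exceptions=True`, `gather` would raise the first failure, which is the behaviour to choose when a failed job should stop the caller.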
s
Thanks @Nate. My issue is that these concurrent tasks are not submitted from the same place in the code: they run in a loop, and each iteration picks the next task dynamically. Here is a very simplified version of my situation:
STEPS = {
    "step1": task_step1, # these are Prefect tasks
    "step2": task_step2,
    "step3": task_step3,
}

@flow
async def my_flow():
    await asyncio.gather(*[process_doc(id) for id in doc_ids])

async def process_doc(id: str):
    doc_step = db.get_doc_step(id)
    while doc_step in STEPS:
        task = STEPS[doc_step]
        future = task.submit()
        
        ## NEED TO WAIT WITHOUT BLOCKING HERE
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, future.wait)

        doc_step = db.get_doc_step(id)
Hard to make it work with the `wait(tasks)` API here... 😕
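For comparison, here is how the same per-document loop looks with only the standard library, where `asyncio.wrap_future` bridges a `concurrent.futures.Future` into asyncio so no extra waiter thread is needed. Everything here (`NEXT_STEP`, `run_step`, the thread pool) is a hypothetical stand-in; whether the same bridge can be applied to the future returned by `.submit()` is an assumption that would need checking against the Prefect version in use.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pipeline: each step names the step that follows it.
NEXT_STEP = {"step1": "step2", "step2": "step3", "step3": "done"}


def run_step(doc_id: str, step: str) -> str:
    """Hypothetical blocking step; returns the document's next step."""
    time.sleep(0.01)
    return NEXT_STEP[step]


async def process_doc(pool: ThreadPoolExecutor, doc_id: str) -> list[str]:
    step, history = "step1", []
    while step in NEXT_STEP:
        cf_future = pool.submit(run_step, doc_id, step)
        # wrap_future turns the thread-pool future into an awaitable, so
        # this await suspends only this coroutine, not the event loop.
        step = await asyncio.wrap_future(cf_future)
        history.append(step)
    return history


async def main(doc_ids: list[str]) -> dict[str, list[str]]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        histories = await asyncio.gather(
            *(process_doc(pool, d) for d in doc_ids)
        )
    return dict(zip(doc_ids, histories))
```

Each document advances through its own steps independently, and the only blocking call is the pool shutdown at the end, by which time all work has finished.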
@Nate any other suggestions? 🙏 The best would be a way to get a `PrefectDistributedFuture` for tasks submitted to a `ProcessPoolTaskRunner`, like we have with background tasks!