# ask-marvin
n
@Marvin How do I await a `PrefectFuture` without blocking the event loop?
m
To await a `PrefectFuture` without blocking the event loop, you can use the `add_done_callback` method provided by the `PrefectFuture` class. This method allows you to register a callback function that will be executed once the future completes, thus avoiding the need to block the event loop. Here's a simple example:
```python
from prefect import flow, task
from prefect.futures import PrefectFuture

@task
def my_task():
    return "result"

def my_callback(future: PrefectFuture):
    print(f"Task completed with result: {future.result()}")

@flow
def my_flow():
    future = my_task.submit()
    future.add_done_callback(my_callback)

my_flow()
```
In this example, `my_callback` will be called when `my_task` completes, allowing you to handle the result without blocking the event loop. For more details, you can refer to the Prefect source code and documentation:
- Prefect futures source code
- Prefect task runners documentation
n
i wouldn't do that. short answer is that prefect future methods are currently sync in prefect 3.x by design, though we'd like to introduce methods like `asubmit()`, `aresult()`, etc.; see https://github.com/PrefectHQ/prefect/issues/15008
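
for reference, a minimal sketch of what the current sync interface looks like (assuming prefect 3.x, where `submit()` and `result()` exist today and the `a`-prefixed variants don't yet; names are illustrative):
```python
from prefect import flow, task

@task
def heavy(x: int) -> int:
    return x * 2

@flow
def my_flow() -> int:
    # submit() returns a PrefectFuture immediately
    future = heavy.submit(21)
    # result() is synchronous in 3.x and may block until the task finishes
    return future.result()

my_flow()
```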
n
Hmm, that is worrying. We are trying to offload computationally heavy operations to `RayTaskRunner` and await them from our Prefect worker. Is this not possible?
Or would you recommend just using `ray.remote` instead for the time being and not use Prefect task caching?
n
well i answered the question at face value; you can totally do work in a non-blocking manner, but eventually you have to block the event loop in order to resolve the futures. i.e. `.result()`, whether on a single future or the list of futures you get from `map`, is a blocking call, even tho ray / dask / a thread can do the actual work in a non-blocking manner

https://youtu.be/k74tEYSK_t8?si=EC_gUuVaeDNYdGak

you'd be free to use ray directly tho if you choose
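
if you do go direct, something like this is the usual shape (a rough sketch, assuming ray's asyncio integration where `ObjectRef`s are awaitable; names here are illustrative):
```python
import asyncio
import ray

ray.init()

@ray.remote
def heavy_compute(x: int) -> int:
    return x * 2

async def main() -> None:
    # Ray ObjectRefs are awaitable, so awaiting them yields to the
    # event loop instead of blocking it while the remote work runs
    refs = [heavy_compute.remote(i) for i in range(10)]
    results = await asyncio.gather(*refs)
    print(results)

asyncio.run(main())
```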
n
Ah I see. First, thanks so much for being so prompt. Just have two more questions. We have designed a custom worker that runs its flows asynchronously, because the majority of our use case is network-bound tasks. Since it's all in a single process, we really can't afford to block the event loop or it will get in the way of critical worker code like health checks.
1. Can we use something like `asyncio.to_thread()` on the `PrefectFuture`?
2. When `.wait()` is called, is the event loop blocked for that duration on the client side? I noticed that you said "eventually you have to block the event loop in order to resolve the future", but I don't quite understand this.
n
hmm a couple thoughts:
• why did you have to write a custom worker? you can write/run asynchronous python (flows/tasks) using any worker or `.serve`
• `.wait()` will block if and only if the future has not completed. i say it's a blocking call in the sense that in theory it could block if the work is not yet done in the other thread etc, but if the work is already done it will be near instantaneous. i wouldn't recommend layering asyncio/anyio utils on top of the future interface. i'd either use the submit/map/task runner interface or instead treat tasks like you would any other python function (using asyncio/anyio as needed), e.g. the sketch below
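
a minimal sketch of that last option, assuming prefect 3.x semantics where an async task called inside an async flow is a plain awaitable:
```python
import asyncio
from prefect import flow, task

@task
async def fetch(url: str) -> int:
    await asyncio.sleep(1)  # stand-in for a network-bound call
    return len(url)

@flow
async def my_flow(urls: list[str]) -> list[int]:
    # calling an async task returns a coroutine, so standard asyncio
    # tooling applies and the event loop is never blocked
    return await asyncio.gather(*(fetch(url) for url in urls))

asyncio.run(my_flow(["https://example.com", "https://prefect.io"]))
```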
n
We opted for an asynchronous worker since we were noticing that the process worker was creating a new process for every incoming request and waiting for its full completion regardless of whether it is async. This didn't fit our use case, because a lot of our jobs spend most of their time making network-bound calls to external services, and we have higher job volume, so we'd rather parallelize them. To fit this use case, we decided to just create an asynchronous worker that offloads all compute-heavy tasks to ray (so as not to block the event loop). Does prefect natively support any way to run many tasks simultaneously using a default worker?

As for point 2, makes sense. It is unlikely that the work will be done at the moment that we submit the ray call, so probably not worth counting on. We'll probably need to invoke ray directly then and use asyncio primitives. Thanks for the help!
n
> We opted for an asynchronous worker since we were noticing that the process worker was creating a new process for every incoming request, and waiting for its full completion regardless of whether it is async

this makes me think there's some confusion here. workers are meant to:
• poll for scheduled runs
• submit flow runs to the execution environment
• monitor those submitted runs

a flow can be synchronous or asynchronous; the worker (regardless of work pool type) doesn't know or care. its only job is to submit/monitor runs, and in the case of a process pool, it submits the run as a subprocess. it can submit many subprocesses/flow runs at once; it won't be concurrency-limited by default. so i'm not sure what you mean by "asynchronous worker". the worker shouldn't have anything to do with the event loop of the process it creates
i.e. I don't think you need to create your own worker here
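
e.g. the no-custom-worker route could look like this (a sketch, assuming the prefect 3.x `.serve` interface; the flow itself is free to be async):
```python
import asyncio
from prefect import flow

@flow
async def network_bound_flow(url: str) -> int:
    await asyncio.sleep(1)  # stand-in for a network call
    return len(url)

if __name__ == "__main__":
    # serve() creates a deployment and polls for scheduled runs of this
    # flow in-process, so no hand-rolled worker is needed
    network_bound_flow.serve(name="network-bound")
```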
n
The memory overhead caused by the subprocess creation was providing a natural limit on concurrency. It's true that there is some danger in commingling the worker's event loop (health checks, flow monitoring, etc.) with that of the flows, but the performance gains from running all our network-bound flows in the same process are quite sizeable. We've crafted our flows such that they don't do anything compute-intensive (compute-heavy operations are offloaded to ray); this lets them effortlessly handle thousands of concurrent flows, where this would not be possible with subprocesses, as each subprocess would take around 100 MiB for its own python process (a max concurrency of roughly 80 for a process worker with 8 GiB of RAM).