Florian Guily
04/26/2022, 10:20 AM

emre
04/26/2022, 10:54 AM
Using async requests to get better request concurrency is a good solution imo, since async coroutines are much more lightweight than threads.
Another option, though I'm not sure whether it will work, is to send an async request per task but await it in a downstream task. This way you can let your worker pool start another task while your http request is delegated to the async event loop.
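
(A sketch of the first suggestion, not from the thread: many requests sharing one event loop. aiohttp and the example URL are assumptions for illustration; any asyncio-based HTTP client would do.)

import asyncio

import aiohttp  # assumed async HTTP client, not named in the thread


async def fetch(session, url):
    # each request is a coroutine, far cheaper than a thread
    async with session.get(url) as resp:
        return await resp.json()


async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        # run all requests concurrently on one event loop
        return await asyncio.gather(*(fetch(session, url) for url in urls))


urls = [f"https://api.example.com/items/{i}" for i in range(500)]  # hypothetical endpoint
results = asyncio.run(fetch_all(urls))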

Florian Guily
04/26/2022, 12:12 PM

emre
04/26/2022, 12:28 PM
async if you are going the mini batch route though. It's extra complexity, sure, but the difference in speed is night and day. prefect.flatten will be your friend, since you will end up with a list of mini-batches (lists). I met this bad boy too late in my journey with prefect.
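
(A minimal sketch of the mini-batch plus flatten pattern, assuming Prefect 1.x; the batching helper, batch size, and task names are hypothetical.)

from prefect import Flow, flatten, task


@task
def make_batches(items, batch_size=50):
    # hypothetical helper: split the work into mini-batches
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


@task
def process_batch(batch):
    # one mapped run per mini-batch; the real work would go here
    return [item * 2 for item in batch]


@task
def handle_item(item):
    print(item)


with Flow("minibatches") as flow:
    batches = make_batches(list(range(200)))
    results = process_batch.map(batches)   # a list of lists
    handle_item.map(flatten(results))      # flatten to map over single items again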

Florian Guily
04/26/2022, 12:33 PM

emre
04/26/2022, 1:43 PM
Prefect doesn't use async when executing tasks, so you need to submit your async requests to the event loop from within your task. Just write your async coroutine as in normal python, and start it with asyncio.run(). Something like:
import asyncio

from prefect import Flow, task
from prefect.executors import DaskExecutor, LocalDaskExecutor


@task
def sleep():
    async def process_minibatch():
        # stand-in for a mini-batch of requests: 1000 concurrent coroutines
        coros = [asyncio.sleep(0.1) for _ in range(1000)]
        await asyncio.gather(*coros)

    # start the event loop from inside the synchronous task
    asyncio.run(process_minibatch())


with Flow("async_submits") as flow:
    # map over a dummy upstream list just to get 100 task runs
    sleep.map(upstream_tasks=[list(range(100))])


if __name__ == "__main__":
    flow.run(executor=DaskExecutor(cluster_kwargs=dict(n_workers=10)))
    # flow.run(executor=LocalDaskExecutor(cluster_kwargs=dict(num_workers=10)))
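
(To adapt the skeleton above to real work, the inner coroutine could fire one mini-batch's requests instead of sleeping. A sketch assuming aiohttp; the endpoint and task name are hypothetical.)

import asyncio

import aiohttp  # assumed async HTTP client
from prefect import task


@task
def process_minibatch(batch):
    async def run_batch():
        async with aiohttp.ClientSession() as session:
            async def fetch(item):
                # hypothetical endpoint, one request per item in the batch
                async with session.get(f"https://api.example.com/{item}") as resp:
                    return await resp.json()

            return await asyncio.gather(*(fetch(item) for item in batch))

    # the task itself stays synchronous; the event loop lives only inside this call
    return asyncio.run(run_batch())
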
Florian Guily
04/26/2022, 1:45 PM

emre
04/26/2022, 1:47 PM