Florian Guily 04/26/2022, 10:20 AM
emre 04/26/2022, 10:54 AM
requests to get better request concurrency is a good solution imo, since async coroutines are much more lightweight than threads for concurrency. Another option, which I'm not sure will work, is to send an
request per task, but await it in a downstream task. This way you can let your worker pool start another task, while your http request is delegated to the async event loop.
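A rough plain-Python sketch of the "send in one task, await in a downstream task" idea, assuming everything runs in a single process that owns one long-lived event loop in a background thread. `fake_request` is a hypothetical stand-in for an HTTP call (it just sleeps); `submit_request` and `await_request` are hypothetical names for the upstream and downstream steps. Whether this carries over to Prefect/Dask is exactly the open question: with process-based workers, a `concurrent.futures.Future` likely can't be passed between tasks.

```python
import asyncio
import threading
import time


async def fake_request(i: int) -> int:
    """Hypothetical HTTP call; asyncio.sleep simulates network latency."""
    await asyncio.sleep(0.1)
    return i * 2


# One event loop running forever in a daemon thread (assumption: single process).
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


def submit_request(i: int):
    """'Upstream task': schedule the coroutine on the loop and return at once."""
    return asyncio.run_coroutine_threadsafe(fake_request(i), loop)


def await_request(future) -> int:
    """'Downstream task': block until the scheduled request has finished."""
    return future.result(timeout=5)


start = time.perf_counter()
futures = [submit_request(i) for i in range(50)]  # all 50 requests in flight
results = [await_request(f) for f in futures]
elapsed = time.perf_counter() - start

loop.call_soon_threadsafe(loop.stop)  # shut the background loop down
print(results[:5], f"{elapsed:.2f}s")
```

Because the 50 simulated requests overlap on the loop, the total wall time should be close to one request's latency rather than 50 times it.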
Florian Guily 04/26/2022, 12:12 PM
emre 04/26/2022, 12:28 PM
if you are going the mini-batch route, though. It's extra complexity, sure, but the difference in speed is night and day.
will be your friend, since you will end up with a list of mini-batches (lists). I met this bad boy too late in my journey with Prefect.
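A minimal plain-Python sketch of the mini-batch shape described above: split a flat list into chunks, process each chunk as one unit of work, then flatten the resulting list of lists back into one list. `to_minibatches`, `process_minibatch` and `flatten` here are hypothetical helpers for illustration, not Prefect APIs (Prefect 1.x does have its own `flatten` annotation for mapping over nested lists, which this does not use).

```python
from itertools import chain
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def to_minibatches(items: List[T], size: int) -> List[List[T]]:
    """Split a flat list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def process_minibatch(batch: List[int]) -> List[int]:
    """Hypothetical per-batch work; in the flow this would be one mapped task."""
    return [x * x for x in batch]


def flatten(batches: Iterable[List[T]]) -> List[T]:
    """Turn a list of mini-batch results back into a single flat list."""
    return list(chain.from_iterable(batches))


items = list(range(10))
batches = to_minibatches(items, size=4)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
results = flatten(process_minibatch(b) for b in batches)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The batch size is the knob: bigger batches mean fewer mapped tasks (less scheduler overhead) but more work per task.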
Florian Guily 04/26/2022, 12:33 PM
emre 04/26/2022, 1:43 PM
when executing tasks, so you need to submit your async requests to the event loop from within your task. Just write your async coroutine as you would in normal Python, and start it with
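import asyncio

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor, DaskExecutor


@task
def sleep():
    # One mini-batch of async work: 1000 coroutines run concurrently
    async def process_minibatch():
        coros = [asyncio.sleep(0.1) for _ in range(1000)]
        await asyncio.gather(*coros)

    # Start a fresh event loop inside the task and block until the batch is done
    asyncio.run(process_minibatch())


with Flow("async_submits") as flow:
    # Mapping over a 100-element upstream list creates 100 mapped task runs
    sleep.map(upstream_tasks=[list(range(100))])

if __name__ == "__main__":
    flow.run(executor=DaskExecutor(cluster_kwargs=dict(n_workers=10)))
    # LocalDaskExecutor takes num_workers directly rather than via cluster_kwargs:
    # flow.run(executor=LocalDaskExecutor(num_workers=10))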
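To check the pattern without Prefect installed, here is a rough stand-in, assuming a `ThreadPoolExecutor` is a fair proxy for a pool of worker threads mapping over mini-batches. `fake_request` and `task_body` are hypothetical names; `asyncio.sleep` stands in for real network I/O. Each worker thread calls `asyncio.run`, which creates its own event loop there, and `asyncio.gather` returns the per-request results.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List


async def fake_request(x: int) -> int:
    """Hypothetical async request; asyncio.sleep simulates network I/O."""
    await asyncio.sleep(0.05)
    return x + 1


async def process_minibatch(batch: List[int]) -> List[int]:
    # Launch every request in the mini-batch concurrently on this thread's loop.
    return await asyncio.gather(*(fake_request(x) for x in batch))


def task_body(batch: List[int]) -> List[int]:
    """Synchronous entry point, like the body of the @task above.

    asyncio.run creates a new event loop in whichever worker thread runs
    this, which works as long as no loop is already running in that thread.
    """
    return asyncio.run(process_minibatch(batch))


batches = [list(range(i * 100, (i + 1) * 100)) for i in range(4)]

# The thread pool plays the role of the worker pool mapping over batches.
with ThreadPoolExecutor(max_workers=4) as pool:
    per_batch = list(pool.map(task_body, batches))

print(len(per_batch), per_batch[0][:3])  # 4 [1, 2, 3]
```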
Florian Guily 04/26/2022, 1:45 PM
emre 04/26/2022, 1:47 PM