# prefect-community
f
Hey guys, I have a quick question on good practice. How scalable is task mapping? Are the child tasks "queued" if the map is pretty big?
I'm asking because I want to POST resources one by one to our server, and to speed things up I'd like to map over all the resources that have to be posted so they're sent in parallel. The thing is, I have around 300k POSTs to do, so the mapping would create 300k child tasks. My first thought was to map over batches of 100 or 1000 resources, so each child posts its 100 one by one and the number of children stays small, but then I lose the granularity of knowing which resource failed to post. What would you suggest? Mapping each resource to post, or mapping batches of 100 or 1000 resources?
e
Hi Florian, Prefect itself will be able to scale to 300k tasks no problem. However, the tasks will be "queued". Prefect 1.0 uses Dask for parallelism, which has a pool of workers. The most lightweight worker you can get is a thread, so for each task you want to execute in parallel you need a separate thread. Since your tasks aren't CPU bound, you can probably go for a couple hundred threads, but too many will still hinder performance or crash.
💯 1
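A minimal sketch of that sizing knob in Prefect 1.0, using a thread-based local Dask pool; the task, flow name, and worker count are illustrative, not a recommendation:
```python
# Sketch: cap the thread-based Dask worker pool that runs the mapped tasks.
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def post_resource(resource):
    ...  # placeholder for the actual POST


with Flow("many-posts") as flow:
    post_resource.map(list(range(1000)))

if __name__ == "__main__":
    # "A couple hundred threads" as suggested above; tune num_workers to your workload.
    flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=200))
```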
Batches of 100/1000 and using `async` requests to get better request concurrency is a good solution imo, since async coroutines are much more lightweight than threads for concurrency. Another option, which I'm not sure will work or not, is to send an `async` request per task but await it in a downstream task. This way you can let your worker pool start another task while your HTTP request is delegated to the async event loop.
upvote 2
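A minimal sketch of that first option, with one mapped task posting a whole mini-batch concurrently; `aiohttp` and the endpoint URL are stand-in assumptions, not something from the thread:
```python
# Sketch: each mapped task POSTs its mini-batch concurrently on one event loop.
import asyncio

import aiohttp
from prefect import task


@task
def post_minibatch(batch):
    async def post_all():
        async with aiohttp.ClientSession() as session:

            async def post_one(resource):
                async with session.post(
                    "https://example.com/api/resources", json=resource
                ) as resp:
                    return resp.status

            # Fire every request in the batch concurrently and wait for all of them.
            return await asyncio.gather(*(post_one(r) for r in batch))

    return asyncio.run(post_all())
```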
f
Thanks a lot @emre for the detailed response. I'm also leaning towards the batch option, but I hadn't thought about using async requests (I was just using a simple for loop to do the requests one by one). I have one last question about this method: how would you handle request failures? Since every worker handles a batch of requests, if some fail, the task hasn't fully succeeded but hasn't failed either. How would you retry the failed ones? For now I'm just considering basic logging. Thanks 🙂
e
Honestly, it depends on how critical a failure is. Since the elements in your batches are actually independent, you probably want to log any failure but continue processing your batch within each task. To facilitate retries, you first need to separate failed requests from successful ones. Maybe return a 2-tuple per task: outputs of successful requests on one side, and failed request URLs on the other. Then you can split the tuple downstream and "retry" the failed requests with a different task. Since you expect a lot fewer requests in the retrying branch, you can maybe even do 1 task per request, and achieve granularity that way. Seriously think about `async` if you are going the mini-batch route though. It's extra complexity, sure, but the difference in speed is night and day.
🙏 1
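A rough sketch of that split-and-retry shape; `post_one`, the flow name, and the example batches are all made up for illustration:
```python
# Sketch of the 2-tuple pattern: each batch task returns (successes, failures),
# and a downstream mapped task retries the failures one resource at a time.
import random
from datetime import timedelta

from prefect import Flow, Parameter, task


def post_one(resource):
    # Hypothetical stand-in for the real POST; fails randomly for demo purposes.
    if random.random() < 0.1:
        raise RuntimeError(f"POST failed for {resource}")
    return resource


@task
def post_minibatch(batch):
    ok, failed = [], []
    for resource in batch:
        try:
            ok.append(post_one(resource))
        except Exception:
            failed.append(resource)  # keep the failed resource for the retry branch
    return ok, failed


@task
def collect_failed(results):
    # `results` is the reduced list of (ok, failed) tuples from all batches.
    return [resource for _, failed in results for resource in failed]


@task(max_retries=3, retry_delay=timedelta(seconds=5))
def retry_single(resource):
    # One task per failed resource, so the retry branch is fully granular.
    return post_one(resource)


with Flow("split-and-retry") as flow:
    batches = Parameter("batches", default=[[1, 2, 3], [4, 5, 6]])
    results = post_minibatch.map(batches)
    retry_single.map(collect_failed(results))
```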
Also, `prefect.flatten` will be your friend since you will end up with a list of mini-batches (lists). I met this bad boy too late in my journey with prefect.
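For reference, a tiny illustration of what `flatten` does when mapping over a list of mini-batches; the task names are made up:
```python
# flatten lets you map over the elements inside nested lists:
# without it you get one child per batch, with it one child per element.
from prefect import Flow, flatten, task


@task
def make_batches():
    return [[1, 2, 3], [4, 5], [6]]


@task
def process(x):
    return x * 10


with Flow("flatten-demo") as flow:
    batches = make_batches()
    process.map(flatten(batches))  # 6 children, one per element, not 3
```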
f
Got you, thanks a lot @emre, it's really nice to get such advice so quickly ^^
😊 1
Might be a dumb question, but how do I use asyncio with prefect? It seems to be included in Orion but not in Prefect 1.0.
e
Not at all! Prefect 1.0 indeed isn't utilizing `async` when executing tasks, so you need to submit your async requests to the event loop from within your task. Just write your async coroutine as in normal Python, and start it with `asyncio.run()`, something like:
```python
import asyncio

from prefect import Flow, task
from prefect.executors import DaskExecutor, LocalDaskExecutor


@task
def sleep():
    async def process_minibatch():
        # 1000 concurrent coroutines on one event loop, inside a single task
        coros = [asyncio.sleep(0.1) for _ in range(1000)]
        await asyncio.gather(*coros)

    asyncio.run(process_minibatch())


with Flow("async_submits") as flow:
    # map over a dummy upstream list just to create 100 copies of the task
    sleep.map(upstream_tasks=[list(range(100))])

if __name__ == "__main__":
    flow.run(executor=DaskExecutor(cluster_kwargs=dict(n_workers=10)))
    # flow.run(executor=LocalDaskExecutor(num_workers=10))
```
🙏 1
f
oooh ok, I define the async function within the task, thanks again!
e
outside should be ok too, I just wanted it to be compact
🙏 1