Kelvin DeCosta

12/21/2022, 11:42 PM
Hey! I’m trying to run mapped tasks concurrently. Unfortunately, I’ve noticed that the tasks take much longer than expected and in some cases get completely stuck. For more context:
• This synchronous task pings an API and returns a list of strings, which can have anywhere between 100 and 20k elements.
• The plain Python function behind the task usually takes 3-5 seconds, and at worst 40-60 seconds, regardless of the result size. When run concurrently, it takes anywhere between 1 and 5 minutes.
• I’ve noticed that the task runs rather quickly when a small list is returned, while it completely hangs when a large list is returned.
• I tried disabling caching of results in memory (but I didn’t explicitly enable result persistence) and that didn’t seem to do anything.
• There are about 1000 of these tasks. I tried running `.map` for all of them (like the daredevil that I am) with a limit of 20 at a time, but it started hanging more often. Right now I’ve chosen to create batches of 20, run one batch of tasks concurrently, and then move on to the next batch (see the sketch just after this message).
• Deployments are run via ECS Tasks with 4 vCPUs and 8 GB memory.
I’m aware of the ongoing concurrency refactor and I’m excited for it, but I want to build something reliable right now. I’d really appreciate it if I could get some of the following questions answered:
• Would using `async` improve the reliability of the tasks?
• Will explicitly creating a new `ConcurrentTaskRunner` help?
• What do you think is the overhead that causes the tasks to take much more time than what feels necessary?
I’m open to any suggestions and any help is really really appreciated! Thanks 😊
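A rough sketch of the batching workaround described above — the task and flow names, item type, and batch-size constant are illustrative, not from the thread:

```python
# Sketch of the sync batching workaround: run one batch of mapped task runs,
# wait for it to finish, then move on to the next batch.
from prefect import flow, task

BATCH_SIZE = 20

@task
def ping_api(item: str) -> list[str]:
    ...  # placeholder for the blocking API call that returns 100-20k strings

@flow
def batched_flow(items: list[str]) -> None:
    for start in range(0, len(items), BATCH_SIZE):
        batch = items[start : start + BATCH_SIZE]
        futures = ping_api.map(batch)   # submit one batch of mapped task runs
        for future in futures:
            future.wait()               # block until this batch finishes before starting the next
```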

Zanie

12/22/2022, 12:02 AM
1. Using async should help. There’s less overhead since we don’t need to submit your tasks to threads, and the context switching is faster.
2. I do not believe this would have any effect.
3. I’m not sure! I’ll be investigating this during the refactor. At the very least, we need to make several API calls to orchestrate the run.
Are you using Prefect Cloud?
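For what it’s worth, converting the task itself just means making the decorated function a coroutine; a minimal sketch, with `ping_api` as an illustrative name:

```python
from prefect import task

@task
async def ping_api(item: str) -> list[str]:
    # use a non-blocking HTTP client here (e.g. httpx.AsyncClient) so the
    # event loop isn't blocked while waiting on the API
    ...
```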

Kelvin DeCosta

12/22/2022, 12:09 AM
Thank you for the reply! I’ll experiment with async and update my findings. Yes, we use Prefect Cloud

Zanie

12/22/2022, 12:12 AM
I’m not sure why it’d be slow when returning larger lists. You may want to turn off persistence explicitly rather than only disabling in-memory caching.
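Both knobs are task-level keyword arguments in Prefect 2 (2.6+); a minimal sketch with an illustrative task name:

```python
from prefect import task

# Disabling in-memory caching alone still leaves result persistence in play;
# turning persistence off explicitly is what's being suggested above.
@task(cache_result_in_memory=False, persist_result=False)
def ping_api(item: str) -> list[str]:
    ...
```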

Kelvin DeCosta

12/22/2022, 9:32 AM
As you suggested, I tried setting `persist_result=False` for the tasks that were running long, and it seemed to reduce the run duration by 10-20%.
Edit: I was getting some errors with `async`, but I seem to have resolved them.
Actually, I think I figured it out. And wow, it’s much, much faster and doesn’t seem to hang at all.

Yaron Levi

12/22/2022, 11:57 AM
@Kelvin DeCosta What was the thing that solved it? Adding the `persist_result=False`?

Kelvin DeCosta

12/22/2022, 12:01 PM
I migrated my tasks to use `async`/`await` because it seems to be much smoother. Some points to consider:
• I’ve tested it only locally so far, but previously my sync mapped tasks couldn’t even run on my computer.
• `persist_result=False` might not be necessary, since most of the overhead seems to be the context switching and dispatching of threads for sync tasks, which isn’t the case for async.
• In my case, I needed the `State` (to check whether there was an error and not immediately fail the flow), so I had to use `asyncio.gather` to consume the results.
Also, I think you can specify a task concurrency limit at the flow level (not the global level) using semaphores, which is actually very nice (see the sketch just after this message).
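A rough sketch of the pattern described above, under some assumptions: the task and flow names are illustrative, the semaphore limit of 20 is carried over from the earlier batching, and the failure handling is a stand-in for whatever the flow actually does with failed states.

```python
import asyncio
from prefect import flow, task

@task
async def ping_api(item: str) -> list[str]:
    ...  # placeholder for the real async API call

@flow
async def process_items(items: list[str]):
    # flow-level concurrency limit via a plain asyncio semaphore
    semaphore = asyncio.Semaphore(20)

    async def run_one(item: str):
        async with semaphore:
            # return_state=True hands back the final State instead of raising on failure
            return await ping_api(item, return_state=True)

    states = await asyncio.gather(*(run_one(item) for item in items))
    failures = [state for state in states if state.is_failed()]
    # decide what to do with `failures` instead of letting one error fail the flow
    return failures
```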

Yaron Levi

12/22/2022, 12:04 PM
Very useful! Thank you for those points. We are also about to refactor an ETL job to Prefect with some level of concurrency, so we might face similar issues.

Kelvin DeCosta

12/22/2022, 1:22 PM
I eventually ended up with a combination of `async` tasks and batching. A batch of `n` async tasks executes via `.map(params, return_state=True)`. Later, the states are consumed via `asyncio.gather()`.
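A rough sketch of this final batch-then-gather approach. Caveat: it awaits direct task calls gathered per batch rather than going through `.map(..., return_state=True)`, since the exact awaiting behavior of `.map` with async tasks differs across Prefect 2 versions; the names and batch size are illustrative.

```python
import asyncio
from prefect import flow, task

BATCH_SIZE = 20  # assumed value for `n`

@task
async def ping_api(item: str) -> list[str]:
    ...  # placeholder for the real async API call

@flow
async def batched_async_flow(items: list[str]):
    all_states = []
    for start in range(0, len(items), BATCH_SIZE):
        batch = items[start : start + BATCH_SIZE]
        # run one batch of async task coroutines concurrently, collecting States
        states = await asyncio.gather(
            *(ping_api(item, return_state=True) for item in batch)
        )
        all_states.extend(states)
    # inspect failures without immediately failing the whole flow
    return [state for state in all_states if state.is_failed()]
```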