Hi, we’re getting some mapped child tasks rerun de...
# prefect-community
a
Hi, we’re getting some mapped child tasks rerun despite success on the first run. Normally, our tasks look like this (logs cleaned up a bit):
Copy code
12:10:11 prefect.CloudTaskRunner	Task 'run_simulator[54]': Starting task run...
12:10:11 prefect.run_simulator[54]	Starting simulator run
12:10:11 prefect.run_simulator[54]	cusip_list [{'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}]
12:10:11 prefect.run_simulator[54]	Loading model 'cf621134-8c36-446a-96b5-7ecde88a33e2'
12:10:22 prefect.run_simulator[54]	Simulating pool {'secmnem': 'FNMMA3057', 'cusip': '31418CMF8'}
12:10:31 prefect.run_simulator[54]	Number of replicates 6
12:11:59 prefect.CloudTaskRunner	Task 'run_simulator[54]': finished task run for task with final state: 'Success'
Here is an example though (and they don’t appear super common) where the task succeeded and then was later rerun. One thing you can note is that the model id is different. this is randomly generated (not a big deal) but along with the timestamp just confirms that this is repeated run not a duplicated log.
Copy code
11:55:34 prefect.CloudTaskRunner	Task 'run_simulator[6]': Starting task run...
11:55:35 prefect.run_simulator[6]	Starting simulator run
11:55:35 prefect.run_simulator[6]	cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
11:55:35 prefect.run_simulator[6]	Loading model 'c410358f-4612-4aef-8f12-e9a3642711de'
11:56:23 prefect.run_simulator[6]	Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
11:56:36 prefect.run_simulator[6]	Number of replicates 3
11:57:12 prefect.CloudTaskRunner	Task 'run_simulator[6]': finished task run for task with final state: 'Success'
12:06:17 prefect.CloudTaskRunner	Task 'run_simulator[6]': Starting task run...
12:06:17 prefect.run_simulator[6]	Starting simulator run
12:06:17 prefect.run_simulator[6]	cusip_list [{'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}]
12:06:17 prefect.run_simulator[6]	Loading model '45322fce-d452-4340-9e06-e7bcc2775b84'
12:06:27 prefect.run_simulator[6]	Simulating pool {'secmnem': 'FNMMA3774', 'cusip': '31418DFQ0'}
12:06:40 prefect.run_simulator[6]	Number of replicates 3
12:07:15 prefect.CloudTaskRunner	Task 'run_simulator[6]': finished task run for task with final state: 'Success'
c
Hi Alex - are you running your flow on a dask cluster? If so, this can occur if there is a worker eviction or if dask loses track of the data. In the next release of Core we will be exposing such cluster events in the logs for better visibility. Prefect also has a version locking feature that would prevent this but it currently requires a special cloud license
a
Interesting. So I’m about to write what I thought was a separate post but is about issues with Dask workers. When I finish it in a minute or so, I’ll put it here.
We have a flow that requires loading a model and running simulations on a bunch of different dataframes. We can load one model and one sim object and create many different tasks or group together sim dataframes and run them with a single model (the model is longish to load). Basically, it’s about lots of tasks vs coalescing them together to be a bit more efficient. Either way, we run into problems with Dask. If we have lots of tasks, we get workers that have to run many tasks but this appears to build up memory usage. First, why is this the case? The tasks aren’t returning anything so shouldn’t the worker keep a clean slate? Do the workers run the tasks in subprocesses? Second, because of the build up, if we have too many tasks per worker, eventually at least one worker (usually more) restarts.  This seems to then just break everything. Prefect doesn’t recognize the restart so the task appears to still be running. Dask doesn’t seem to reassign the task to the worker and try it again. And then eventually, although its not clear what’s doing this, a KilledWorker error appears and that gets sent to Prefect. On the other hand, if we group together tasks, Python memory builds up despite “deleting” objects. What are we supposed to do in this case to prevent the build up?
c
So this will be fairly difficult to answer in the abstract but: • dask workers may run tasks in subprocesses (depends on your cluster type and configuration) • While your task may not return data, the task runner does (it returns your task’s prefect state) • Not sure why dask isn’t reassigning your tasks Managing worker memory build up might be a good question to ask to the dask folks
a
Thanks for the response.
are you running your flow on a dask cluster? If so, this can occur if there is a worker eviction or if dask loses track of the data.
Not sure why dask isn’t reassigning your tasks
We studied this a bit more and it seems like the task that runs multiple times does so because its worker restarted due to another task.
We were still running into memory issues and but we think we know why and found a workaround: https://stackoverflow.com/questions/63680134/growing-memory-usage-leak-in-dask-distributed-profiler/63680548#63680548
c
Ah I’m really glad you figured it out - I’ll forward this along internally to see if there’s anything we can do to make this better 👍
a
Thanks!
We were still running into memory issues and but we think we know why and found a workaround
After running another attempt at the flow in question, this did the trick! 1. Tasks were set up so that there were ~2 tasks per worker. Before adjusting those profile intervals, we found that the second task for a worker would start with an elevated memory usage (~4gb compared to .5gb for the first task on a worker). 2. Cutting down the polling by 1000x led to the profiler no longer being picked up by tracemalloc (as reported in that stackoverflow post). 3. Whereas before we ran into a ton of garbage collection warnings from Dask as well as memory usage creeping up to 4,5,6gb etc and eventually bricking the workers and thus the flow, when we set the profile intervals we peaked out approximately at just over 1gb for each worker with no noticeable memory growth as 750mb-1gb is about what we expect. I’m not sure what the side effects are but this is a tremendous improvement for us. I’ll give that stackoverflow post a few days and maybe migrate it to the Dask github to see if anyone has an opinion on it.
c
Excellent - I’ll also archive this info to Prefect’s GitHub for better visibility as well
@Marvin archive “Dask memory usage keeps increasing”
j
Thanks for the report here, the distributed profiler certainly shouldn't have this much overhead. I've opened an issue to track this here: https://github.com/dask/distributed/issues/4091. If possible, could you comment there with a reproducible example demonstrating your issue? It'd be helpful for us in debugging. Thanks!
upvote 1