Alex Furrier
07/28/2021, 10:13 PM
[2021-07-28 22:06:32+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'My Flow'
[2021-07-28 22:06:32+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `__main__.`...
Creating scheduler pod on cluster. This may take some time.
[2021-07-28 22:06:37+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://dask-root-78fcc2c9-3.prefect:8787/status
distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=2
[2021-07-28 22:07:04+0000] WARNING - prefect.CloudFlowRunner | Flow run is no longer in a running state; the current state is: Error: pods ['prefect-job-054cbaf7-rt2mr'] failed for this job
The last thing I attempted to change was to have all results for the flow be AzureResults
with a connection to blob storage. The environment variables for connecting to our blob storage are available in the docker image the flow runs on, but I didn't see them on the prefect job. They are on the Dask workers that spawn. I don't see successful results showing up in our blob storage. Not sure if the two of these are related but thought it's possible.

@task(result=AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
                         container=os.environ.get('AZURE_STORAGE_CONTAINER')))
def my_task(x):
    return x + 1
The environment variables are present on the Dask workers, but there are no results being written to blob storage. Locally, using the same information, I can see that the connection to blob storage is active by checking whether an existing blob exists:
result = AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
container=os.environ.get('AZURE_STORAGE_CONTAINER'))
result.exists('test-results/00048567-d290-4ba0-9b44-05da127902e8.prefect_result')
# returns True
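A minimal sketch of what setting the Result once at the flow level could look like, assuming Prefect 1.x-style imports (the flow and task names here are just illustrative):

import os
from prefect import Flow, task
from prefect.engine.results import AzureResult

azure_result = AzureResult(
    connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
    container=os.environ.get('AZURE_STORAGE_CONTAINER'),
)

@task
def my_task(x):
    return x + 1

# every task in the flow inherits this Result unless it sets its own
with Flow("my-flow", result=azure_result) as flow:
    my_task(1)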
If I'm running a flow using Prefect Server, I would assume that since checkpointing is turned on the results would go to blob storage, but is that not the case?

Kevin Kho
07/29/2021, 12:42 AM
1. Checkpointing is turned on with export PREFECT__FLOWS__CHECKPOINTING=true. You can also use it on the task decorator like @task(…, checkpoint=True).
2. The suggestion from Nicholas in the previous thread was less about the persistence of results and more about how you store them. Rather than applying the Result to the task decorator, which will hold things in memory, save it inside the task and return the location instead. In sequential steps it would look like: create the Azure result class, write out the data, keep track of the location in a variable, delete the data, call the garbage collector, and return the location. In a downstream task, you would then load the data from that location, perform your operations, and repeat the process. This reduces the memory footprint (see the first sketch after this list).
3. Are you passing the environment variables to the RunConfig? That is also a good way to get them into the Flow (though I understand if there are security concerns); see the second sketch after this list. Either way, I hope that by handling the Result manually you get more granularity into whether or not it gets persisted.
4. Some people have worked around the memory issues by batching their tasks. Think that instead of E-T-L they do E1-T1-L1-E2-T2-L2… and so on, because Dask struggles whenever there is a high number of futures submitted. It seems to blow up in memory, and reducing the batch size like this would help.
5. Dask has made significant improvements in memory management that might help on the Prefect front. I learned about this between the last time you posted and now. See this and the video, which is way better. There is an environment variable prescribed there that you can set on your Dask cluster, and it helps reduce the memory footprint. I have specifically seen a user apply this on Prefect Server and they said it helped, but it still could not handle their scale (1M mapped tasks). For you, though, it might be able to help.
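A minimal sketch of the point-2 pattern, assuming Prefect 1.x's Result API (result.write(value) uploads and result.read(location).value loads it back); load_big_dataframe and do_transform are placeholder helpers:

import gc
import os
import uuid
from prefect import task
from prefect.engine.results import AzureResult

def azure_result(location=None):
    # build a fresh AzureResult pointed at an explicit blob location
    return AzureResult(
        connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
        container=os.environ.get('AZURE_STORAGE_CONTAINER'),
        location=location,
    )

@task  # note: no result= on the decorator
def extract():
    data = load_big_dataframe()                             # placeholder helper
    location = f"intermediate/extract-{uuid.uuid4()}.prefect_result"
    azure_result(location).write(data)                      # persist to blob storage
    del data                                                # drop the in-memory copy
    gc.collect()
    return location                                         # only the location is passed downstream

@task
def transform(location):
    data = azure_result().read(location).value              # load it back by location
    transformed = do_transform(data)                        # placeholder helper
    out_location = f"intermediate/transform-{uuid.uuid4()}.prefect_result"
    azure_result(out_location).write(transformed)
    del data, transformed
    gc.collect()
    return out_location

And a sketch of point 3, assuming KubernetesRun accepts an env dict for injecting variables into the flow-run job (the values are placeholders; a Secret could be used instead):

from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun(
    env={
        "AZURE_STORAGE_CONNECTION_STRING": "...",   # or reference a Secret instead
        "AZURE_STORAGE_CONTAINER": "...",
    }
)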
Alex Furrier
07/29/2021, 10:03 PM
Flow("my-flow",
     run_config=KubernetesRun(memory_limit='20G', memory_request='6G')
)
but I'm still getting the OOM error.
I think the next thing to try would be to run the mapped task in batches, since the numerous futures seem to be causing issues. It seems like Dask won't release any memory for a subtask until they're all complete, which is causing these issues since the cumulative memory adds up when there is a large number of mapped tasks.
Any suggestions for how to implement that dynamically given a batch size? I can slice up the iterable that's currently being mapped over into smaller batches, but I'm not sure what the idiomatic way is to map a task across each batch without running into the same issue all over again (holding all tasks in memory until every future result is complete).
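One possible way to sketch that batching idea, assuming Prefect 1.x (do_work and my_items are placeholders): map over chunks rather than individual items, so there is only one future per batch instead of one per item.

from prefect import Flow, task

@task
def make_batches(items, batch_size):
    # slice the iterable into fixed-size chunks
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

@task
def process_batch(batch):
    # handle a whole batch inside a single task and return only small outputs
    return [do_work(item) for item in batch]           # do_work is a placeholder

with Flow("batched-flow") as flow:
    batches = make_batches(my_items, batch_size=500)   # my_items is a placeholder
    results = process_batch.map(batches)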
I also tried using the LocalDaskExecutor instead of the DaskExecutor, which spun up a Dask cluster on k8s with worker pods. Doing this, the memory issues are not present. It becomes a CPU-intensive task (which makes sense with a single worker running multiple threads), but the memory of that pod is only around 500 MB, compared to what was happening on the Dask worker pods, which would almost always error out due to increasing memory usage.
Any idea why the memory consumption is so drastically different using the LocalDaskExecutor vs the DaskExecutor?

Kevin Kho
07/30/2021, 1:40 AM
If you use StartFlowRun to start things, you should chain the batches so that they don't run at the same time and run into resource constraints. I also would not be surprised if there is unmanaged memory (as seen in the Dask resources) when you use E1-T1-L1-E2-T2-L2. I think distributed compute engines (both Spark and Dask) have the same issue where the unmanaged memory just steadily increases over time. In that sense, it might be worth splitting up into separate Flow runs.
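A sketch of chaining batch runs with StartFlowRun so each batch only starts after the previous one finishes (the flow and project names are placeholders, and wait=True is assumed to block until the child run completes):

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_batch = StartFlowRun(flow_name="etl-batch", project_name="my-project", wait=True)

with Flow("batch-orchestrator") as orchestrator:
    batch_1 = start_batch(parameters={"batch": 1})
    batch_2 = start_batch(parameters={"batch": 2}, upstream_tasks=[batch_1])
    batch_3 = start_batch(parameters={"batch": 3}, upstream_tasks=[batch_2])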
Now with the executors. I'm not exactly sure what's going on, but here is my guess. The LocalDaskExecutor does not use distributed and spins up a local Dask multiprocessing pool. The DaskExecutor is intended for a cluster and uses distributed. Also, out of the box the LocalDaskExecutor only uses one worker. If the LocalDaskExecutor is working great for you, you might be able to squeeze more out with
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
I think the default is 2 workers.
So there are two parts to this. One is that you provisioned KubernetesRun resources, but did you specify DaskExecutor resources? Maybe it's just a matter of the defaults not utilizing all the resources, and you can bump it up by passing the kwargs?
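For example, a sketch of passing worker resources to the DaskExecutor, assuming the dask-kubernetes KubeCluster/make_pod_spec API (the image name and sizes are placeholders):

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image="my-flow-image:latest",   # placeholder image
            memory_limit="8G",
            memory_request="4G",
            cpu_limit=2,
            cpu_request=1,
        )
    ),
    adapt_kwargs={"minimum": 1, "maximum": 2},
)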
The second part is the memory, and this one is way beyond me, but my best (and probably unsatisfactory) guess is that when you have a distributed system, you use more memory for things like replication for fault tolerance (from what I know with Spark, it has 3 copies of each partition by default; I assume Dask has a similar mechanism). Aside from that, data does not live on the same machine and has to be shuffled around to perform operations, and a lot of data has to be passed from the workers to the master node. In this specific case with Dask (just note I don't have as much experience with it), I think it's all those futures going out and the master node holding memory for them. So even if you have enough memory in the system, you have to think about whether the master node has enough memory to collect the data it needs onto one node. Out-of-memory issues can happen on just the driver.
Of course, don’t believe me on my Dask knowledge, and you are likely better served asking the Dask Slack channel? But to be honest I don’t see a lot of responses there.Alex Furrier
08/06/2021, 5:01 PM
I'm going to stick with the LocalDaskExecutor for now instead of trying to come up with a way to batch things.
I compiled most of the lessons learned from this about memory management + Dask in Prefect. Would there be any interest in sharing that somewhere?

Kevin Kho
08/06/2021, 5:06 PM