# prefect-server
Alex Furrier
The issue I'm having is a continuation of this thread, which is about issues with a task mapped to many subtasks and Dask. The most recent issue is that the K8s job that the flow starts with gets an OOM error and dies, which causes the flow to fail.
[2021-07-28 22:06:32+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'My Flow'
[2021-07-28 22:06:32+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `__main__.`...
Creating scheduler pod on cluster. This may take some time.
[2021-07-28 22:06:37+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at <http://dask-root-78fcc2c9-3.prefect:8787/status>
distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=2
[2021-07-28 22:07:04+0000] WARNING - prefect.CloudFlowRunner | Flow run is no longer in a running state; the current state is: Error: pods ['prefect-job-054cbaf7-rt2mr'] failed for this job
The last thing I attempted to change was to have all results for the flow be `AzureResult`s with a connection to blob storage. The environment variables for connecting to our blob storage are available in the Docker image the flow runs on, but I didn't see them on the Prefect job; they are on the Dask workers that spawn. I don't see successful results showing up in our blob storage. Not sure if the two of these are related, but I thought it was possible.
I don't think the `AzureResult` is working properly for these. For larger-memory tasks I set it like so:
import os

from prefect import task
from prefect.engine.results import AzureResult

@task(result=AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
                         container=os.environ.get('AZURE_STORAGE_CONTAINER')))
def my_task(x):
    return x + 1
and the environment variables are present on the Dask workers, but no results are being written to blob storage. Locally, using the same credentials, I can verify that the connection to blob storage works by checking whether a known blob exists:
import os

from prefect.engine.results import AzureResult

result = AzureResult(connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
                     container=os.environ.get('AZURE_STORAGE_CONTAINER'))

result.exists('test-results/00048567-d290-4ba0-9b44-05da127902e8.prefect_result')
# returns True
If I'm running a flow using Prefect Server, I would assume that, since checkpointing is turned on, the results would go to blob storage, but is that not the case?
Kevin Kho
Hey @Alex Furrier, I’ll re-read and go through this more thoroughly in a bit.
Ok, so a couple of thoughts:
1. Checkpointing is indeed turned on when running with a backend. If you want to be super sure, you can do `export PREFECT__FLOWS__CHECKPOINTING=true`. You can also set it on the task decorator like `@task(…, checkpoint=True)`.
2. The suggestion from Nicholas in the previous thread was less about the persistence of results and more about how you store them. The idea is to not apply the Result to the task decorator, because that holds things in memory; instead, save the result inside the task and return the location. In sequential steps: create the Azure result class, write out the data, keep track of the location in a variable, delete the data, call the garbage collector, and return the location. A downstream task then loads the data from the location, performs its operations, and repeats the process. This reduces the memory footprint (rough sketch after this list).
3. Are you passing the environment variable to the RunConfig? That is also a good way to get the environment variable into the Flow (though I can see there could be security concerns). Either way, I hope that by handling the `Result` manually you get more visibility into whether it gets persisted or not.
4. Some people have worked around the memory issues by batching their tasks. Think that instead of E-T-L they do E1-T1-L1-E2-T2-L2… and so on, because Dask struggles whenever there is a high number of futures submitted and seems to blow up in memory; reducing the batch size like this would help.
5. Dask has made significant improvements in memory management that might help on the Prefect front. I learned about this between the last time you posted and now. See this and this video, which is way better. There is an environment variable prescribed there that you can set on your Dask cluster, and it would help in reducing the memory footprint. I have seen a user apply this on Prefect Server and they said it helped, but it still could not handle their scale (1M mapped tasks). For you, though, it might be able to help.
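A rough, untested sketch of point 2 (assuming Prefect 0.15-style imports; the `data + 1` line is just a placeholder for your actual transformation):

import gc
import os

from prefect import task
from prefect.engine.results import AzureResult

@task
def transform(location: str) -> str:
    result = AzureResult(
        connection_string=os.environ.get('AZURE_STORAGE_CONNECTION_STRING'),
        container=os.environ.get('AZURE_STORAGE_CONTAINER'),
    )
    data = result.read(location).value            # load the upstream output from blob storage
    output = data + 1                             # placeholder for the real transformation
    new_location = result.write(output).location  # persist the output, keep only its location
    del data, output                              # drop references to the large objects
    gc.collect()                                  # nudge the garbage collector
    return new_location                           # only this small string flows between tasks

That way the large objects never travel between tasks; only the location strings do.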
Alex Furrier
@Kevin Kho Thank you for these tips! Those Dask links and your explanation of using Azure results and returning only the location were particularly helpful. After implementing some of those changes (including setting that environment variable for memory trimming and returning only a Prefect result location rather than the actual object itself), I'm now running into issues where the Prefect k8s job is dying due to an OOM error. I've upped the memory request and limit supplied as flow config like so:
Flow("my-flow",
run_config=KubernetesRun(memory_limit='20G,                             memory_request='6G')
)
but I'm still getting the OOM error. I think the next thing to try would be to run the mapped task in batches, since the large number of futures seems to be causing issues. It seems like Dask won't release the memory of any subtask until they're all complete, so the cumulative memory adds up when there is a large number of mapped tasks. Any suggestions for how to implement that dynamically given a batch size? I can slice the iterable that's currently being mapped over into smaller batches, but I'm not sure what the idiomatic way is to take each batch and map a task across it without running into the same issue all over again (holding all tasks in memory until every future result is complete).
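Roughly what I have in mind, as an untested sketch (the `+ 1` body and `batch_size=500` are just placeholders for the real per-item work and a real batch size):

from prefect import Flow, task

@task
def make_batches(items, batch_size):
    # slice the full iterable into fixed-size chunks
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

@task
def process_batch(batch):
    # do the per-item work inside a single mapped task, so Dask only has to
    # track one future per batch instead of one per item
    return [item + 1 for item in batch]

with Flow("batched-map") as flow:
    batches = make_batches(list(range(10_000)), batch_size=500)
    results = process_batch.map(batches)

But I'm not sure if that actually releases memory any earlier, or if it just trades one big map for a smaller one.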
Just to see what would happen, I ran the flow using a `LocalDaskExecutor` instead of the `DaskExecutor` (which spun up a Dask cluster on k8s with worker pods). Doing this, the memory issues are not present. It becomes a CPU-intensive task (which makes sense with a single worker running multiple threads), but the memory of that pod stays around 500 MB, compared to the Dask worker pods, which would almost always error out due to increasing memory usage. Any idea why the memory consumption is so drastically different using `LocalDaskExecutor` vs `DaskExecutor`?
Kevin Kho
Man, it looks like you made a ton of progress! First, about the canonical way to split things up: I haven't seen enough users do it quite yet (I think this is something I would need to learn from you if you pull it off). I will say, though, that if you use `StartFlowRun` to start things, you should chain the batches so that they don't run at the same time and hit resource constraints. I also would not be surprised if there is unmanaged memory (as seen in the Dask resources) when you use E1-T1-L1-E2-T2-L2. I think distributed compute engines (both Spark and Dask) have the same issue where the unmanaged memory just steadily increases over time. In that sense it might be worth splitting things up into separate flow runs.
Now for the executors. I'm not exactly sure what's going on, but here is my guess. The `LocalDaskExecutor` does not use `distributed`; it spins up a local Dask multiprocessing pool, so it only uses one worker machine (everything runs inside the flow run's pod). The `DaskExecutor` is intended for a cluster and uses `distributed`. If `LocalDaskExecutor` is working great for you, you might be able to squeeze more out with:
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
I think the default is 2 workers. So there are two parts to this. One is that you provisioned `KubernetesRun` resources, but did you specify `DaskExecutor` resources? Maybe it's just a matter of the defaults not utilizing all the resources, and you can bump them up by passing the `kwargs` (rough sketch below). The second part is the memory, and this one is way beyond me, but my best (and probably unsatisfactory) guess is that in a distributed system you use more memory for things like replication for fault tolerance (from what I know of Spark, it keeps 3 copies of each partition by default; I assume Dask has a similar mechanism). Aside from that, data does not live on the same machine and has to be moved around (shuffling) to perform operations, so a lot of data has to be passed from the workers to the master node. In this specific case with Dask (just note I don't have as much experience), I think it's all those futures going out, with the master node holding memory for them. So even if the system as a whole has enough memory, you have to ask whether the master node has enough memory to collect the data it needs onto one node; out-of-memory issues can happen on just the driver. Of course, don't take my Dask knowledge as gospel; you are likely better served asking in the Dask Slack channel, though to be honest I don't see a lot of responses there.
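Something along these lines, for example; this is a rough, untested sketch using the classic dask-kubernetes `KubeCluster`, and the image name, resource numbers, and the `MALLOC_TRIM_THRESHOLD_` value are all placeholders you'd tune for your setup:

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

def make_cluster():
    # worker pod spec with explicit resources instead of the defaults
    pod_spec = make_pod_spec(
        image="my-flow-image:latest",             # placeholder: your flow's image
        memory_limit="8G",
        memory_request="4G",
        cpu_limit="2",
        cpu_request="1",
        env={"MALLOC_TRIM_THRESHOLD_": "65536"},  # the memory-trimming variable from the Dask material
    )
    return KubeCluster(pod_spec)

flow.executor = DaskExecutor(
    cluster_class=make_cluster,
    adapt_kwargs={"minimum": 1, "maximum": 2},    # matches the adaptive scaling in your logs
)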
Alex Furrier
@Kevin Kho Thanks for all the help on this. I ended up just sticking with the `LocalDaskExecutor` for now instead of trying to come up with a way to batch things. I compiled most of the lessons learned from this on memory management + Dask in Prefect; would there be any interest in sharing that somewhere?
Kevin Kho
Yeah, you can open a GitHub Discussion in the Prefect repo and I can link people with similar problems to it in the future.
Would you like to contribute to the Prefect docs? I can help with that.