
Zviri

05/01/2020, 12:09 PM
Hi everyone! I have just created my first Prefect flow and deployed it to my manually set up Dask cluster via Prefect Cloud. Everything went smoothly, and I have to say this is by far the best data pipeline tool I have used. However, I noticed that when I run the flow from the cloud instead of just a script (flow.run), the memory consumption of my Dask workers gradually grows until they run out of memory. When I run the same flow from a script, everything finishes correctly and there are no memory issues whatsoever. I am using the latest prefect and dask packages. Has anyone experienced something similar? Is there anything that could be done about it? Thanks a lot.

Jenny

05/01/2020, 1:13 PM
Hi Zviri! Welcome to Prefect! That's not something we'd expect to happen so we'd like to look into it further. Our Dask experts should be online a little later (they're not on EST) and I'm going to ask for their input.

Zviri

05/01/2020, 3:13 PM
Thanks a lot, I'll be checking this thread regularly to provide more details.
👍 1

Jim Crist-Harif

05/01/2020, 3:18 PM
Hi Zviri, The difference between running with Dask and running with a local executor is that Dask will attempt to run your tasks in parallel. This can be nice: your tasks finish faster since work can be done in parallel. But if your tasks use lots of memory, running them in parallel may exhaust your resources (say one task takes 1 GB; if you have 8 running at a time on a worker, that's 8 GB). Dask takes some of this into account and will generally try to do the right thing, but there are certain task structures that will break things. Generally Dask works best if your individual tasks are quick-ish (0.5-60s each) and use a modest amount of memory (MiBs, not GiBs), and you break your work into many smaller tasks. High-memory tasks are harder to schedule in parallel without breaking things (especially if you're using mapped tasks) - they can work, but they may require a bit more care. If you're already following the above practices, I'd need to know more about your flow and execution environment to help debug further.
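For a concrete picture, here is a minimal sketch with dask.distributed (the numbers are illustrative, not taken from this thread): capping each worker at one thread means only one task's working set is resident per worker at a time, while eight threads could hold eight at once.

from dask.distributed import LocalCluster, Client

# Illustrative cluster: one task slot per worker, so at ~1 GB per task each
# worker's peak stays near 1 GB instead of n_threads x 1 GB.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,   # one concurrent task per worker
    memory_limit="4GB",     # the nanny restarts a worker that exceeds this
)
client = Client(cluster)
print(client)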

Zviri

05/01/2020, 3:40 PM
Hi Jim, thanks for your input. I would say that I follow the best practices, see below. I've set up an OCR processing pipeline broken down into the following tasks:
1. fetch URLs from a database (returns a list)
2. mapped task: fetch files and save them to S3
3. mapped task: preprocess PDF document (loads/stores data from/to S3)
4. mapped task: OCR document (loads/stores data from/to S3)
I currently run relatively small flows, each processing at most 2k documents at a time, but ideally I would like to scale up to much larger numbers. Each task generally takes less than 60 seconds. The workers are set up to use process-level parallelism and only 1 thread (most of the code is pure Python, so threads are not very effective). I currently use 4GB of memory for each worker process, which is plenty (a worker rarely uses more than 2GB of memory) when, as I said, I start the flow from a Python script. However, when I start the flow from the Cloud using a RemoteDaskEnvironment, the same flow that finished when run from the script does not get beyond the first two steps (fetching URLs from the db and then fetching/storing the files to S3) without exhausting the whole 4GB of memory.
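A hedged sketch of that pipeline shape in the Prefect 0.x API that was current at the time (task names and bodies are placeholders, not Zviri's actual code; only small named tuples are passed between tasks):

from collections import namedtuple
from prefect import task, Flow

DocRef = namedtuple("DocRef", ["url", "s3_key"])

@task
def fetch_urls():
    # 1. query the database and return a list of URLs (placeholder values)
    return ["https://example.com/a.pdf", "https://example.com/b.pdf"]

@task
def download_to_s3(url):
    # 2. fetch the file, write it to S3, return only a small reference
    return DocRef(url=url, s3_key="raw/" + url.rsplit("/", 1)[-1])

@task
def preprocess_pdf(ref):
    # 3. load from S3, preprocess, store the result back; pass on a reference
    return ref._replace(s3_key="preprocessed/" + ref.s3_key.split("/", 1)[1])

@task
def ocr_document(ref):
    # 4. load the preprocessed file, run OCR, store the text back to S3
    return ref._replace(s3_key="ocr/" + ref.s3_key.split("/", 1)[1])

with Flow("ocr-pipeline") as flow:
    urls = fetch_urls()
    raw = download_to_s3.map(urls)
    pre = preprocess_pdf.map(raw)
    ocr_document.map(pre)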

Jim Crist-Harif

05/01/2020, 3:43 PM
Are the outputs from your tasks large in size (the things your tasks return, not things you may write directly to S3 within the task)? Depending on execution order, Dask workers will store intermediate values between tasks locally, and these may build up over time (but generally Dask will try to run tasks that let it remove intermediate values before running tasks that create new intermediate values).
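If useful, one way to see whether intermediates are accumulating is to ask each worker what it currently holds. A sketch, assuming the scheduler address shown is a placeholder; client.run passes the worker object to a function whose argument is named dask_worker:

from dask.distributed import Client
from dask.sizeof import sizeof

client = Client("tcp://scheduler-address:8786")  # placeholder address

# Number of intermediate task results each worker currently holds in memory
counts = client.run(lambda dask_worker: len(dask_worker.data))

# Rough size of those results per worker, in bytes
sizes = client.run(
    lambda dask_worker: sum(sizeof(v) for v in dask_worker.data.values())
)
print(counts)
print(sizes)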

Zviri

05/03/2020, 6:31 PM
@Jim Crist-Harif sorry for not responding earlier, I was out on Saturday. The output of each task is small; I only use some named tuples with a couple of fields to pass between tasks. I never pass any actual data between tasks, just some URIs, etc. I was reading into debugging/profiling of Dask workers, and while there is plenty of material on CPU performance, I have not found much on memory profiling. Is there any tool you guys use to profile a worker's memory to at least get an idea of what is piling up? Currently I'm just trying things blindly, which is not yielding much.
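One approach (a sketch, assuming guppy3, the maintained release of heapy, is installed on the workers) is to snapshot each worker's heap from the client:

from dask.distributed import Client

def heap_summary():
    # guppy3 keeps the original "guppy" package name
    from guppy import hpy
    return str(hpy().heap())  # object types ranked by total size

client = Client("tcp://scheduler-address:8786")  # placeholder address
for worker_addr, summary in client.run(heap_summary).items():
    print(worker_addr)
    print(summary)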
Ok, so I did some digging around and memory-profiled one of my dask workers using heapy. I found out that what is clogging up the memory of my workers are these strings (there are tons of them):
<p>The error code is a string that uniquely identifies an error condition. It is meant to be read and understood by programs that detect and handle errors by type. </p>
<p class="title">
    <b>Amazon S3 error codes</b>
</p>
<ul>
    <li>
        <ul>
            <li>
                <p>
                    <i>Code:</i> AccessDenied </p>
            </li>
            <li>
                <p>
                    <i>Description:</i> Access Denied</p>
            </li>
            <li>
                <p>
                    <i>HTTP Status Code:</i> 403 Forbidden</p>
            </li>
            <li>
                <p>
                    <i>SOAP Fault Code Prefix:</i> Client</p>
            </li>
        </ul>
    </li>
...
So at first I assumed that a library of mine used for accessing S3 was doing this, so I stripped down my flow to only 2 steps:
1. fetch a list of URLs from the db
2. download them using requests but do not store the results anywhere
So now there was no code of mine doing anything with S3. But the issue was still occurring. So I figured the only remaining culprit could be the S3 storage that I used for my flows. To test this hypothesis I replaced the S3 storage with Local storage. As expected, the issue disappeared after this. I have no idea what the root cause is, so I will continue digging. But it seems to be connected to boto3 and to originate in the S3 storage.
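A sketch of that storage swap using the Prefect 0.10-era storage classes (the import path may differ in later releases; the bucket name, directory, and flow object are placeholders):

from prefect import Flow
from prefect.environments.storage import S3, Local

flow = Flow("ocr-pipeline")  # stand-in for the real flow

# Original setup: the flow is pickled and uploaded to an S3 bucket
flow.storage = S3(bucket="my-prefect-flows")

# Test setup that made the issue disappear: store the flow on the filesystem
flow.storage = Local(directory="/opt/prefect/flows")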
@Jim Crist-Harif I managed to work around this problem using the Local storage in combination with sshfs -- I'm not very happy about it since it is quite fragile, but it will do for the time being. At this point I'm 100% sure it's the S3 storage causing the huge memory consumption. I have not been able to find the root cause yet, and the strings I pasted above are probably just a side effect. I checked the S3 storage and S3ResultHandler classes and nothing there struck me as causing any leakage. AFAIK boto3 sessions/clients don't need any explicit release management. I'm using --nprocs with the workers, so it might have something to do with the forking logic.
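A sketch of that workaround, assuming a directory shared to every worker host via sshfs (hostnames and paths are placeholders):

# On each dask worker host, mount the directory that holds the stored flows
# (shell command, run outside Python):
#   sshfs user@flow-host:/opt/prefect/flows /opt/prefect/flows
from prefect import Flow
from prefect.environments.storage import Local

flow = Flow("ocr-pipeline")  # stand-in for the real flow
flow.storage = Local(directory="/opt/prefect/flows")  # the sshfs-mounted path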

Jim Crist-Harif

05/05/2020, 12:58 PM
Hmmm, interesting. Great debugging work @Zviri! I'm not sure what would cause that with prefect's s3 storage. If you can create a reproducible example and file an issue, we could help debug and track down what's going on.

Zviri

05/05/2020, 2:32 PM
@Jim Crist-Harif will do
@Jim Crist-Harif I noticed that in the meantime (just the last week or so) you went from version 0.10.3 to 0.10.7, in which you also refactored the S3 storage, and the issue seems to have gone away. So I'm not going to submit an issue for now. You guys are on a roll! Keep up the good work 💪.

Jim Crist-Harif

05/07/2020, 5:51 PM
Glad to hear it!