# prefect-community
Hi, the following code crashes on me in Dask, due to `Large object of size 5.49 MB detected in task graph:`
Am I doing something wrong? This is the simplest example I could come up with that shows this behaviour.
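```python
from prefect import task, Flow
import random
import string

def randomString(stringLength=8):
    # Random string of lowercase letters and spaces
    letters = string.ascii_lowercase + " "
    return "".join(random.choice(letters) for i in range(stringLength))

@task
def extract():
    # 200 records, each with a random name and description
    ls = []
    for i in range(200):
        ls.append({
            "name": randomString(random.randint(5, 15)),
            "description": randomString(200),
        })
    return ls

@task
def extract_details(m):
    # Between 23 and 100 detail records per input record
    ls = []
    for i in range(random.randint(23, 100)):
        ls.append({
            "name": randomString(random.randint(20, 30)),
            "whatsthis": randomString(200),
            "whatsthat": randomString(200),
        })
    return ls

@task
def transform(l):
    # Flatten the list of lists returned by the mapped task
    return [item for sublist in l for item in sublist]

@task
def load(l):
    # The body of load() was not included in the original post
    pass

with Flow("ETL Test") as flow:
    data = extract()
    detail_data = extract_details.map(data)
    transformed = transform(detail_data)
    load(transformed)
```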
All the randomization is just to make sure that no caching is going on somewhere.
Can you share more info on how the flow is executed? Is it on prefect core with a DaskExecutor?
Sure, it is on prefect core with DaskExecutor. My environment:
```python
"address": "tcp://dask-scheduler:8786"  # Address of a Dask scheduler
```
Dask has a scheduler and one worker running right now, no further configuration done.
So the Dask scheduler and Dask worker are launched separately, with no extra arguments?
yep, each in its own Docker Container
Sorry, I was trying to reproduce it on my side, but there is something I don’t understand… do you run your flow or do you register it? The fact that you use a … confused me, because I thought you were not using a Prefect server… I think I misunderstood: you are on Prefect open-source but you are using your own local server, is that correct?
Ha, sorry, I mixed up the product names. I have a Prefect server running; I registered the flow and ran it from the UI.
ok got it, let me do that too on my side then
The message you posted is used as a warning in dask (it's there to indicate that there may be potential performance issues with your workload), but it shouldn't crash. Can you post the traceback that you're seeing?
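For readers unsure whether such a message is fatal, here is a minimal stdlib sketch of the warning-versus-error distinction. `build_graph` is a hypothetical stand-in that emits a `UserWarning` worded like Dask's; it is not Dask code. When the warning is merely recorded, the call still returns its value; when escalated with `warnings.simplefilter("error")`, the same message becomes an exception, which is one way to get a traceback pointing at where a warning originates.

```python
import warnings


def build_graph(size_mb):
    """Hypothetical stand-in that warns about a large object, like Dask does."""
    if size_mb > 5:
        warnings.warn(
            f"Large object of size {size_mb} MB detected in task graph", UserWarning
        )
    return "graph built"


def run_and_record():
    # Record the warning instead of printing it; the call still completes
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        result = build_graph(5.49)
    return result, [str(w.message) for w in caught]


def run_with_errors():
    # Escalate warnings to exceptions to get a traceback at the warning site
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        try:
            build_graph(5.49)
        except UserWarning as exc:
            return f"raised: {exc}"
    return "no warning"


if __name__ == "__main__":
    print(run_and_record())
    print(run_with_errors())
```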
Got a reproduction here:
```
[2020-05-13 16:20:50] INFO - prefect.CloudTaskRunner | Task 'load': Starting task run...
/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/distributed/worker.py:3342: UserWarning: Large object of size 5.53 MB detected in task graph:
  (None, 0, {<Edge (key=l): transform to load>: <Suc ...  succeeded.">})
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
```
Right, but that's a warning, not an error. Is it erroring somewhere later on?
I suspect it's running out of memory at some point. That's the last thing I see.
Thanks for reproducing, @David Ojeda
I don’t think it will run out of memory… It is currently blocked at that last step and dask gets unresponsive (holding the GIL)… but I don’t see the memory growing
True. Does it fail because of a missing heartbeat, then? By the way, it does not matter what you do in `transform()`. The issue is related to the mapping: with smaller datasets it works.
Smaller in data size, or smaller in number of mapped tasks?
Ah, got it reproduced
Smaller in number of mapped tasks. I could not find the point where it breaks
I'm going to keep debugging for a bit, but so far it looks like this has to do with the way prefect is currently using dask to handle mapped tasks (which isn't ideal). I suspect this issue will go away with the upcoming mapping refactor (see https://github.com/PrefectHQ/prefect/issues/2041).
Any idea on the best workaround for the time being?
So the issue here is that the graph being submitted for the mapped tasks is huge in serialized size. This has two causes:
• The way Prefect is currently written, each serialized function contains a bunch of local state, so it is slow to pickle and large to serialize.
• The graph you're submitting ends up with at minimum 4600 parallelized tasks (maximum 20000, depending on the `randint` result). 4600 tasks × 1 MiB per large function ≈ 4.5 GiB just to serialize the graph alone.
Your graph would eventually complete, but it's hanging on the step that communicates the tasks to the scheduler, which is naturally slow given the size of the graph (if I run it locally, I see scheduler memory usage skyrocket). Unfortunately there's no good workaround until the mapping refactor is done, as this is baked into Prefect's current internals. One possible option (if you don't care about process-level parallelism) is to use a different executor. Anything contained in a single process won't have this restriction (the …
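The arithmetic behind that estimate, as a small sketch: the bounds come from the example's two loops (200 outer records, each fanned out to between 23 and 100 detail records via `random.randint(23, 100)`), and the 1 MiB per serialized function is the rough figure from the thread, not a measurement. The last lines only illustrate the general point that pickling a callable also pickles whatever state it carries; `functools.partial` is used here as a stand-in, not as a model of Prefect's internals.

```python
import functools
import pickle

OUTER_RECORDS = 200                  # range(200) in extract()
MIN_DETAILS, MAX_DETAILS = 23, 100   # random.randint(23, 100) in extract_details()
MIB_PER_FUNCTION = 1                 # rough per-function figure from the thread


def count_bounds():
    """Smallest and largest number of detail records the example can produce."""
    return OUTER_RECORDS * MIN_DETAILS, OUTER_RECORDS * MAX_DETAILS


def graph_size_gib(n_items, mib_each=MIB_PER_FUNCTION):
    """Total serialized size in GiB if every item costs `mib_each` MiB."""
    return n_items * mib_each / 1024


def pickled_size(obj):
    """Number of bytes pickle produces for `obj`."""
    return len(pickle.dumps(obj))


if __name__ == "__main__":
    low, high = count_bounds()
    print(low, high)                       # 4600 20000
    print(round(graph_size_gib(low), 2))   # 4.49
    # A callable that carries state is pickled together with that state
    small = functools.partial(len, b"")
    big = functools.partial(len, b"x" * 1_000_000)
    print(pickled_size(small), pickled_size(big))
```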
Alright. I would like to have the parallelism though. I will see how to proceed here 🙂 Thanks for figuring it out!
@David Ojeda @Jim Crist-Harif I found a nasty and kinda disgusting workaround. Basically, in the upstream task I write the list of dicts to a file, and then in the next function I read the file again and map against that, while making the upstream dependency explicit. It works in my environment, since the Docker containers share a volume.
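A plain-Python sketch of that file-based handoff, assuming (as in the thread) a directory every worker can see; a temporary directory stands in for the shared Docker volume here. `extract_to_file` and `read_for_mapping` are hypothetical names; in the actual flow they would be Prefect tasks, with the mapped task consuming the reader's output and an explicit upstream dependency on the writer.

```python
import json
import os
import tempfile


def extract_to_file(records, directory):
    """Write the records to a JSON file and return only the (small) path."""
    path = os.path.join(directory, "extract.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(records, fh)
    return path


def read_for_mapping(path):
    """Read the records back so a downstream step can map over them."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as shared_dir:  # stands in for the shared volume
        path = extract_to_file([{"name": "a"}, {"name": "b"}], shared_dir)
        records = read_for_mapping(path)
        print(len(records))  # 2
```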
I don’t think I would call it nasty: in my case, most of the input/output of our tasks is a class encapsulating a file that is stored somewhere else. That way the data moving around is small. This was way before there were result handlers in Prefect, which may also be a reasonable solution (I don’t know them well yet).
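A minimal sketch of the "class encapsulating a file" pattern, assuming a local filesystem path and JSON content; `FileRef` is a hypothetical name, not a Prefect class, and Prefect's result handlers (the built-in alternative mentioned above) are not shown. The handle's entire state is its path, so what moves between tasks stays small no matter how large the file is.

```python
import json
import os
import pickle
import tempfile
from dataclasses import dataclass


@dataclass(frozen=True)
class FileRef:
    """Hypothetical handle to data stored in a file; only the path is held."""

    path: str

    def write(self, data):
        """Store `data` as JSON at `self.path` and return the handle."""
        with open(self.path, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        return self

    def read(self):
        """Load the data back from `self.path`."""
        with open(self.path, encoding="utf-8") as fh:
            return json.load(fh)


def state_size(ref):
    """Bytes needed to pickle the handle's state (just its path)."""
    return len(pickle.dumps(vars(ref)))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        rows = [{"whatsthis": "x" * 200} for _ in range(1000)]
        ref = FileRef(os.path.join(d, "details.json")).write(rows)
        print(state_size(ref), os.path.getsize(ref.path))
        print(ref.read() == rows)  # True
```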