# prefect-community
m
Hi, the following code crashes on me in Dask, due to
Large object of size 5.49 MB detected in task graph:
Am I doing something wrong? This is the simplest example I could come up with, that shows this behaviour.
Copy code
from prefect import task, Flow
import random
import string

def randomString(stringLength=8):
    letters = string.ascii_lowercase+" "
    return ''.join(random.choice(letters) for i in range(stringLength))


@task
def extract():
    ls =[]
    for i in range(200):
        ls.append({
            "name": randomString(random.randint(5,15)),
            "description": randomString(200)
        })
    return ls

@task
def extract_details(m):
    ls = []
    for i in range(random.randint(23,100)):
        ls.append({
            "name": randomString(random.randint(20,30)),
            "whatsthis": randomString(200),
            "whatsthat": randomString(200)
        })
    return ls

@task
def transform(l):
    return [item for sublist in l for item in sublist]

@task
def load(l):
    print(l['name'])

with Flow(
    "ETL Test"
) as flow:
    data = extract()
    detail_data = extract_details.map(data)
    transformed = transform(detail_data)
    load.map(transformed)
All the randomization is just to make sure that no caching is going on anywhere
d
Can you share more info on how the flow is executed? Is it on prefect core with a DaskExecutor?
m
Sure, it is on prefect core with DaskExecutor. My environment:
Copy code
RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "<tcp://dask-scheduler:8786>"  # Address of a Dask scheduler
    },
)
Dask has a scheduler and one worker running right now, no further configuration done.
d
So the dask scheduler and dask worker are launched separately with dask-scheduler and dask-worker, with no extra arguments?
m
yep, each in its own Docker container
d
Sorry, I was trying to reproduce it on my side, but there is something I don’t understand… do you .register your flow or do you .run it? The fact that you use a RemoteEnvironment confused me because I thought you were not using a prefect server… I think I misunderstood: you are on prefect open-source but you are using your own local server, is that correct?
m
ha, sorry. I mixed up the product names. I have a prefect server running. I registered the flow and ran it from the UI
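Roughly, the registration side looks like this on my end (just a sketch; the project name is a placeholder):
Copy code
from prefect.environments import RemoteEnvironment

# `flow` is the "ETL Test" flow from the snippet above
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"},  # dask scheduler address
)

# register against the local prefect server; runs are then triggered from the UI
flow.register(project_name="etl-test")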
d
ok got it, let me do that too on my side then
👍 1
j
Hi, the following code crashes on me in Dask, due to
Large object of size 5.49 MB detected in task graph:
The message you posted is used as a warning in dask (it's there to indicate that there may be potential performance issues with your workload), but it shouldn't crash. Can you post the traceback that you're seeing?
d
Got a reproduction here:
Copy code
[2020-05-13 16:20:50] INFO - prefect.CloudTaskRunner | Task 'load': Starting task run...
/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/distributed/worker.py:3342: UserWarning: Large object of size 5.53 MB detected in task graph:
  (None, 0, {<Edge (key=l): transform to load>: <Suc ...  succeeded.">})
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
j
Right, but that's a warning, not an error. Is it erroring somewhere later on?
m
I suspect it's running out of memory at some point. That's the last thing I see.
Thanks for reproducing, @David Ojeda
d
I don’t think it will run out of memory… It is currently blocked at that last step and dask gets unresponsive (holding the GIL)… but I don’t see the memory growing
m
True, does it fail because of a missing heartbeat then? By the way, it does not matter what you do in transform(); the issue is related to the mapping. With smaller datasets it works.
j
Smaller in data size, or smaller in number of mapped tasks?
Ah, got it reproduced
m
Smaller in number of mapped tasks. I could not find the exact point where it breaks.
j
I'm going to keep debugging for a bit, but so far it looks like this has to do with the way prefect is currently using dask to handle mapped tasks (which isn't ideal). I suspect this issue will go away with the upcoming mapping refactor (see https://github.com/PrefectHQ/prefect/issues/2041).
m
Any idea on the best workaround for the time being?
j
So the issue here is that the graph being submitted for the mapped load tasks is huge in serialized size. This has two causes:
• The way prefect is currently written, each serialized function contains a bunch of local state, and is slow to pickle / large to serialize.
• The graph you're submitting ends up with at minimum 4600 parallelized load tasks (maximum 20000, depending on the randint results).
4600 tasks * ~1 MiB per large function ≈ 4.5 GiB just to serialize the graph alone. Your graph would eventually complete, but it's hanging on the step that communicates the load tasks to the scheduler, which is naturally slow given the size of the graph (if I run it locally, I see scheduler memory usage skyrocket). Unfortunately there's no good workaround until the mapping refactor is done, as this is baked into prefect's current internals. One possible option (if you don't care about process-level parallelism) is to use a different executor: anything contained in a single process won't have this restriction (the LocalDaskExecutor or the LocalExecutor).
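For reference, that would look something like this (a sketch only, assuming prefect 0.x; pick whichever option fits your setup):
Copy code
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import RemoteEnvironment

# Option 1: run the flow in a single process, parallelised with local threads
flow.run(executor=LocalDaskExecutor(scheduler="threads"))

# Option 2: keep registering the flow, but point the environment at the
# in-process executor instead of the distributed dask cluster
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.LocalDaskExecutor"
)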
m
Alright. I would like to have the parallelism though. I will see how to proceed here 🙂 Thanks for figuring it out!
@David Ojeda @Jim Crist-Harif I found a nasty and kinda disgusting workaround. Basically, in transform I json.dump the list of dicts to a file, and then in the next function I json.load the file again and map against that, while making the upstream dependency explicit. Works in my environment, as the Docker containers share a volume.
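Roughly like this (a sketch only; the shared path and the extra task name are made up, and extract and extract_details are unchanged from the snippet above):
Copy code
import json
from prefect import task, Flow

DETAILS_PATH = "/shared/details.json"  # volume shared by the worker containers

@task
def transform(l):
    flat = [item for sublist in l for item in sublist]
    # park the large payload on the shared volume instead of returning it
    with open(DETAILS_PATH, "w") as f:
        json.dump(flat, f)

@task
def load_details():
    # re-read the payload right before it is needed
    with open(DETAILS_PATH) as f:
        return json.load(f)

@task
def load(l):
    print(l["name"])

with Flow("ETL Test") as flow:
    data = extract()
    detail_data = extract_details.map(data)
    transformed = transform(detail_data)
    # no data is passed between the two tasks, so the ordering is made explicit
    records = load_details(upstream_tasks=[transformed])
    load.map(records)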
d
I don’t think I would say nasty: In my case most of the input/output of our tasks is a class encapsulating a file that is stored somewhere else. That way the data moving around is small. This was way before there were result handlers in prefect, which may also be a reasonable solution (I don’t know them well yet)
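Something along these lines, for illustration only (our actual class has more to it, and the storage backend is an assumption here):
Copy code
import json

class FileProxy:
    """Small handle to data that lives on shared storage, not in the task graph."""

    def __init__(self, path):
        self.path = path  # only this short string travels between tasks

    def write(self, records):
        with open(self.path, "w") as f:
            json.dump(records, f)
        return self

    def read(self):
        with open(self.path) as f:
            return json.load(f)
A task then returns something like FileProxy("/shared/details.json").write(records), and the downstream task calls .read() only when it actually needs the data, so the objects moving through the graph stay tiny.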