Matthias

05/13/2020, 3:02 PM
Hi, the following code crashes on me in Dask, due to
Large object of size 5.49 MB detected in task graph:
Am I doing something wrong? This is the simplest example I could come up with that shows this behaviour.
from prefect import task, Flow
import random
import string

def randomString(stringLength=8):
    letters = string.ascii_lowercase+" "
    return ''.join(random.choice(letters) for i in range(stringLength))


@task
def extract():
    ls = []
    for i in range(200):
        ls.append({
            "name": randomString(random.randint(5,15)),
            "description": randomString(200)
        })
    return ls

@task
def extract_details(m):
    ls = []
    for i in range(random.randint(23,100)):
        ls.append({
            "name": randomString(random.randint(20,30)),
            "whatsthis": randomString(200),
            "whatsthat": randomString(200)
        })
    return ls

@task
def transform(l):
    return [item for sublist in l for item in sublist]

@task
def load(l):
    print(l['name'])

with Flow(
    "ETL Test"
) as flow:
    data = extract()
    detail_data = extract_details.map(data)
    transformed = transform(detail_data)
    load.map(transformed)
All the randomization is just to make sure that no caching is going on somewhere.
David Ojeda

05/13/2020, 3:32 PM
Can you share more info on how the flow is executed? Is it on prefect core with a DaskExecutor?
Matthias

05/13/2020, 3:34 PM
Sure, it is on prefect core with DaskExecutor. My environment:
RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "<tcp://dask-scheduler:8786>"  # Address of a Dask scheduler
    },
)
Dask has a scheduler and one worker running right now, no further configuration done.
David Ojeda

05/13/2020, 3:37 PM
So the dask scheduler and dask worker are launched separately with dask-scheduler and dask-worker, with no extra arguments?
Matthias

05/13/2020, 3:40 PM
Yep, each in its own Docker container.
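(A minimal sanity check for that setup, assuming the dask.distributed client is installed wherever you run it: it confirms the containerised scheduler from the RemoteEnvironment above is reachable and has registered its worker, before Prefect gets involved.)
from dask.distributed import Client

# Connect to the scheduler running in its own container and list the workers
# it currently knows about; an empty dict means the worker never registered.
client = Client("tcp://dask-scheduler:8786")
print(client.scheduler_info()["workers"])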
David Ojeda

05/13/2020, 3:48 PM
Sorry, I was trying to reproduce it on my side, but there is something I don’t understand… do you .register your flow or do you .run it? The fact that you use a RemoteEnvironment confused me, because I thought you were not using a prefect server… I think I misunderstood: you are on prefect open-source but you are using your own local server, is that correct?
Matthias

05/13/2020, 3:50 PM
Ha, sorry, I mixed up the product names. I have a Prefect server running. I registered the flow and ran it from the UI.
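(A sketch of what that registration could look like with the environment shown above, assuming Prefect 0.x and the flow object from the first message; the project name is an assumption.)
from prefect.environments import RemoteEnvironment

# Attach the Dask executor configuration to the flow, then register it with
# the Prefect server so it can be started from the UI.
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"},
)
flow.register(project_name="etl-test")  # project name is a placeholder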
David Ojeda

05/13/2020, 3:51 PM
ok got it, let me do that too on my side then
👍 1
Jim Crist-Harif

05/13/2020, 4:20 PM
Hi, the following code crashes on me in Dask, due to
Large object of size 5.49 MB detected in task graph:
The message you posted is used as a warning in dask (it's there to indicate that there may be potential performance issues with your workload), but it shouldn't crash. Can you post the traceback that you're seeing?
David Ojeda

05/13/2020, 4:22 PM
Got a reproduction here:
[2020-05-13 16:20:50] INFO - prefect.CloudTaskRunner | Task 'load': Starting task run...
/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/distributed/worker.py:3342: UserWarning: Large object of size 5.53 MB detected in task graph:
  (None, 0, {<Edge (key=l): transform to load>: <Suc ...  succeeded.">})
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
Jim Crist-Harif

05/13/2020, 4:23 PM
Right, but that's a warning, not an error. Is it erroring somewhere later on?
Matthias

05/13/2020, 4:25 PM
I suspect it's running out of memory at some point. That's the last thing I see.
Thanks for reproducing, @David Ojeda
David Ojeda

05/13/2020, 4:27 PM
I don’t think it will run out of memory… It is currently blocked at that last step and dask gets unresponsive (holding the GIL)… but I don’t see the memory growing
Matthias

05/13/2020, 4:29 PM
True, does it fail because of a missing heartbeat then? By the way, it does not matter what you do in transform(). The issue is related to the mapping; with smaller datasets it works.
Jim Crist-Harif

05/13/2020, 4:30 PM
Smaller in data size, or smaller in number of mapped tasks?
Ah, got it reproduced
Matthias

05/13/2020, 4:33 PM
Smaller in number of mapped tasks. I could not find the point where it breaks
Jim Crist-Harif

05/13/2020, 4:46 PM
I'm going to keep debugging for a bit, but so far it looks like this has to do with the way prefect is currently using dask to handle mapped tasks (which isn't ideal). I suspect this issue will go away with the upcoming mapping refactor (see https://github.com/PrefectHQ/prefect/issues/2041).
Matthias

05/13/2020, 4:50 PM
Any idea on the best workaround for the time being?
Jim Crist-Harif

05/13/2020, 5:10 PM
So the issue here is that the graph being submitted for the mapped load tasks is huge in serialized size. This has two causes:
• The way prefect is currently written, each serialized function contains a bunch of local state and is slow to pickle/large to serialize.
• The graph you're submitting ends up with at minimum 4600 parallelized load tasks (maximum 20000, depending on the randint results). At 4600 tasks * ~1 MiB per serialized function, that's roughly 4.5 GiB just to serialize the graph alone.
Your graph would eventually complete, but it's hanging on the step that communicates the load tasks to the scheduler, which is naturally slow given the size of the graph (if I run it locally, I see scheduler memory usage skyrocket). Unfortunately there's no good workaround until the mapping refactor is done, as this is baked into prefect's current internals. One possible option (if you don't care about process-level parallelism) is to use a different executor: anything contained in a single process, such as the LocalDaskExecutor or LocalExecutor, won't have this restriction.
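(A rough sketch of that executor swap for a registered flow, assuming the same Prefect 0.x RemoteEnvironment setup as above; thread-based scheduling is just one choice.)
from prefect.environments import RemoteEnvironment

# Same flow, but executed inside the single flow-runner process, so the large
# mapped graph never has to be serialized and shipped to a remote scheduler.
flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.LocalDaskExecutor",
    executor_kwargs={"scheduler": "threads"},
)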
Matthias

05/13/2020, 5:23 PM
Alright. I would like to have the parallelism though. I will see how to proceed here 🙂 Thanks for figuring it out!
@David Ojeda @Jim Crist-Harif I found a nasty, kinda disgusting workaround: in transform I json.dump the list of dicts to a file, and then the next task json.loads the file again and maps against that, with the upstream dependency made explicit. It works in my environment because the Docker containers share a volume.
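(A sketch of that workaround against the original example, reusing extract, extract_details and load from the first message; the shared path and the reload_details task name are assumptions, and it only works because every container mounts the same volume.)
import json

from prefect import task, Flow

SHARED_PATH = "/shared/details.json"  # assumed mount point on the shared volume

@task
def transform(l):
    # Flatten as before, but persist the result instead of returning the big list.
    flat = [item for sublist in l for item in sublist]
    with open(SHARED_PATH, "w") as f:
        json.dump(flat, f)

@task
def reload_details():
    # Read the flattened list back on the worker side.
    with open(SHARED_PATH) as f:
        return json.load(f)

with Flow("ETL Test") as flow:
    data = extract()
    detail_data = extract_details.map(data)
    dumped = transform(detail_data)
    details = reload_details(upstream_tasks=[dumped])  # explicit upstream dependency
    load.map(details)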
David Ojeda

05/14/2020, 11:59 AM
I don’t think I would say nasty: in my case, most of the input/output of our tasks is a class encapsulating a file that is stored somewhere else, so the data moving around stays small. This was way before there were result handlers in prefect, which may also be a reasonable solution (I don’t know them well yet).