Matthias
05/13/2020, 3:02 PM
Hi, the following code crashes on me in Dask, due to: Large object of size 5.49 MB detected in task graph:
Am I doing something wrong? This is the simplest example I could come up with that shows this behaviour.
```python
from prefect import task, Flow
import random
import string


def randomString(stringLength=8):
    letters = string.ascii_lowercase + " "
    return ''.join(random.choice(letters) for i in range(stringLength))


@task
def extract():
    ls = []
    for i in range(200):
        ls.append({
            "name": randomString(random.randint(5, 15)),
            "description": randomString(200)
        })
    return ls


@task
def extract_details(m):
    ls = []
    for i in range(random.randint(23, 100)):
        ls.append({
            "name": randomString(random.randint(20, 30)),
            "whatsthis": randomString(200),
            "whatsthat": randomString(200)
        })
    return ls


@task
def transform(l):
    return [item for sublist in l for item in sublist]


@task
def load(l):
    print(l['name'])


with Flow("ETL Test") as flow:
    data = extract()
    detail_data = extract_details.map(data)
    transformed = transform(detail_data)
    load.map(transformed)
```
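For reference, one way (not necessarily how Matthias ran it) to execute the flow above against an existing Dask cluster in Prefect 0.x; the executor import path and scheduler address are taken from the config shared later in the thread:
```python
# Sketch only: run the flow against a running Dask scheduler (Prefect 0.x API).
from prefect.engine.executors import DaskExecutor

flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```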
David Ojeda
05/13/2020, 3:32 PM
Matthias
05/13/2020, 3:34 PM
```python
RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "tcp://dask-scheduler:8786"  # Address of a Dask scheduler
    },
)
```
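A minimal sketch of where such an environment usually goes in Prefect 0.x: it is attached to the flow object before the flow is registered or run (the import path is the standard one, but this wiring is an assumption, not code from the thread):
```python
from prefect.environments import RemoteEnvironment

flow.environment = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={"address": "tcp://dask-scheduler:8786"},
)
```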
David Ojeda
05/13/2020, 3:37 PM
dask-scheduler and dask-worker with no extra arguments?
Matthias
05/13/2020, 3:40 PM
David Ojeda
05/13/2020, 3:48 PM
Do you .register your flow or do you .run it?
The fact that you use a RemoteEnvironment confused me because I thought you were not using a prefect server… I think I misunderstood: you are on prefect open-source but you are using your own local server, is that correct?
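For context, the two options being asked about look roughly like this in Prefect 0.x (the project name is illustrative, not from the thread):
```python
flow.run()                              # execute the flow immediately in the local process
flow.register(project_name="etl-test")  # register with the Prefect server so an agent runs it
```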
Matthias
05/13/2020, 3:50 PM
David Ojeda
05/13/2020, 3:51 PM
Jim Crist-Harif
05/13/2020, 4:20 PM
> Hi, the following code crashes on me in Dask, due to: Large object of size 5.49 MB detected in task graph:
The message you posted is used as a warning in dask (it's there to indicate that there may be potential performance issues with your workload), but it shouldn't crash. Can you post the traceback that you're seeing?
David Ojeda
05/13/2020, 4:22 PM
```
[2020-05-13 16:20:50] INFO - prefect.CloudTaskRunner | Task 'load': Starting task run...
/Users/david/.virtualenvs/iguazu-venv/lib/python3.8/site-packages/distributed/worker.py:3342: UserWarning: Large object of size 5.53 MB detected in task graph:
  (None, 0, {<Edge (key=l): transform to load>: <Suc ... succeeded.">})
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad
    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good

  warnings.warn(
```
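For context, this is the pattern the dask warning is suggesting; a minimal self-contained sketch with made-up data and function names, not code from the thread:
```python
from distributed import Client

client = Client()  # local cluster for illustration; the thread's cluster is at tcp://dask-scheduler:8786

big_data = ["x" * 1_000 for _ in range(10_000)]  # stand-in for a large object

def count(items):
    return len(items)

# Embedding big_data directly in the graph triggers the "large object" warning:
#   future = client.submit(count, big_data)
# Scattering it first keeps the graph small and the data on the workers:
big_future = client.scatter(big_data)
future = client.submit(count, big_future)
print(future.result())  # 10000
```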
Jim Crist-Harif
05/13/2020, 4:23 PM
Matthias
05/13/2020, 4:25 PM
David Ojeda
05/13/2020, 4:27 PM
Matthias
05/13/2020, 4:29 PM
Jim Crist-Harif
05/13/2020, 4:30 PM
Matthias
05/13/2020, 4:33 PM
Jim Crist-Harif
05/13/2020, 4:46 PM
Matthias
05/13/2020, 4:50 PM
Jim Crist-Harif
05/13/2020, 5:10 PM
Each of your load tasks is huge in serialized size. This has two causes:
• The way prefect is currently written, each serialized function contains a bunch of local state, and is slow to pickle/large to serialize.
• The graph you're submitting ends up with at minimum 4600 parallelized load tasks (maximum 20000, depending on the randint results: 200 mapped extract_details calls × 23–100 items each).
4600 tasks × ~1 MiB per serialized function ≈ 4.5 GiB just to serialize the graph alone. Your graph would eventually complete, but it's hanging on the step communicating the load tasks to the scheduler, which is naturally slow given the size of the graph (if I run it locally, I see scheduler memory usage skyrocket).
Unfortunately there's no good workaround until the mapping refactor is done here, as it's baked into prefect's current internals.
One possible option (if you don't care about process-level parallelism) is to use a different executor. Anything contained in a single process won't have this restriction (the LocalDaskExecutor or LocalExecutor).
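A minimal sketch of that suggestion, assuming Prefect 0.x (the scheduler argument is optional and shown only for illustration):
```python
from prefect.engine.executors import LocalDaskExecutor

# Everything stays in one process, so the large serialized load tasks never have
# to be shipped to a remote dask scheduler.
flow.run(executor=LocalDaskExecutor(scheduler="threads"))
```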
Matthias
05/13/2020, 5:23 PM
In transform I json.dump the list of dicts to a file and then in the next function json.load the file again and map against that, while having an upstream dependency made explicit. Works in my environment, as the Docker containers share a volume.
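A rough sketch of that workaround, with an illustrative file path and a stubbed-in upstream task (it assumes all of the flow's containers mount the same volume at /shared; none of the names below come from the thread):
```python
import json
from prefect import task, Flow

@task
def transform(l):
    # Flatten as before, but write the result to the shared volume instead of
    # returning the (large) data through the task graph.
    flat = [item for sublist in l for item in sublist]
    path = "/shared/transformed.json"  # hypothetical path on the shared volume
    with open(path, "w") as f:
        json.dump(flat, f)
    return path  # only the small path is passed downstream

@task
def read_transformed(path):
    with open(path) as f:
        return json.load(f)

@task
def load(item):
    print(item["name"])

@task
def extract_stub():
    # Stand-in for the extract / extract_details tasks in the original example.
    return [[{"name": f"item-{i}-{j}"} for j in range(3)] for i in range(5)]

with Flow("ETL Test (file hand-off)") as flow:
    detail_data = extract_stub()
    path = transform(detail_data)
    transformed = read_transformed(path)  # upstream dependency is explicit via the returned path
    load.map(transformed)
```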
David Ojeda
05/14/2020, 11:59 AM