Can anyone tell me what `KeyError: 110` is? I thin...
# ask-community
t
Can anyone tell me what `KeyError: 110` is? I think it is a Prefect error, though it may be Dask. The stack trace has it coming out of the executor.py code. (trace to follow in thread)
── 08:48:01 | ERROR   | Unexpected error: KeyError(110)
Traceback (most recent call last):
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/executors/dask.py", line 638, in wait
    return dask.compute(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 564, in _build_flattened_state
    new_state.result = state._result.from_value(state.result[index])  # type: ignore
KeyError: 110
└── 08:48:01 | ERROR   | Unexpected error occured in FlowRunner: KeyError(110)
k
Hey @Tim Enders, I haven’t seen this before. Does it always happen for the Flow, or just sometimes? Does it happen at the same point? Are you using the LocalDaskExecutor or DaskExecutor?
t
Sorry, was AFK for travel most of yesterday. I get the KeyError every time right now when flattening the data out after a set of map calls.
k
No worries, could you share a minimal code example?
t
Flow definition:
proj_envs = list_project_environments()
batch_files = fetch_environment_details.map(unmapped(client), proj_envs)
real_details = really_get_the_details.map(unmapped(client), batch_files)
flat_data = flatten_json.map(
    real_details, "platform", "project_environment_details"
)
schema = generate_schema(flatten(flat_data))
object_uri = upload_gcs(flatten(flat_data), ["project", "id"], "project_environment_details")
k
flat_data = flatten_json.map(
    real_details, "platform", "project_environment_details"
)
Are “platform” and “project_environment_details” supposed to be unmapped? I think it’s mapping over each string letter by letter.
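To illustrate the point above, here is a toy model of a mapped call in plain Python. It is not Prefect's implementation: `Unmapped`, `toy_map`, and `describe` are hypothetical names, and taking the length from the shortest iterated argument is just a choice made for this sketch. In the flow itself the fix would presumably be `flatten_json.map(real_details, unmapped("platform"), unmapped("project_environment_details"))`.

```python
from typing import Any, Callable, List


class Unmapped:
    """Hypothetical marker meaning 'pass this value whole to every call'."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(func: Callable[..., Any], *args: Any) -> List[Any]:
    """Toy mapped call: index into every argument that is not wrapped."""
    # Toy choice: the number of child calls is the shortest iterated argument.
    lengths = [len(a) for a in args if not isinstance(a, Unmapped)]
    n = min(lengths)
    return [
        func(*(a.value if isinstance(a, Unmapped) else a[i] for a in args))
        for i in range(n)
    ]


def describe(detail: str, source: str, table: str) -> str:
    return f"{source}.{table}:{detail}"


details = ["a", "b", "c"]

# Bare strings are indexed one character at a time.
broken = toy_map(describe, details, "platform", "project_environment_details")

# Wrapped strings are passed unchanged to every call.
fixed = toy_map(
    describe,
    details,
    Unmapped("platform"),
    Unmapped("project_environment_details"),
)

print(broken)  # ['p.p:a', 'l.r:b', 'a.o:c']
print(fixed[0])  # platform.project_environment_details:a
```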
t
duh, thank you
k
Did that work? I’m not sure if the `unmapped(client)` could be causing it also.
t
don't think so. we use that in many places without issue
k
Gotcha. Are you on a DaskExecutor or LocalDaskExecutor?
t
LocalDask
Now I get KeyError 62
It is still happening between `flatten_json` and `generate_schema` in the flow
k
Are `proj_envs` and `batch_files` the same length?
t
No, they are multiple levels of mapping out and splitting up the data
but I only want to flatten out the data that becomes `flat_data`
k
Sorry, I’m a bit confused, but I think this is related to the issue. Let’s say `proj_envs` gives me length 3, does `batch_files` give me more than length 3? And then is `real_details` longer than `batch_files` too?
t
`proj_envs` is length X, which is large. I then batch things up into files, which gives us `batch_files`, len Y, small. Then `real_details` is len Z, which is even bigger than X. But at that point I only care about `real_details` and flattening that out with the `flatten` call
k
My suspicion is that if your `proj_envs` is length 10, `batch_files` may be length 2, but it tries to map over 10 things because the upstream dependency has 10 items.
I am trying to recreate
Sorry, I’m confused how you’re able to bring down the length in the mapping operation with the consecutive maps. Could you give me an example of how you reduce it?
t
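    env_batches = []  # assumed initialisation; not shown in the original snippet
    proj_envs = [{k: v for k, v in row.items()} for row in result]
    # Split the rows into consecutive batches of `batch` rows each
    for x in range(0, len(proj_envs), batch):
        bat = proj_envs[x : x + batch]
        env_batches.append(bat)

    # Only the first two batches, to limit the dataset while testing
    return env_batches[:2]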
This is done on the results of a DB query. Instead of returning each row (`proj_envs`) I return a list of batches of rows. Currently only 2 batches to limit the dataset for testing
my batch size is currently 5000, so instead of 10000 items to map, I have 2
if I don't map the `flatten_json` I don't get the right data into the function
and if I call `flatten` there I run out of memory (which is a problem)
What is the memory model of a Flow like? Does every step stay in memory for the entirety of the flow? Does a task that is not needed downstream get cleaned up? It seems to me that it all stays around until the end. That ends up being a problem for me on large datasets. Can you return and map a generator to reduce the memory footprint?
k
I’ll answer the memory footprint first. They do stay in memory unless you explicitly garbage collect so they get cleared (hopefully cleared, since Python garbage collection is iffy sometimes). We don’t support generators. So in order to manage memory, users do something like:
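import gc

from prefect import task


@task
def abc():
    df = get_data()  # placeholder for however the data is loaded
    # save df
    df.to_parquet(location)  # location: a path defined elsewhere

    del df
    gc.collect()

    return location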
t
ok, I may need to try that
k
And then the downstream task uses location to load in that data and then perform operations
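A minimal sketch of that hand-off in plain Python, using JSON in a temporary directory instead of parquet so it runs without extra libraries. The names `produce` and `consume` are hypothetical. In a flow they would presumably be `@task` functions, with only the path passed between them.

```python
import gc
import json
import os
import tempfile
from typing import Dict, List


def produce(rows: List[Dict[str, int]], directory: str) -> str:
    """Write rows to disk and return only the file path."""
    location = os.path.join(directory, "rows.json")
    with open(location, "w", encoding="utf-8") as handle:
        json.dump(rows, handle)
    # Drop the local reference before returning; only the path is handed on.
    del rows
    gc.collect()
    return location


def consume(location: str) -> int:
    """Reload the rows from the path and compute something small."""
    with open(location, encoding="utf-8") as handle:
        rows = json.load(handle)
    return sum(row["value"] for row in rows)


with tempfile.TemporaryDirectory() as workdir:
    path = produce([{"value": i} for i in range(5)], workdir)
    total = consume(path)

print(total)  # 10
```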
t
wrapping that call to `gc.collect()` might be a nice feature add.... something like `garbage(value)` that wraps up the del and gc calls together.
I know I would always ask about an import of `gc` and if it is needed in a code review
k
I think this might already provide memory footprint reduction without the gc.collect as I think only the return is held but I’m not super sure.
t
Well, I just found a typo where I am returning the wrong thing in flatten_json. So this may have all been a wild goose chase, but I learned a lot. Thank you.
k
Oh. No worries! Here to help 👍
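For anyone landing here with the same traceback: the failing line is `state.result[index]`, and a bare integer `KeyError` there is what Python raises when the object being indexed is a dict rather than a list. The thread does not say what the typo actually returned, so the dict below is purely a hypothetical example, and `pick` is just a stand-in for that lookup.

```python
def pick(result, index):
    """Stand-in for the `state.result[index]` lookup in the traceback."""
    return result[index]


list_result = ["row-0", "row-1"]
dict_result = {"project": "abc", "id": 1}  # hypothetical wrong return value

ok = pick(list_result, 1)

try:
    pick(dict_result, 110)
    dict_error = None
except KeyError as exc:
    # Integer lookup on a dict that has no such key
    dict_error = repr(exc)

try:
    pick(list_result, 110)
    list_error = None
except IndexError as exc:
    # An out-of-range list index fails differently
    list_error = type(exc).__name__

print(ok, dict_error, list_error)  # row-1 KeyError(110) IndexError
```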