Tim Enders
08/05/2021, 1:18 PM
KeyError: 110 is? I think it is a Prefect error, though it may be Dask. The stack trace has it coming out of the executor.py code. (trace to follow in thread)
Tim Enders
08/05/2021, 1:18 PM
── 08:48:01 | ERROR | Unexpected error: KeyError(110)
Traceback (most recent call last):
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 643, in get_flow_run_state
    final_states = executor.wait(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/executors/dask.py", line 638, in wait
    return dask.compute(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/base.py", line 568, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/threaded.py", line 79, in get
    results = get_async(
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 514, in get_async
    raise_exception(exc, tb)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 325, in reraise
    raise exc
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/local.py", line 223, in execute_task
    result = _execute_task(task, data)
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/tenders/.cache/pypoetry/virtualenvs/platformsh-prefect-bgAYIS-0-py3.8/lib/python3.8/site-packages/prefect/utilities/executors.py", line 564, in _build_flattened_state
    new_state.result = state._result.from_value(state.result[index])  # type: ignore
KeyError: 110
└── 08:48:01 | ERROR | Unexpected error occured in FlowRunner: KeyError(110)
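A note on reading the last frame: `state.result[index]` raised `KeyError` rather than `IndexError`, which in plain Python points at a dict-like object being indexed with an integer, not a list. The sketch below only illustrates that distinction with invented values. It does not reproduce Prefect's internals, and whether this is what happened in the flow above is an assumption.

```python
# Hypothetical stand-ins for a task result; names and values are invented.
list_result = [{"id": 1}, {"id": 2}]
dict_result = {"id": 1, "name": "env"}


def index_error_name(container, index):
    """Return the name of the exception raised by container[index], or None."""
    try:
        container[index]
    except (KeyError, IndexError) as exc:
        return type(exc).__name__
    return None


# Integer index past the end of a list raises IndexError.
print(index_error_name(list_result, 110))
# Integer index into a dict that has no such key raises KeyError.
print(index_error_name(dict_result, 110))
```

If the value being flattened were a list, an out-of-range index would surface as `IndexError`, so the `KeyError` is a hint worth checking against the actual return type of the mapped task.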
Kevin Kho
Tim Enders
08/06/2021, 1:41 PM
Kevin Kho
Tim Enders
08/06/2021, 1:44 PM
proj_envs = list_project_environments()
batch_files = fetch_environment_details.map(unmapped(client), proj_envs)
real_details = really_get_the_details.map(unmapped(client), batch_files)
flat_data = flatten_json.map(
    real_details, "platform", "project_environment_details"
)
schema = generate_schema(flatten(flat_data))
object_uri = upload_gcs(flatten(flat_data), ["project", "id"], "project_environment_details")
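A plain-Python analogy for the `flatten_json.map(...)` call above, where two bare strings are passed alongside `real_details`. This is not Prefect code: `describe` and the sample data are invented, and `itertools.repeat` merely stands in for the role that `unmapped(...)` plays in a Prefect 1.x `.map()` call.

```python
from itertools import repeat

# Invented sample inputs; in the flow above these would be task results.
real_details = [{"a": 1}, {"b": 2}, {"c": 3}]


def describe(item, source):
    """Toy stand-in for a mapped task body (hypothetical)."""
    return (item, source)


# Iterating over the string pairs each item with one character of it.
per_letter = list(map(describe, real_details, "platform"))

# Repeating the string passes it whole to every call.
whole_string = list(map(describe, real_details, repeat("platform")))

print(per_letter[0])    # ({'a': 1}, 'p')
print(whole_string[0])  # ({'a': 1}, 'platform')
```

In Prefect 1.x terms, the equivalent change would presumably be `flatten_json.map(real_details, unmapped("platform"), unmapped("project_environment_details"))`, assuming both strings are meant as constants.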
Kevin Kho
flat_data = flatten_json.map(
    real_details, "platform", "project_environment_details"
)
Are “platform” and “project_environment_details” supposed to be unmapped? I think it’s mapping over the strings letter by letter.
Tim Enders
08/06/2021, 2:04 PM
Kevin Kho
unmapped(client) could be causing it also.
Tim Enders
08/06/2021, 2:07 PM
Kevin Kho
Tim Enders
08/06/2021, 2:09 PM
Tim Enders
08/06/2021, 2:39 PM
Tim Enders
08/06/2021, 2:40 PM
flatten_json and generate_schema in the flow
Kevin Kho
proj_envs and batch_files the same length?
Tim Enders
08/06/2021, 2:42 PM
Tim Enders
08/06/2021, 2:44 PM
flat_data
Kevin Kho
proj_envs gives me length 3, does batch_files give me more than length 3? And then is real_details longer than batch_files too?
Tim Enders
08/06/2021, 3:03 PM
flatten call
Kevin Kho
proj_envs is length 10, batch_files may be length 2, but it tries to map over 10 things because the upstream dependency has 10 items.
Kevin Kho
Kevin Kho
Tim Enders
08/06/2021, 3:20 PM
proj_envs = [{k: v for k, v in row.items()} for row in result]
for x in range(0, len(proj_envs), batch):
    bat = proj_envs[x : x + batch]
    env_batches.append(bat)
return env_batches[:2]
This is done on the results of a DB query. Instead of returning each row (proj_envs), I return a list of batches of rows. Currently only 2 batches, to limit the dataset for testing.
Tim Enders
08/06/2021, 3:21 PM
Tim Enders
08/06/2021, 3:25 PM
flatten_json I don't get the right data into the function
Tim Enders
08/06/2021, 3:26 PM
flatten there I run out of memory (which is a problem)
Tim Enders
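08/06/2021, 3:31 PM
Kevin Kho
import gc
from prefect import task

@task
def abc():
    df = get_data()  # get_data and location are placeholders
    # save df
    df.to_parquet(location)
    # free the DataFrame before returning only the path
    del df
    gc.collect()
    return location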
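The same idea as a self-contained sketch using only the standard library: the producing step writes its data to disk and returns just the path, and the downstream step reloads from that path. The function names, the JSON format, and the row contents are all invented for illustration. The snippet above uses a DataFrame and parquet instead.

```python
import gc
import json
import os
import tempfile


def build_rows(n):
    """Hypothetical stand-in for an expensive fetch; returns n small dicts."""
    return [{"id": i, "value": i * 2} for i in range(n)]


def produce(path):
    """Write the rows to disk and hand back only the path."""
    rows = build_rows(1000)
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(rows, fh)
    # Drop the local reference and ask the collector to run; whether memory
    # is actually returned to the OS depends on the interpreter and allocator.
    del rows
    gc.collect()
    return path


def consume(path):
    """Downstream step: reload the rows from the path it was given."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


with tempfile.TemporaryDirectory() as tmp:
    location = produce(os.path.join(tmp, "rows.json"))
    rows = consume(location)
    print(len(rows), rows[1])
```

Passing a path between steps keeps each step's return value small, at the cost of an extra write and read per step.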
Tim Enders
08/06/2021, 3:46 PM
Kevin Kho
Tim Enders
08/06/2021, 3:51 PM
gc.collect() might be a nice feature add... something like garbage(value) that wraps up the del and gc calls together.
Tim Enders
08/06/2021, 3:51 PM
gc and if it is needed in a code review
Kevin Kho
Tim Enders
08/06/2021, 4:25 PM
Kevin Kho