Brian Phillips
02/22/2022, 11:49 PM
NAMESPACES has 4 elements which I map over in the determine_files task to return a list of dictionaries representing the files to copy for that namespace. Then I want to map copy_files over each dictionary, but I only see copy_files running 4 times, not the 4 x len(determine_files_result) I would expect.
determine_files_result = determine_files.map(
    namespace=NAMESPACES,
    effective_date=unmapped(effective_date_result),
    upstream_tasks=[check_data_available_result],
)
copy_files.map(
    job=flatten(determine_files_result),
    effective_date=unmapped(effective_date_result),
    storage=unmapped(storage_result),
    upstream_tasks=[ensure_dataset_result, check_data_available_result],
)
Kevin Kho
upstream_tasks=[unmapped(ensure_dataset_result), unmapped(check_data_available_result)],
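Kevin's one-liner wraps both upstream tasks in unmapped. In Prefect 1.x, tasks passed through upstream_tasks in a .map() call are mapped over too unless wrapped in unmapped, and check_data_available_result is itself mapped over the 4 namespaces, which would explain copy_files running only 4 times. Below is a plain-Python sketch of that counting, assuming mapped inputs are consumed in lockstep (like zip). Unmapped and count_mapped_runs are hypothetical stand-ins for illustration, not Prefect's API, and the data is made up.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Unmapped:
    """Hypothetical stand-in for a value passed whole to every child run."""
    value: Any


def count_mapped_runs(**inputs: Any) -> int:
    """Count child runs, assuming mapped inputs are iterated in lockstep."""
    lengths = [len(v) for v in inputs.values() if not isinstance(v, Unmapped)]
    return min(lengths) if lengths else 0


# Made-up data: 4 namespaces with 3 files each, so 12 jobs after flattening.
jobs = [f"ns{n}-file{i}" for n in range(4) for i in range(3)]
check_results = ["ok"] * 4  # one result per namespace

# Upstream task left mapped: its 4 results cap the number of runs.
runs_before = count_mapped_runs(job=jobs, upstream=check_results)

# Upstream task wrapped: only the flattened jobs drive the mapping.
runs_after = count_mapped_runs(job=jobs, upstream=Unmapped(check_results))

print(runs_before, runs_after)
```

With the upstream results left mapped, the shorter input limits the run count; once wrapped, every flattened job gets its own run.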
Brian Phillips
02/23/2022, 12:41 PM
check_data_available is mapped over 4 things and I want to map determine_files over the same 4 things.
determine_files_result = determine_files.map(
    namespace=list(NAMESPACES),
    effective_date=unmapped(effective_date_result),
    upstream_tasks=[check_data_available_result],
)
[2022-02-23 08:40:59-0500] ERROR - prefect.FlowRunner | Unexpected error: TypeError("object of type 'TRIGGERFAIL' has no len()")
Traceback (most recent call last):
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 638, in prepare_upstream_states_for_mapping
    mapped_children[edge.upstream_task] = flatten_mapped_children(
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in flatten_mapped_children
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in <listcomp>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/executors/local.py", line 28, in submit
    return fn(*args, **kwargs)
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in <lambda>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
TypeError: object of type 'TRIGGERFAIL' has no len()
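The last frames of the traceback show the flatten step calling len() on each mapped child's result. Here is a plain-Python sketch of that failure, assuming a child that never ran carries a failure marker instead of a list. TriggerFailMarker and flattened_length are hypothetical names for illustration, not Prefect classes or functions.

```python
class TriggerFailMarker(Exception):
    """Hypothetical stand-in for the result of a child whose trigger failed."""


def flattened_length(child_results):
    """Sum the lengths of the per-child results, as flattening needs to."""
    return sum(len(result) for result in child_results)


# Three children returned lists of files; one never ran and holds a marker.
results = [["a.csv"], ["b.csv", "c.csv"], TriggerFailMarker("upstream failed"), []]

try:
    flattened_length(results)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)
```

Under that assumption, a single child that was skipped because its upstream failed is enough to break the length computation for the whole flattened input.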
Kevin Kho
check_data_available_result upstream failed, right? If you want it to push through, just set trigger=always_run so it will run even if the upstream errors out.
Brian Phillips
02/23/2022, 1:48 PM
Kevin Kho
Brian Phillips
02/23/2022, 2:09 PM