I'm trying to implement <flat-mapping> in a flow, ...
# prefect-community
b
I'm trying to implement flat-mapping in a flow, but the final task doesn't seem to run the correct number of times. Code in thread.
`NAMESPACES` has 4 elements, which I map over in the `determine_files` task to return a list of dictionaries representing the files to copy for that namespace. Then I want to map `copy_files` over each dictionary, but I only see `copy_files` running 4 times, not the 4 x len(determine_files_result) I would expect.
```python
determine_files_result = determine_files.map(
    namespace=NAMESPACES,
    effective_date=unmapped(effective_date_result),
    upstream_tasks=[check_data_available_result],
)
copy_files.map(
    job=flatten(determine_files_result),
    effective_date=unmapped(effective_date_result),
    storage=unmapped(storage_result),
    upstream_tasks=[ensure_dataset_result, check_data_available_result],
)
```
k
Are your upstream_tasks 1:1 with the mapped entries? If not, try wrapping them in unmapped as well:
upstream_tasks=[unmapped(ensure_dataset_result), unmapped(check_data_available_result)],
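A rough plain-Python model of why the count came out as 4, going only by the behaviour reported in this thread: mapped inputs appear to be paired up element by element, so a 4-element upstream that is not wrapped in unmapped caps the number of runs at 4. This is a sketch, not Prefect's implementation. The namespace values, the 3-files-per-namespace figure, and the body of `determine_files` are made up for illustration, and `chain.from_iterable` and `repeat` stand in for `flatten` and `unmapped`.

```python
from itertools import chain, repeat

# Hypothetical values; the thread only says NAMESPACES has 4 elements.
NAMESPACES = ["ns_a", "ns_b", "ns_c", "ns_d"]


def determine_files(namespace):
    # Hypothetical: pretend every namespace yields 3 file jobs.
    return [{"namespace": namespace, "file": f"{namespace}_{i}.csv"} for i in range(3)]


# One result list per namespace, and one upstream check result per namespace.
determine_files_result = [determine_files(ns) for ns in NAMESPACES]
check_data_available_result = [True] * len(NAMESPACES)

# Stand-in for flatten(...): 4 lists of 3 jobs become 12 jobs.
jobs = list(chain.from_iterable(determine_files_result))

# Upstream treated as mapped: pairing stops at the shortest input (4 items).
runs_with_mapped_upstream = list(zip(jobs, check_data_available_result))

# Stand-in for unmapped(...): the whole upstream is repeated for every job.
runs_with_unmapped_upstream = list(zip(jobs, repeat(check_data_available_result)))

print(len(runs_with_mapped_upstream), len(runs_with_unmapped_upstream))
```

In this model the first count follows the 4-element upstream and the second follows the flattened job list, which matches what wrapping the upstream tasks in unmapped changed in the flow.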
b
That was it, thanks Kevin!
@Kevin Kho one follow-up question: when the upstream task fails here, I get a Prefect error. `check_data_available` is mapped over 4 things and I want to map `determine_files` over the same 4 things.
```python
determine_files_result = determine_files.map(
    namespace=list(NAMESPACES),
    effective_date=unmapped(effective_date_result),
    upstream_tasks=[check_data_available_result],
)
```
```
[2022-02-23 08:40:59-0500] ERROR - prefect.FlowRunner | Unexpected error: TypeError("object of type 'TRIGGERFAIL' has no len()")
Traceback (most recent call last):
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 638, in prepare_upstream_states_for_mapping
    mapped_children[edge.upstream_task] = flatten_mapped_children(
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in flatten_mapped_children
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in <listcomp>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/executors/local.py", line 28, in submit
    return fn(*args, **kwargs)
  File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in <lambda>
    [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
TypeError: object of type 'TRIGGERFAIL' has no len()
```
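Reading the traceback, the failing expression is `len(c._result.value)`, which suggests that one mapped child's result value is a `TRIGGERFAIL` object rather than a list. The sketch below reproduces that kind of error in plain Python under that assumption. The `TRIGGERFAIL` class here is a local stand-in, and `flattened_length` and the sample values are hypothetical; none of this is Prefect's code.

```python
class TRIGGERFAIL(Exception):
    """Local stand-in for the object named in the traceback."""


# Hypothetical child results: one child produced a list of jobs,
# the other holds a failure object where a list was expected.
child_results = [
    [{"file": "a.csv"}, {"file": "b.csv"}],
    TRIGGERFAIL("upstream trigger failed"),
]


def flattened_length(values):
    # Counting items for flattening calls len() on every child value,
    # which only works if every value is a sized collection.
    return sum(len(value) for value in values)


try:
    flattened_length(child_results)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

The point of the model is only that the length check assumes every upstream child succeeded, so a failed child surfaces as a `TypeError` instead of a clean failed state.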
k
This is failing because the corresponding `check_data_available_result` upstream failed, right? If you want it to push through, just set `trigger=always_run` so it will run even if the upstream errors out.
b
I don't want to push it through, I'm just confused by the Prefect type error.
I would expect the flow to just fail gracefully.
k
Ohh I know what you are saying
This will be solved by this
b
Thanks for the link, I'll leave a note to revisit after this is merged