Thread
#prefect-community
    Brian Phillips

    7 months ago
    I'm trying to implement flat-mapping in a flow, but the final task doesn't seem to run the correct number of times. Code in thread.
    NAMESPACES has 4 elements, which I map over in the determine_files task to return a list of dictionaries representing the files to copy for that namespace. Then I want to map copy_files over each dictionary, but I only see copy_files running 4 times, not the 4 x len(determine_files_result) I would expect.
    determine_files_result = determine_files.map(
        namespace=NAMESPACES,
        effective_date=unmapped(effective_date_result),
        upstream_tasks=[check_data_available_result],
    )
    copy_files.map(
        job=flatten(determine_files_result),
        effective_date=unmapped(effective_date_result),
        storage=unmapped(storage_result),
        upstream_tasks=[ensure_dataset_result, check_data_available_result],
    )
    Kevin Kho

    7 months ago
    Are your upstream_tasks 1:1 with the mapped entries? If not, try wrapping them in unmapped as well:
    upstream_tasks=[unmapped(ensure_dataset_result), unmapped(check_data_available_result)],
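    To make the 4-run symptom concrete, here is a plain-Python analogy. It is only a sketch, not Prefect's implementation: it assumes that mapped inputs are paired element-wise (much like zip), which would be consistent with the 4 runs reported above. The namespace names, file names, and per-namespace counts are made up for illustration.

```python
from itertools import chain

# Hypothetical stand-ins for the 4 namespaces in the thread.
NAMESPACES = ["a", "b", "c", "d"]

# Hypothetical determine_files output: one list of job dicts per namespace.
determine_files_result = [
    [{"namespace": ns, "file": f"{ns}_{i}.csv"} for i in range(n)]
    for ns, n in zip(NAMESPACES, [3, 1, 2, 4])
]

# What flatten() is meant to provide: one entry per job dict.
jobs = list(chain.from_iterable(determine_files_result))

# An upstream result that was itself mapped over the 4 namespaces.
check_data_available_result = [True, True, True, True]

# Without unmapped (assumed behaviour): the upstream is iterated in
# lockstep with the jobs, so the shorter input caps the number of runs.
runs_without_unmapped = list(zip(jobs, check_data_available_result))

# With unmapped: the upstream is passed whole to every run.
runs_with_unmapped = [(job, check_data_available_result) for job in jobs]

print(len(runs_without_unmapped))  # 4
print(len(runs_with_unmapped))     # 10
```

    In this analogy, wrapping the upstream in unmapped is what lets the number of runs follow the flattened job list rather than the 4-element upstream.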
    Brian Phillips

    7 months ago
    That was it, thanks Kevin!
    @Kevin Kho one follow-up question: when the upstream task fails here, I get a Prefect error. check_data_available is mapped over 4 things and I want to map determine_files over the same 4 things.
    determine_files_result = determine_files.map(
        namespace=list(NAMESPACES),
        effective_date=unmapped(effective_date_result),
        upstream_tasks=[check_data_available_result],
    )
    [2022-02-23 08:40:59-0500] ERROR - prefect.FlowRunner | Unexpected error: TypeError("object of type 'TRIGGERFAIL' has no len()")
    Traceback (most recent call last):
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
        executors.prepare_upstream_states_for_mapping(
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 638, in prepare_upstream_states_for_mapping
        mapped_children[edge.upstream_task] = flatten_mapped_children(
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in flatten_mapped_children
        [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in <listcomp>
        [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/executors/local.py", line 28, in submit
        return fn(*args, **kwargs)
      File "/Users/bphillips/workspace/exos-chronos-flows/.venv/lib/python3.10/site-packages/prefect/utilities/executors.py", line 728, in <lambda>
        [executor.submit(lambda c: len(c._result.value), c) for c in mapped_children]
    TypeError: object of type 'TRIGGERFAIL' has no len()
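    Reading the traceback above: the failing line calls len() on each mapped child's result value, and the error message says one of those values is a TRIGGERFAIL object rather than a list. The plain-Python sketch below reproduces that kind of failure. It does not use Prefect; TriggerFail, child_values, and child_lengths are hypothetical names chosen for illustration.

```python
class TriggerFail(Exception):
    """Hypothetical stand-in for the TRIGGERFAIL signal named in the error."""


# Hypothetical child results: three lists, plus one child whose value is a
# signal object instead of a list (as the error message suggests).
child_values = [["f1", "f2"], ["f3"], TriggerFail("upstream failed"), ["f4"]]


def child_lengths(values):
    # Mirrors the per-child len(...) call shown in the traceback.
    return [len(v) for v in values]


try:
    child_lengths(child_values)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # object of type 'TriggerFail' has no len()
```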
    Kevin Kho

    7 months ago
    This is failing because the corresponding check_data_available_result upstream failed, right? If you want it to push through, just set trigger=always_run so it will run even if the upstream errors out.
    Brian Phillips

    7 months ago
    I don't want to push it through; I'm just confused by the Prefect TypeError.
    I would expect the flow to just fail gracefully.
    Kevin Kho

    7 months ago
    Ohh I know what you are saying
    This will be solved by this
    Brian Phillips

    7 months ago
    Thanks for the link, I'll leave a note to revisit after this is merged