Thread
#prefect-community
    An Hoang

    8 months ago
    I'm encountering a problem where the run-time state DAG visualization doesn't match the flow's creation-time DAG. You can see in the first screenshot that `perform_permutation` and `generate_summary_df` are both upstream dependencies of `process_report_df`. However, the flow fails at `process_report_df` without ever running the two upstream tasks. The error is due to missing data from the upstream task `generate_summary_df`. The post-hoc state visualization doesn't show the two upstream tasks at all. Why does this happen? I will post the log and task results in the comments.
    Debug-level log:
    .....
    'get_permute_odds_ratio_object': Starting task run...
    [2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'get_permute_odds_ratio_object': Handling state change from Pending to Mapped
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'get_permute_odds_ratio_object': Finished task run for task with final state: 'Mapped'
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'perform_permutation': Starting task run...
    [2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'perform_permutation': Handling state change from Pending to Mapped
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'perform_permutation': Finished task run for task with final state: 'Mapped'
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'generate_summary_df': Starting task run...
    [2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'generate_summary_df': Handling state change from Pending to Mapped
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'generate_summary_df': Finished task run for task with final state: 'Mapped'
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'process_report_df': Starting task run...
    [2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'process_report_df': Handling state change from Pending to Running
    [2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'process_report_df': Calling task.run() method...
    [2022-01-05 14:11:47-0500] ERROR - prefect.TaskRunner | Task 'process_report_df': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/corradin_ovp_utils/prefect_flows/step2.py", line 175, in process_report_df
        all_report_df_melted = pd.concat( previous_steps_result_files + report_df_melted_list)
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/pandas/util/_decorators.py", line 311, in wrapper
        return func(*args, **kwargs)
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/pandas/core/reshape/concat.py", line 294, in concat
        op = _Concatenator(
      File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/pandas/core/reshape/concat.py", line 351, in __init__
        raise ValueError("No objects to concatenate")
    ValueError: No objects to concatenate
    [2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'process_report_df': Handling state change from Running to Failed
    [2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'process_report_df': Finished task run for task with final state: 'Failed'
    ....
    [2022-01-05 14:11:47-0500] DEBUG - prefect.FlowRunner | Flow 'OVP_step2': Handling state change from Running to Failed
    Task results
    Kevin Kho

    8 months ago
    What do you see under Schematic in the UI? Is the DAG equivalent? Or are you just using `flow.run()`?
    An Hoang

    8 months ago
    I'm just using `flow.run()` right now
    Kevin Kho

    8 months ago
    I suspect this is Dask related. Are you using autoscaling?
    An Hoang

    8 months ago
    What you saw is the default linear executor. I have disabled Dask. My flow works for some parameters but not others; I'm trying to figure out why.
    Kevin Kho

    8 months ago
    I suspect the mapping lists might not have the same length in some step? Or are you using `.map()` with a string input like `.map("test")`, which gets treated as a list of 4 characters. You need an explicit `unmapped` for that.
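In plain Python terms, the string pitfall Kevin mentions looks like this (a sketch of the iteration behavior, not Prefect's actual `.map()` machinery):

```python
# A string is itself an iterable, so "mapping" over it iterates
# character by character instead of passing it as one input.
def shout(x):
    return x.upper()

# Intended: one call receiving the whole string.
single = shout("test")               # "TEST"

# What mapping over the bare string does instead: four calls, one per character.
mapped = [shout(c) for c in "test"]  # ["T", "E", "S", "T"]
```

In Prefect 1.x, wrapping such an argument in `unmapped(...)` passes the whole value to every child run instead of iterating over it.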
    An Hoang

    8 months ago
    You are so right! One of the tasks far upstream returned an empty list, so none of the tasks in that branch ran at all. Thank you for the hint!!! Now to find out why it returned an empty list.... On a related note, some error like "Mapped over empty list" or "mapped over uneven lists" from Prefect would be super helpful 😅. What would you do to guard against this problem for every mapped task in your flow? Would you just do an `assert list_to_be_mapped` in each task? What if your flow has tons of mapped tasks?
    Kevin Kho

    8 months ago
    I think we are fine with mapping over empty lists because there are scenarios like checking a folder for new files and then mapping over the new files. This is tricky because using `map` normally suggests dynamism, so you don't know the length ahead of time. State handlers are also run per mapped item, so I'm not seeing a good place to put an assert other than the downstream task, which errors as a result.
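The failure mode from this thread can be reproduced in miniature with plain Python. Only the function names come from the log; the bodies below are invented stand-ins:

```python
def generate_summary_df(items):
    # One result per mapped item: an empty input yields zero results,
    # and no per-item code ever runs.
    return [f"summary-{i}" for i in items]

def process_report_df(summaries):
    # The downstream "concat" step is where the problem finally surfaces,
    # mirroring pandas' ValueError("No objects to concatenate").
    if not summaries:
        raise ValueError("No objects to concatenate")
    return "+".join(summaries)

mapped = generate_summary_df([])  # [] -- the whole branch "ran" zero times
# process_report_df(mapped) would raise ValueError here
```

This is why the error only appears at `process_report_df`: the upstream mapped tasks never produced a single child run, so there was nothing to fail earlier.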