# ask-community
a
I'm encountering a problem where the run-time state DAG visualization doesn't match the flow's creation-time DAG. You can see in the first screenshot that `perform_permutation` and `generate_summary_df` are both upstream dependencies of `process_report_df`. However, the flow fails at `process_report_df` without ever running the two upstream tasks. The error is due to not having data from the upstream task `generate_summary_df`. The post-hoc state visualization doesn't show the two upstream tasks at all. Why does this happen? I will post the log and task results in the comments.
Debug level log:
.....
'get_permute_odds_ratio_object': Starting task run...
[2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'get_permute_odds_ratio_object': Handling state change from Pending to Mapped
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'get_permute_odds_ratio_object': Finished task run for task with final state: 'Mapped'
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'perform_permutation': Starting task run...
[2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'perform_permutation': Handling state change from Pending to Mapped
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'perform_permutation': Finished task run for task with final state: 'Mapped'
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'generate_summary_df': Starting task run...
[2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'generate_summary_df': Handling state change from Pending to Mapped
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'generate_summary_df': Finished task run for task with final state: 'Mapped'
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'process_report_df': Starting task run...
[2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'process_report_df': Handling state change from Pending to Running
[2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'process_report_df': Calling task.run() method...
[2022-01-05 14:11:47-0500] ERROR - prefect.TaskRunner | Task 'process_report_df': Exception encountered during task execution!
Traceback (most recent call last):
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/corradin_ovp_utils/prefect_flows/step2.py", line 175, in process_report_df
    all_report_df_melted = pd.concat( previous_steps_result_files + report_df_melted_list)
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/pandas/util/_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/pandas/core/reshape/concat.py", line 294, in concat
    op = _Concatenator(
  File "/lab/corradin_biobank/FOR_AN/OVP/corradin_ovp_utils/.venv/lib/python3.8/site-packages/pandas/core/reshape/concat.py", line 351, in __init__
    raise ValueError("No objects to concatenate")
ValueError: No objects to concatenate
[2022-01-05 14:11:47-0500] DEBUG - prefect.TaskRunner | Task 'process_report_df': Handling state change from Running to Failed
[2022-01-05 14:11:47-0500] INFO - prefect.TaskRunner | Task 'process_report_df': Finished task run for task with final state: 'Failed'
....
[2022-01-05 14:11:47-0500] DEBUG - prefect.FlowRunner | Flow 'OVP_step2': Handling state change from Running to Failed
Task results
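The traceback above comes from `pd.concat` being handed an empty sequence. A minimal sketch reproducing that failure, plus one possible guard (the fallback of returning an empty DataFrame is an illustrative choice, not what the flow actually does):

```python
import pandas as pd

# pd.concat raises ValueError("No objects to concatenate") when given
# nothing to combine -- the exact failure in the traceback above.
try:
    pd.concat([])
except ValueError as e:
    print(e)  # No objects to concatenate

def concat_or_empty(frames):
    """Hypothetical guard: return an empty DataFrame instead of raising."""
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames)
```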
k
What do you see in the schematic view of the UI? Is the DAG equivalent? Or are you just using `flow.run()`?
a
I'm just using `flow.run()` right now
k
I suspect this is Dask related. Are you using autoscaling?
a
What you saw is the default linear executor. I have disabled Dask. My flow works for some parameters but not others; trying to figure out why
k
I suspect the mapped lists might not have the same length in some step? Or are you using `.map()` with a string input like `.map("test")`, which can be treated as a list of 4 characters? You need an explicit `unmapped` for that
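A plain-Python sketch of the pitfall just described (Prefect's `.map()` iterates over its argument much like this loop does; the function names here are illustrative, not Prefect API):

```python
def run_mapped(task, inputs):
    """Call `task` once per element of `inputs`, like a mapped task."""
    return [task(x) for x in inputs]

def shout(x):
    return str(x).upper()

# A string is iterable, so "mapping" over it runs once per character:
chars = run_mapped(shout, "test")
assert chars == ["T", "E", "S", "T"]   # 4 runs, one per letter

# To pass the whole string to every run instead, keep it out of the
# mapped arguments (Prefect's `unmapped` serves this purpose):
whole = run_mapped(shout, ["test"])
assert whole == ["TEST"]               # 1 run over the whole string
```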
a
You are so right! One of the very upstream tasks returns an empty list, so none of the tasks in that branch ran at all. Thank you for the hint!!! Now to find out why it returned an empty list.... On a related note, an error like "mapped over empty list" or "mapped over lists of unequal length" from Prefect would be super helpful 😅. What would you do to guard against this problem for every mapped task in your flow? Would you just do an `assert list_to_be_mapped` in each of the tasks? What if your flow has tons of mapped tasks?
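One lightweight option, sketched in plain Python (the helper name is hypothetical, not Prefect API): a single shared guard that every to-be-mapped list passes through, so the assert lives in one place rather than inside every task:

```python
def require_nonempty(name, items):
    """Fail fast with a clear message instead of a cryptic downstream error."""
    if not items:
        raise ValueError(f"{name}: refusing to map over an empty list")
    return items

# Usage: wrap each list right before it is mapped over.
safe = require_nonempty("summary inputs", [1, 2, 3])
assert safe == [1, 2, 3]
```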
k
I think we are fine with mapping over empty lists, because there are scenarios like checking a folder for new files and then mapping over the new files. This is tricky because using `map` normally suggests dynamism, so you don't know the length ahead of time. State handlers are also run per mapped item, so I'm not seeing a good place to put an assert other than the downstream task, which errors as a result