Andreas Eisenbarth
09/22/2021, 9:13 AM
create_flow_run task always fails when we use it with map. Also, the log does not contain any exceptions or reasons for errors.
• Are we doing something wrong or is it a bug?
• How can we get full logs of the tasks that are run within create_flow_run? (in parent_flow_succeeding)
• The prefect UI shows no runs (zero count, no run history) for parent_flow_succeeding although it was run, it only shows runs for child_flow.
import os
os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"

from prefect import Client, Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def log_child_task(parameter):
    print(f"child_task(parameter={parameter})")


@task
def log_parent_task(children_parameters):
    print(f"parent_task(children_parameters={children_parameters})")


with Flow("child_flow") as child_flow:
    parameter = Parameter("parameter")
    log_child_task(parameter)

with Flow("parent_flow_failing") as parent_flow_failing:
    children_parameters = Parameter("children_parameters")
    a = log_parent_task(children_parameters)
    # Run a child_flow for every dict of parameters
    child_run_ids = create_flow_run.map(
        parameters=children_parameters,
        flow_name=unmapped("child_flow"),
        upstream_tasks=[a],
    )
    flow_runs_finished = wait_for_flow_run.map(flow_run_id=child_run_ids)

with Flow("parent_flow_succeeding") as parent_flow_succeeding:
    children_parameters = Parameter("children_parameters")
    a = log_parent_task(children_parameters)
    # Same, but only one child_flow
    child_run_id = create_flow_run(
        parameters=children_parameters[0],
        flow_name="child_flow",
        upstream_tasks=[a],
    )
    flow_run_finished = wait_for_flow_run(flow_run_id=child_run_id)


def run_parent_flow():
    # Register the flows so that they can be found on the server:
    project_name = "test"
    Client().create_project(project_name=project_name)
    parent_flow_failing.register(project_name=project_name)
    child_flow.register(project_name=project_name)
    parent_flow_succeeding.register(project_name=project_name)

    print("A flow using create_flow_run.map")
    state = parent_flow_failing.run(
        parameters={
            "children_parameters": [
                {"parameter": 1},
                {"parameter": 2},
            ]
        },
    )

    print("\nA flow using create_flow_run without map")
    state = parent_flow_succeeding.run(
        parameters={
            "children_parameters": [
                {"parameter": 1},
                {"parameter": 2},
            ]
        },
    )


if __name__ == "__main__":
    run_parent_flow()
The log shows:
[2021-09-22 10:55:32+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-09-22 10:55:32+0200] DEBUG - prefect.TaskRunner | Task 'create_flow_run': Handling state change from Pending to Failed
[2021-09-22 10:55:32+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
Kevin Kho
09/22/2021, 2:19 PM
create_flow_run gets the state of the subflow. So could you go to the logs on the subflow and check what happened there?
Andreas Eisenbarth
09/23/2021, 3:08 PM
state.results[<Task: create_flow_run>].message showed "At least one upstream state has an unmappable result." This happened because map expected upstream_tasks to be mappable. I had thought upstream_tasks was an internal argument, not one that the map function would try to map over.
So when using upstream_tasks in map, should you wrap it as map(..., upstream_tasks=unmapped([upstream_task1]))?
But that causes a TypeError("'unmapped' object is not iterable") in flow.py:set_dependencies.
Kevin Kho
09/23/2021, 3:15 PM
upstream_tasks=[unmapped(upstream_task)]
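The three variants discussed in this thread can be compared with a small toy model. This is only a sketch in plain Python and not Prefect's implementation: `Unmapped`, `run_mapped`, and `start_child` are hypothetical stand-ins for `unmapped`, `Task.map`, and `create_flow_run`. It assumes, as the thread suggests, that every entry of the upstream list is mapped over unless it is individually wrapped, and that the list itself is iterated. In the original flow, the last variant would correspond to `upstream_tasks=[unmapped(a)]`.

```python
class Unmapped:
    """Hypothetical stand-in for Prefect's `unmapped` wrapper."""

    def __init__(self, value):
        self.value = value


def run_mapped(fn, upstream_results, **kwargs):
    """Toy model of a mapped call (an assumption, not Prefect's code)."""
    # The upstream list is iterated, so the list itself must not be wrapped.
    for result in upstream_results:
        if isinstance(result, Unmapped):
            continue  # ordering dependency only, never mapped over
        if not isinstance(result, (list, tuple)):
            raise ValueError("At least one upstream state has an unmappable result.")
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    n = min(len(v) for v in mapped.values())
    return [fn(**{k: v[i] for k, v in mapped.items()}, **constant) for i in range(n)]


def start_child(parameters, flow_name):
    # Hypothetical stand-in for create_flow_run; returns a fake run id.
    return f"{flow_name}:{parameters['parameter']}"


params = [{"parameter": 1}, {"parameter": 2}]
parent_result = None  # log_parent_task returns nothing
common = dict(parameters=params, flow_name=Unmapped("child_flow"))

# Variant 1: upstream_tasks=[a], the upstream result is treated as mappable.
try:
    run_mapped(start_child, [parent_result], **common)
except ValueError as exc:
    first_error = str(exc)

# Variant 2: upstream_tasks=unmapped([a]), the wrapper cannot be iterated.
try:
    run_mapped(start_child, Unmapped([parent_result]), **common)
except TypeError as exc:
    second_error = str(exc)

# Variant 3: upstream_tasks=[unmapped(a)], each dependency wrapped on its own.
run_ids = run_mapped(start_child, [Unmapped(parent_result)], **common)
```

Only the third variant produces one child run per parameter dict; the first two fail in the same two ways reported above.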