# prefect-server
a
Hello, we have a workflow for a single dataset that we want to run in batch over many datasets. For this we are attempting to create a "super flow" (instead of starting many separate flows in a loop) which creates child flows with `create_flow_run`. However, the `create_flow_run` task always fails when we use it with `map`. Also, the log does not contain any exceptions or reasons for errors.
• Are we doing something wrong or is it a bug?
• How can we get full logs of the tasks that are run within `create_flow_run`? (in parent_flow_succeeding)
• The Prefect UI shows no runs (zero count, no run history) for parent_flow_succeeding although it was run; it only shows runs for child_flow.
Minimal example:
import os

os.environ["PREFECT__LOGGING__LEVEL"] = "DEBUG"

from prefect import Client, Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def log_child_task(parameter):
    print(f"child_task(parameter={parameter})")


@task
def log_parent_task(children_parameters):
    print(f"parent_task(children_parameters={children_parameters})")


with Flow("child_flow") as child_flow:
    parameter = Parameter("parameter")
    log_child_task(parameter)


with Flow("parent_flow_failing") as parent_flow_failing:
    children_parameters = Parameter("children_parameters")

    a = log_parent_task(children_parameters)

    # Run a child_flow for every dict of parameters
    child_run_ids = create_flow_run.map(
        parameters=children_parameters,
        flow_name=unmapped("child_flow"),
        upstream_tasks=[a],
    )
    flow_runs_finished = wait_for_flow_run.map(flow_run_id=child_run_ids)


with Flow("parent_flow_succeeding") as parent_flow_succeeding:
    children_parameters = Parameter("children_parameters")

    a = log_parent_task(children_parameters)

    # Same, but only one child_flow
    child_run_id = create_flow_run(
        parameters=children_parameters[0],
        flow_name="child_flow",
        upstream_tasks=[a],
    )
    flow_run_finished = wait_for_flow_run(flow_run_id=child_run_id)


def run_parent_flow():
    # Register the flows so that they can be found on the server:
    project_name = "test"
    Client().create_project(project_name=project_name)
    parent_flow_failing.register(project_name=project_name)
    child_flow.register(project_name=project_name)
    parent_flow_succeeding.register(project_name=project_name)

    print("A flow using create_flow_run.map")
    state = parent_flow_failing.run(
        parameters={
            "children_parameters": [
                {"parameter": 1},
                {"parameter": 2},
            ]
        },
    )

    print("\nA flow using create_flow_run without map")
    state = parent_flow_succeeding.run(
        parameters={
            "children_parameters": [
                {"parameter": 1},
                {"parameter": 2},
            ]
        },
    )


if __name__ == "__main__":
    run_parent_flow()
The log shows:
[2021-09-22 10:55:32+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Starting task run...
[2021-09-22 10:55:32+0200] DEBUG - prefect.TaskRunner | Task 'create_flow_run': Handling state change from Pending to Failed
[2021-09-22 10:55:32+0200] INFO - prefect.TaskRunner | Task 'create_flow_run': Finished task run for task with final state: 'Failed'
k
Hey @Andreas Eisenbarth, the code looks alright. I made a quick test on my end. `create_flow_run` gets the state of the subflow. So could you go to the logs on the subflow and check what happened there?
a
Thanks! In the debugger, `state.results[<Task: create_flow_run>].message` showed "At least one upstream state has an unmappable result." This happened because `map` expected `upstream_tasks` to be mappable. I had thought `upstream_tasks` was an internal argument and not one that `map` would try to map over. So when using `upstream_tasks` in `map`, should you wrap it as `map(..., upstream_tasks=unmapped([upstream_task1]))`? But that causes a TypeError("'unmapped' object is not iterable") in flow.py:set_dependencies.
k
`upstream_tasks` takes a list, so I think it should be `upstream_tasks=[unmapped(upstream_task)]`