Hey all have recently revisited running dependent ...
# ask-community
m
Hey all, I have recently revisited running dependent flows... I have issues accessing the result of a task from a flow that another flow depends on... what am I doing incorrectly?
flow_a
@task(name='second_task_a', log_stdout=True)
def second_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'



@task(name='third_task_a', log_stdout=True, slug="last_task")
def third_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'


with Flow('flow_a') as flow_a:
    first_task_message = first_task()
    second_task_message = second_task(first_task_message)
    third_task_message = third_task(second_task_message)
runtime = KubernetesRun, DaskExecutor, S3Results, S3Storage, using the Prefect Cloud backend... all flows are registered as expected, including the parent flow
a
There are two things I would do:
1. Just by calling `create_flow_run`, you may be trying to get the result of the task run before the child flow run is completed. A better approach would be to add `wait_for_flow_run()` in between.
2. `get_task_run_result` expects the task slug rather than the task name. The slug usually ends with a number (e.g. -1) indicating the order in which this task was added to the flow, in case you called this task twice in your flow.
This may solve the issue:
from prefect import Flow
from prefect.tasks.prefect import (
    create_flow_run,
    wait_for_flow_run,
    get_task_run_result,
)


with Flow("parent_flow") as parent_flow:
    flow_run_id_flow_a = create_flow_run(
        flow_name="flow_a", project_name="example_template_project"
    )
    flowrunview_flow_a = wait_for_flow_run(
        flow_run_id_flow_a, raise_final_state=True, stream_logs=True
    )
    flow_a_result = get_task_run_result(
        flow_run_id_flow_a, "last_task-1", upstream_tasks=[flowrunview_flow_a]
    )
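The ordering point can be pictured with a rough analogy that uses only the Python standard library rather than Prefect itself. Here `child_run`, `child_may_finish`, and `handle` are hypothetical stand-ins: submitting the job plays the role of `create_flow_run` (it returns immediately), and blocking on the handle plays the role of `wait_for_flow_run`. This is only a sketch of why the result cannot be read before the wait, not how Prefect is implemented.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical gate that controls when the stand-in child is allowed to finish.
child_may_finish = threading.Event()


def child_run() -> str:
    # Stand-in for the child flow: it completes only once it is allowed to.
    child_may_finish.wait()
    return "third_task_a_done"


with ThreadPoolExecutor(max_workers=1) as pool:
    handle = pool.submit(child_run)    # analogous to create_flow_run: returns right away
    ready_before_wait = handle.done()  # the child is still running, nothing to read yet
    child_may_finish.set()             # let the child complete
    result = handle.result()           # analogous to wait_for_flow_run: blocks until finished
    ready_after_wait = handle.done()   # the result is available only after the wait

print(ready_before_wait, ready_after_wait, result)
```

In the parent flow above, passing `upstream_tasks=[flowrunview_flow_a]` is what enforces the same ordering between the wait and the result lookup.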
m
is there any way to ensure a concrete task slug assignment?
thanks a ton
a
it's really always -1 😄 unless you called the same task multiple times in your flow.
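A small sketch of the naming convention described here, as a simplified model and not Prefect's actual implementation: `model_slugs` is a hypothetical helper that assumes untagged tasks and builds each slug from the task name plus a 1-based counter of how many times that name has been added so far.

```python
from collections import Counter
from typing import Iterable, List


def model_slugs(task_names: Iterable[str]) -> List[str]:
    """Simplified model: slug = task name + '-' + 1-based count of that name so far."""
    seen: Counter = Counter()
    slugs = []
    for name in task_names:
        seen[name] += 1
        slugs.append(f"{name}-{seen[name]}")
    return slugs


# Task names as declared with @task(name=...) in flow_a, each called once.
flow_a_slugs = model_slugs(["first_task_a", "second_task_a", "third_task_a"])
print(flow_a_slugs)

# Calling the same task twice is what pushes the suffix past 1.
repeated_slugs = model_slugs(["third_task_a", "third_task_a"])
print(repeated_slugs)
```

Under this model the slug follows the `name` given to `@task`, so a task named `third_task_a` that is called once would be looked up as `third_task_a-1`.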
m
so still not working with your feedback unfortunately 😞
_flow_a_
@task(name='first_task_a', log_stdout=True)
def first_task():
    time.sleep(10)
    mssg = 'first_task_a_done'
    logger.info(mssg)
    return mssg



@task(name='second_task_a', log_stdout=True)
def second_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'



@task(name='third_task_a', log_stdout=True, slug="last_task")
def third_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'


with Flow('flow_a') as flow_a:
    first_task_message = first_task()
    second_task_message = second_task(first_task_message)
    third_task_message = third_task(second_task_message)
_parent_flow_
with Flow('parent_flow') as parent_flow:
    child_flow_a_run_id =  create_flow_run(
        flow_name="flow_a",
        project_name="example_template_project")

    child_flow_b_run_id =  create_flow_run(
        flow_name="flow_b",
        project_name="example_template_project")

    child_flow_a_flowrunview = wait_for_flow_run(
        child_flow_a_run_id, raise_final_state=True, stream_logs=True
    )

    flow_a_result = get_task_run_result(
        child_flow_a_run_id, "last_task-1", upstream_tasks=[child_flow_a_flowrunview]
    )

    child_flow_c_run_id =  create_flow_run(
        flow_name="flow_c",
        parameters={
            "flow_a_result": flow_a_result
        },
        project_name="example_template_project")
error---
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'last_task'}}, 'flow_run_id': {'_eq': 'e65eee5a-037f-4578-82c9-dc0ab671be1f'}, 'map_index': {'_eq': -1}}
a
You must provide the real name of the task in the task slug 🙂
flow_a_result = get_task_run_result(
        child_flow_a_run_id, "third_task_a-1", upstream_tasks=[child_flow_a_flowrunview]
    )
you don't need to set the slug here:
@task(name='third_task_a', log_stdout=True, slug="last_task")
m
works!!!