Mike Lev

    8 months ago
    Hey all, I've recently revisited running dependent flows, and I'm having trouble accessing the result of a task from a child flow that my parent flow depends on. What am I doing incorrectly?
    flow_a
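    import time

    import prefect
    from prefect import Flow, task


    @task(name='first_task_a', log_stdout=True)
    def first_task():
        time.sleep(10)
        mssg = 'first_task_a_done'
        logger = prefect.context.get("logger")  # task-run logger from the Prefect context
        logger.info(mssg)
        return mssg


    @task(name='second_task_a', log_stdout=True)
    def second_task(mssg: str):
        time.sleep(10)
        return f'{mssg} and second_task_a_done'


    @task(name='third_task_a', log_stdout=True, slug="last_task")
    def third_task(mssg: str):
        time.sleep(10)
        return f'{mssg} and third_task_a_done'


    with Flow('flow_a') as flow_a:
        first_task_message = first_task()
        second_task_message = second_task(first_task_message)
        third_task_message = third_task(second_task_message)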
    Runtime: KubernetesRun run config, DaskExecutor, S3Result, S3 storage, using the Prefect Cloud backend. All flows, including the parent flow, are registered as expected.
    Anna Geller

    8 months ago
    There are two things I would do:
    1. By only calling `create_flow_run`, you may be trying to get the result of the task run before the child flow run has completed. A better approach is to add `wait_for_flow_run()` in between.
    2. `get_task_run_result` expects the task slug rather than the task name. The slug usually ends with a number (e.g. `-1`) indicating the order in which the task was added to the flow, in case you called the same task more than once in your flow.
    This may solve the issue:
    from prefect import Flow
    from prefect.tasks.prefect import (
        create_flow_run,
        wait_for_flow_run,
        get_task_run_result,
    )
    
    
    with Flow("parent_flow") as parent_flow:
        flow_run_id_flow_a = create_flow_run(
            flow_name="flow_a", project_name="example_template_project"
        )
        flowrunview_flow_a = wait_for_flow_run(
            flow_run_id_flow_a, raise_final_state=True, stream_logs=True
        )
        flow_a_result = get_task_run_result(
            flow_run_id_flow_a, "last_task-1", upstream_tasks=[flowrunview_flow_a]
        )
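Anna's first point is a race: `create_flow_run` starts the child run and returns its ID right away, so a result lookup issued immediately can happen before the child's task runs exist or have finished. The stdlib-only analogy below is a sketch, not Prefect code: `start_child`, `get_result` and the `results` dict are hypothetical stand-ins, and it simplifies by treating a task-run record as appearing only once the task finishes. It shows why blocking on completion first, the job `wait_for_flow_run` does in the parent flow, makes the lookup reliable.

```python
import threading
import time

# Hypothetical stand-in for the backend's store of task-run records.
# Simplification: a record only appears once the task has finished.
results = {}


def run_child_flow(flow_run_id):
    """Simulate a child flow run whose last task finishes after a delay."""
    time.sleep(0.5)  # the child's tasks are still running during this time
    results[(flow_run_id, "third_task_a-1")] = "flow_a final result"


def start_child(flow_run_id):
    """Rough analogue of create_flow_run: start the run and return immediately."""
    worker = threading.Thread(target=run_child_flow, args=(flow_run_id,))
    worker.start()
    return worker


def get_result(flow_run_id, slug):
    """Rough analogue of get_task_run_result: fail if no matching record exists yet."""
    key = (flow_run_id, slug)
    if key not in results:
        raise ValueError(f"No task runs found for {slug!r} in flow run {flow_run_id!r}")
    return results[key]


worker = start_child("run-1")

# Racing: asking right after starting the child fails while it is still sleeping.
try:
    get_result("run-1", "third_task_a-1")
    early_lookup_failed = False
except ValueError:
    early_lookup_failed = True

# Waiting for the child to finish first makes the lookup succeed.
worker.join()
print(get_result("run-1", "third_task_a-1"))  # flow_a final result
```

In the real parent flow the same ordering is expressed with `upstream_tasks=[flowrunview_flow_a]`, which makes `get_task_run_result` run only after `wait_for_flow_run` has returned.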
    Mike Lev

    8 months ago
    Is there any way to ensure a concrete task slug assignment?
    Thanks a ton!
    Anna Geller

    8 months ago
    It's really always -1 😄 unless you call the same task multiple times in your flow.
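The numbering described here can be modelled as a per-name counter: each time a task with a given name is added to the flow, that name's counter goes up and the slug becomes `<task name>-<count>`. The sketch below illustrates that convention under this assumption; it is not Prefect's source, and `expected_slugs` is a hypothetical helper. Its input is the `name=` passed to `@task` (e.g. `third_task_a`), not the Python function name (`third_task`).

```python
from collections import Counter


def expected_slugs(task_names):
    """Return the slug each call would get, assuming the '<name>-<n>' convention."""
    seen = Counter()
    slugs = []
    for name in task_names:
        seen[name] += 1  # n = how many times this task name has been added so far
        slugs.append(f"{name}-{seen[name]}")
    return slugs


# flow_a calls each task once, so every slug ends in -1.
print(expected_slugs(["first_task_a", "second_task_a", "third_task_a"]))
# ['first_task_a-1', 'second_task_a-1', 'third_task_a-1']

# Calling the same task twice in one flow gives -1 and -2.
print(expected_slugs(["second_task_a", "second_task_a"]))
# ['second_task_a-1', 'second_task_a-2']
```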
    Mike Lev

    8 months ago
    Unfortunately it's still not working with your feedback 😞
    flow_a
    import time

    import prefect
    from prefect import Flow, task


    @task(name='first_task_a', log_stdout=True)
    def first_task():
        time.sleep(10)
        mssg = 'first_task_a_done'
        logger = prefect.context.get("logger")  # task-run logger from the Prefect context
        logger.info(mssg)
        return mssg


    @task(name='second_task_a', log_stdout=True)
    def second_task(mssg: str):
        time.sleep(10)
        return f'{mssg} and second_task_a_done'


    @task(name='third_task_a', log_stdout=True, slug="last_task")
    def third_task(mssg: str):
        time.sleep(10)
        return f'{mssg} and third_task_a_done'


    with Flow('flow_a') as flow_a:
        first_task_message = first_task()
        second_task_message = second_task(first_task_message)
        third_task_message = third_task(second_task_message)
    parent_flow
    from prefect import Flow
    from prefect.tasks.prefect import (
        create_flow_run,
        wait_for_flow_run,
        get_task_run_result,
    )


    with Flow('parent_flow') as parent_flow:
        child_flow_a_run_id = create_flow_run(
            flow_name="flow_a",
            project_name="example_template_project")

        child_flow_b_run_id = create_flow_run(
            flow_name="flow_b",
            project_name="example_template_project")

        # Block until flow_a finishes before reading any of its task results
        child_flow_a_flowrunview = wait_for_flow_run(
            child_flow_a_run_id, raise_final_state=True, stream_logs=True
        )

        flow_a_result = get_task_run_result(
            child_flow_a_run_id, "last_task-1", upstream_tasks=[child_flow_a_flowrunview]
        )

        # Pass flow_a's result on to flow_c as a parameter
        child_flow_c_run_id = create_flow_run(
            flow_name="flow_c",
            parameters={
                "flow_a_result": flow_a_result
            },
            project_name="example_template_project")
    Error:
    ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'last_task'}}, 'flow_run_id': {'_eq': 'e65eee5a-037f-4578-82c9-dc0ab671be1f'}, 'map_index': {'_eq': -1}}
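The error shows the filter `get_task_run_result` sent: a task slug, the child's `flow_run_id`, and `map_index` -1 (the default, meaning an unmapped task run). The stdlib sketch below mimics that three-field filter over a hypothetical list of task-run records to show why a slug that matches no registered task (here `last_task`) comes back empty. The record layout and `find_task_run` are assumptions for illustration, not Prefect's internals, and the records assume `third_task_a` was registered without a custom slug.

```python
# Hypothetical task-run records for one child flow run of flow_a.
FLOW_RUN_ID = "e65eee5a-037f-4578-82c9-dc0ab671be1f"
TASK_RUNS = [
    {"slug": "first_task_a-1", "flow_run_id": FLOW_RUN_ID, "map_index": -1},
    {"slug": "second_task_a-1", "flow_run_id": FLOW_RUN_ID, "map_index": -1},
    {"slug": "third_task_a-1", "flow_run_id": FLOW_RUN_ID, "map_index": -1},
]


def find_task_run(slug, flow_run_id, map_index=-1):
    """Return the task run matching all three fields, mirroring the filter in the error."""
    matches = [
        run for run in TASK_RUNS
        if run["slug"] == slug
        and run["flow_run_id"] == flow_run_id
        and run["map_index"] == map_index
    ]
    if not matches:
        raise ValueError(
            f"No task runs found for slug={slug!r}, "
            f"flow_run_id={flow_run_id!r}, map_index={map_index}"
        )
    return matches[0]


print(find_task_run("third_task_a-1", FLOW_RUN_ID)["slug"])  # third_task_a-1

try:
    find_task_run("last_task", FLOW_RUN_ID)  # no record has this slug
except ValueError as exc:
    print(exc)
```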
    Anna Geller

    8 months ago
    You must provide the real name of the task (the `name=` you passed to `@task`) in the task slug 🙂
    flow_a_result = get_task_run_result(
        child_flow_a_run_id, "third_task_a-1", upstream_tasks=[child_flow_a_flowrunview]
    )
    You don't need to set the slug here:
    @task(name='third_task_a', log_stdout=True, slug="last_task")
    Mike Lev

    8 months ago
    works!!!