Mike Lev
01/06/2022, 1:56 PM
flow_a
@task(name='second_task_a', log_stdout=True)
def second_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'

@task(name='third_task_a', log_stdout=True, slug="last_task")
def third_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'

with Flow('flow_a') as flow_a:
    first_task_message = first_task()
    second_task_message = second_task(first_task_message)
    third_task_message = third_task(second_task_message)
runtime = KubernetesRun, DaskExecutor, S3Results, S3Storage
using Prefect Cloud backend
.... all flows are registered as expected including parent flow
Anna Geller
1. Right after create_flow_run, you may be trying to get the result of the task run before the child flow run is completed. A better approach would be to add `wait_for_flow_run()` in between.
2. get_task_run_result expects the task slug rather than the task name. The slug usually ends with a number (e.g. -1) indicating the order in which this task was added to the flow, in case you called this task twice in your flow.
This may solve the issue:
from prefect import Flow
from prefect.tasks.prefect import (
    create_flow_run,
    wait_for_flow_run,
    get_task_run_result,
)

with Flow("parent_flow") as parent_flow:
    flow_run_id_flow_a = create_flow_run(
        flow_name="flow_a", project_name="example_template_project"
    )
    flowrunview_flow_a = wait_for_flow_run(
        flow_run_id_flow_a, raise_final_state=True, stream_logs=True
    )
    flow_a_result = get_task_run_result(
        flow_run_id_flow_a, "last_task-1", upstream_tasks=[flowrunview_flow_a]
    )
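The slug convention described in point 2 can be illustrated with a minimal sketch in plain Python, with no Prefect import. `illustrative_slugs` and `find_task_run` are hypothetical helpers written for this example and are not part of the Prefect API; Prefect's own slug generation may take other details into account, so this only shows why the lookup needs the exact slug string.

```python
from collections import Counter
from typing import Dict, List


def illustrative_slugs(task_names: List[str]) -> List[str]:
    """Assign '<name>-<n>' slugs, where n counts how often the name was added so far.

    Hypothetical helper: it mirrors only the naming convention described above,
    not Prefect's actual implementation.
    """
    seen: Counter = Counter()
    slugs = []
    for name in task_names:
        seen[name] += 1
        slugs.append(f"{name}-{seen[name]}")
    return slugs


def find_task_run(task_runs: Dict[str, str], slug: str) -> str:
    """Exact-match lookup by slug; raises ValueError when nothing matches."""
    if slug not in task_runs:
        raise ValueError(f"No task runs found for slug {slug!r}")
    return task_runs[slug]


# Task names taken from flow_a in this thread.
names = ["first_task_a", "second_task_a", "third_task_a"]
slugs = illustrative_slugs(names)

# Stand-in for the task runs of one flow run, keyed by slug.
task_runs = {slug: f"result of {slug}" for slug in slugs}
```

In this sketch, `find_task_run(task_runs, "third_task_a-1")` succeeds, while a lookup with a name or slug that was never stored raises a `ValueError`, which is the same kind of failure as an exact-match query that finds no task runs.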
Mike Lev
01/06/2022, 2:13 PM
Mike Lev
01/06/2022, 2:13 PM
Anna Geller
Mike Lev
01/06/2022, 2:26 PM
@task(name='first_task_a', log_stdout=True)
def first_task():
    time.sleep(10)
    mssg = 'first_task_a_done'
    logger.info(mssg)
    return mssg

@task(name='second_task_a', log_stdout=True)
def second_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'

@task(name='third_task_a', log_stdout=True, slug="last_task")
def third_task(mssg: str):
    time.sleep(10)
    return f'{mssg} and second_task_a_done'

with Flow('flow_a') as flow_a:
    first_task_message = first_task()
    second_task_message = second_task(first_task_message)
    third_task_message = third_task(second_task_message)
parent_flow
with Flow('parent_flow') as parent_flow:
    child_flow_a_run_id = create_flow_run(
        flow_name="flow_a",
        project_name="example_template_project")
    child_flow_b_run_id = create_flow_run(
        flow_name="flow_b",
        project_name="example_template_project")
    child_flow_a_flowrunview = wait_for_flow_run(
        child_flow_a_run_id, raise_final_state=True, stream_logs=True
    )
    flow_a_result = get_task_run_result(
        child_flow_a_run_id, "last_task-1", upstream_tasks=[child_flow_a_flowrunview]
    )
    child_flow_c_run_id = create_flow_run(
        flow_name="flow_c",
        parameters={
            "flow_a_result": flow_a_result
        },
        project_name="example_template_project")
error---
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'last_task'}}, 'flow_run_id': {'_eq': 'e65eee5a-037f-4578-82c9-dc0ab671be1f'}, 'map_index': {'_eq': -1}}
Anna Geller
flow_a_result = get_task_run_result(
    child_flow_a_run_id, "third_task_a-1", upstream_tasks=[child_flow_a_flowrunview]
)
Anna Geller
@task(name='third_task_a', log_stdout=True, slug="last_task")
Mike Lev
01/06/2022, 2:54 PM