https://prefect.io logo
Title
s

Sylvain Hazard

11/24/2021, 3:16 PM
Hello there ! I'm having trouble using
get_task_run_result
to access data from a subflow. This example should reproduce the issue :
from prefect import Flow, task, Parameter, unmapped
from prefect.tasks.prefect import create_flow_run, get_task_run_result

@task(slug="plus-one")
def plus_one(x: int):
    return x+1

with Flow("Child") as child_flow:
    x = Parameter("x", default=10)
    po = plus_one(x)

@task
def get_params():
    return [{"x": i} for i in range(5)]

with Flow("Parent") as parent_flow:
    params = get_params()
    flow_run_ids = create_flow_run.map(flow_name=unmapped("Child"), parameters=params)
    results = get_task_run_result.map(flow_run_ids, unmapped("plus-one"))
When running the parent flow, I get issues that look like :
Error during execution of task: ValueError("No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'plus-one'}}, 'flow_run_id': {'_eq': '3316e187-adda-461d-814f-29a6ffc059a4'}, 'map_index': {'_eq': -1}}")
Is there something I'm missing with how slugs work ?
a

Anna Geller

11/24/2021, 3:18 PM
I think you need to add -1 to get the slug name, so it would be:
unmapped("plus-one-1")
additionally, your child flow must be registered before using it in a parent flow (you probably know that)
s

Sylvain Hazard

11/24/2021, 3:20 PM
I get the same issue with
results = get_task_run_result.map(flow_run_ids, unmapped("plus-one-1"))
unfortunately. Both flows are registered in the script, I just removed it in order to not paste too much code in here.
a

Anna Geller

11/24/2021, 3:21 PM
does it work if you don’t use mapping?
s

Sylvain Hazard

11/24/2021, 3:25 PM
Does not seem to be working with either
plus-one-1
or
plus-one
when mapping is removed. Running on
0.15.9
if at all relevant.
Seems like the issue is with the
slug
parameter in task declaration, this works :
@task
def plus_one(x: int):
    return x+1

with Flow("Child") as child_flow:
    x = Parameter("x", default=10)
    po = plus_one(x)

@task
def get_params():
    return [{"x": i} for i in range(5)]

with Flow("Parent") as parent_flow:
    params = get_params()
    flow_run_ids = create_flow_run.map(flow_name=unmapped("Child"), parameters=params)
    results = get_task_run_result.map(flow_run_ids, unmapped("plus_one-1"))
🙌 1