Sylvain Hazard
11/24/2021, 3:16 PMget_task_run_result
to access data from a subflow.
This example should reproduce the issue :
from prefect import Flow, task, Parameter, unmapped
from prefect.tasks.prefect import create_flow_run, get_task_run_result
@task(slug="plus-one")
def plus_one(x: int):
return x+1
with Flow("Child") as child_flow:
x = Parameter("x", default=10)
po = plus_one(x)
@task
def get_params():
return [{"x": i} for i in range(5)]
with Flow("Parent") as parent_flow:
params = get_params()
flow_run_ids = create_flow_run.map(flow_name=unmapped("Child"), parameters=params)
results = get_task_run_result.map(flow_run_ids, unmapped("plus-one"))
When running the parent flow, I get issues that look like :
Error during execution of task: ValueError("No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'plus-one'}}, 'flow_run_id': {'_eq': '3316e187-adda-461d-814f-29a6ffc059a4'}, 'map_index': {'_eq': -1}}")
Is there something I'm missing with how slugs work ?Anna Geller
11/24/2021, 3:18 PMunmapped("plus-one-1")
Sylvain Hazard
11/24/2021, 3:20 PMresults = get_task_run_result.map(flow_run_ids, unmapped("plus-one-1"))
unfortunately.
Both flows are registered in the script, I just removed it in order to not paste too much code in here.Anna Geller
11/24/2021, 3:21 PMSylvain Hazard
11/24/2021, 3:25 PMplus-one-1
or plus-one
when mapping is removed.
Running on 0.15.9
if at all relevant.slug
parameter in task declaration, this works :
@task
def plus_one(x: int):
return x+1
with Flow("Child") as child_flow:
x = Parameter("x", default=10)
po = plus_one(x)
@task
def get_params():
return [{"x": i} for i in range(5)]
with Flow("Parent") as parent_flow:
params = get_params()
flow_run_ids = create_flow_run.map(flow_name=unmapped("Child"), parameters=params)
results = get_task_run_result.map(flow_run_ids, unmapped("plus_one-1"))