Sylvain Hazard

    10 months ago
    Hello there! I'm having trouble using get_task_run_result to access data from a subflow. This example should reproduce the issue:
    from prefect import Flow, task, Parameter, unmapped
    from prefect.tasks.prefect import create_flow_run, get_task_run_result
    
    @task(slug="plus-one")
    def plus_one(x: int):
        return x+1
    
    with Flow("Child") as child_flow:
        x = Parameter("x", default=10)
        po = plus_one(x)
    
    @task
    def get_params():
        return [{"x": i} for i in range(5)]
    
    with Flow("Parent") as parent_flow:
        params = get_params()
        flow_run_ids = create_flow_run.map(flow_name=unmapped("Child"), parameters=params)
        results = get_task_run_result.map(flow_run_ids, unmapped("plus-one"))
    When running the parent flow, I get errors that look like this:
    Error during execution of task: ValueError("No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'plus-one'}}, 'flow_run_id': {'_eq': '3316e187-adda-461d-814f-29a6ffc059a4'}, 'map_index': {'_eq': -1}}")
    Is there something I'm missing about how slugs work?
    Anna Geller

    10 months ago
    I think you need to append -1 to the slug name, so it would be:
    unmapped("plus-one-1")
    Additionally, your child flow must be registered before you use it in a parent flow (you probably know that).
    Sylvain Hazard

    10 months ago
    Unfortunately, I get the same issue with:
    results = get_task_run_result.map(flow_run_ids, unmapped("plus-one-1"))
    Both flows are registered in the script; I just removed that part to avoid pasting too much code here.
    Anna Geller

    10 months ago
    Does it work if you don’t use mapping?
    Sylvain Hazard

    10 months ago
    It does not seem to work with either plus-one-1 or plus-one when mapping is removed. I'm running on 0.15.9, if that's at all relevant.
    It seems the issue is with the slug parameter in the task declaration. This works:
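    from prefect import Flow, task, Parameter, unmapped
    from prefect.tasks.prefect import create_flow_run, get_task_run_result
    
    @task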
    def plus_one(x: int):
        return x+1
    
    with Flow("Child") as child_flow:
        x = Parameter("x", default=10)
        po = plus_one(x)
    
    @task
    def get_params():
        return [{"x": i} for i in range(5)]
    
    with Flow("Parent") as parent_flow:
        params = get_params()
        flow_run_ids = create_flow_run.map(flow_name=unmapped("Child"), parameters=params)
        results = get_task_run_result.map(flow_run_ids, unmapped("plus_one-1"))
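
The working call above suggests that, when no explicit slug is set on the task, the slug to query is the task function's name followed by -1 (here plus_one-1, with an underscore). A minimal sketch of that naming pattern follows. The helper guess_default_slug is hypothetical and not part of Prefect, and its occurrence parameter is an assumption that goes beyond what this thread shows:

```python
def guess_default_slug(task_name: str, occurrence: int = 1) -> str:
    """Hypothetical helper (not a Prefect API).

    Builds "<task name>-<n>", the pattern observed in this thread, where
    "plus_one-1" was the slug that worked for the task function plus_one.
    The occurrence argument is an assumption; the thread only shows "-1".
    """
    if occurrence < 1:
        raise ValueError("occurrence must be >= 1")
    return f"{task_name}-{occurrence}"


def plus_one(x: int) -> int:
    # Plain function standing in for the @task-decorated one in the thread.
    return x + 1


# Derive the slug from the function name rather than typing it by hand,
# which avoids mixing up "plus-one" (hyphen) and "plus_one" (underscore).
slug = guess_default_slug(plus_one.__name__)
print(slug)  # plus_one-1
```

The resulting string is what would be passed as unmapped(slug) in the get_task_run_result.map call above.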