Thread
#prefect-community
    Trevor Kramer

    1 year ago
    I have a mapped task that needs to wait until a previous mapped task is 100% complete before starting. If I add upstream_tasks=[fp_validation_predicted_locations], it kicks off task 1 as soon as task 1 of fp_validation_predicted_locations finishes. How can I have it wait until they are all finished? Can I do upstream_tasks=[unmapped(fp_validation_predicted_locations)]?
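To make the two dependency shapes in this question concrete, here is a plain-Python analogy using the standard library's concurrent.futures rather than Prefect (it is not Prefect's scheduler, only a sketch). With per-element dependencies, each downstream item waits only for its matching upstream item, which is the behaviour described above; with a barrier, nothing downstream starts until every upstream item is done. The functions upstream, downstream, per_element, and with_barrier are hypothetical names for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

events = []               # order in which calls actually ran
lock = threading.Lock()

def record(name):
    with lock:
        events.append(name)

def upstream(x):          # hypothetical stand-in for the first mapped task
    record(f"up:{x}")
    return x + 1

def downstream(x):        # hypothetical stand-in for the second mapped task
    record(f"down:{x}")
    return x * 10

def per_element(items):
    """Each downstream call waits only for its own upstream result."""
    with ThreadPoolExecutor() as pool:
        ups = [pool.submit(upstream, x) for x in items]
        # f=f binds the current future; each lambda blocks on that one future only
        downs = [pool.submit(lambda f=f: downstream(f.result())) for f in ups]
        return [d.result() for d in downs]

def with_barrier(items):
    """No downstream call starts until every upstream call has finished."""
    with ThreadPoolExecutor() as pool:
        ups = [pool.submit(upstream, x) for x in items]
        wait(ups)  # barrier: blocks until all upstream futures are done
        downs = [pool.submit(downstream, u.result()) for u in ups]
        return [d.result() for d in downs]

if __name__ == "__main__":
    print(per_element([1, 2, 3]))   # [20, 30, 40]
    events.clear()
    print(with_barrier([1, 2, 3]))  # [20, 30, 40]
    print(events)                   # every "up:" entry precedes every "down:" entry
```

Both shapes compute the same values; only the barrier version guarantees the ordering the question asks for.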
    Kevin Kho

    1 year ago
    Will look into this, @Trevor Kramer.
    Can you tell me a bit more about the use case that needs breadth-first execution?
    Trevor Kramer

    1 year ago
    It is essentially the reduce step in MapReduce.
    The last step in the pipeline summarizes the previous steps, and that has to be done over all mapped runs at once.
    Kevin Kho

    1 year ago
    I think you can apply the reduce at the end and it will only execute when the mapped tasks are done.
    from prefect import task, Flow
    import prefect
    
    @task
    def xyz(x):
        return x + 1
    
    @task
    def dummy(list_x):
        return sum(list_x)
    
    @task
    def xyz2(x):
        logger = prefect.context.get('logger')
        logger.info(x*10)
        return x*10
    
    
    with Flow(name="Example") as flow:
        a = xyz.map([1,2,3])
        b = xyz2.map(a)
        c = dummy(b)       # this is a reduce
    
    flow.run()
    Then all of b will finish before the reduce runs.
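The same shape in plain Python, as a standard-library analogy rather than Prefect code: the reduce consumes the whole list, so it cannot run until every mapped element exists. For the inputs [1, 2, 3], a = [2, 3, 4], b = [20, 30, 40], and the reduce returns 20 + 30 + 40 = 90. Note that this analogy is stricter than Prefect's mapping, because it also finishes all of a before starting any of b; run_example is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
import operator

def xyz(x):
    return x + 1

def xyz2(x):
    return x * 10

def run_example(items):
    with ThreadPoolExecutor() as pool:
        a = list(pool.map(xyz, items))  # first map step
        b = list(pool.map(xyz2, a))     # second map step
    # list(pool.map(...)) only returns once every element has been computed,
    # so the reduce below always sees the complete list b.
    return reduce(operator.add, b, 0)

print(run_example([1, 2, 3]))  # 90
```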
    Trevor Kramer

    1 year ago
    The issue is that, in our case, the input to the summary step is "a" in your example, but it needs to wait until "b" is done.
    from prefect import task, Flow
    import prefect
    
    @task
    def xyz(x):
        return x + 1
    
    @task
    def dummy(list_x):
        return sum(list_x)
    
    @task
    def xyz2(x):
        logger = prefect.context.get('logger')
        logger.info(x*10)
        return x*10
    
    
    
    with Flow(name="Example") as flow:
        a = xyz.map([1,2,3])
        b = xyz2.map(a)
    c = dummy(a, upstream_tasks=[b])  # reduce over a, but only after all of b has finished
    
    flow.run()
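A standard-library analogy of what this flow is after, again not Prefect code: the reduce reads only the values of a, but it is gated on every element of b completing, which is the ordering-only dependency that upstream_tasks=[b] is meant to express. Each b element depends only on its own a element, mirroring the mapped edge. run_gated is a hypothetical helper; for [1, 2, 3] the result is sum(a) = 2 + 3 + 4 = 9.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def xyz(x):
    return x + 1

def xyz2(x):
    return x * 10

def dummy(list_x):
    return sum(list_x)

def run_gated(items):
    with ThreadPoolExecutor() as pool:
        a_futs = [pool.submit(xyz, x) for x in items]
        # each b element waits only on its own a element (mapped dependency)
        b_futs = [pool.submit(lambda f=f: xyz2(f.result())) for f in a_futs]
        # b's values are never passed to the reduce; we only wait for completion
        wait(b_futs)
        return dummy([f.result() for f in a_futs])

print(run_gated([1, 2, 3]))  # 9
```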
    Kevin Kho

    1 year ago
    Gotcha! Thanks for clarifying. Will think.
    You can pass b into the reduce as an extra argument and simply not use it, like this:
    @task
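    def dummy(list_x, list_y):
        # list_y (the mapped output of b) is accepted only so that dummy
        # depends on b; its values are never used
        return sum(list_x)

    with Flow(name="Example") as flow:
        a = xyz.map([1,2,3])
        b = xyz2.map(a)
        c = dummy(a, b)  # passing b makes the reduce wait for every b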
    I know that’s a bit hacky, so I will think about it a bit more.
    Sorry, I’m really not seeing anything else.
    Btw, I tested your example with upstream_tasks=[b] and it seems to work the way you want. Could you try running this?
    from prefect import task, Flow
    import prefect
    
    @task
    def xyz(x):
        return x + 1
    
    @task
    def dummy(list_x):
        logger = prefect.context.get('logger')
        logger.info(sum(list_x))
        return sum(list_x)
    
    @task
    def xyz2(x):
        logger = prefect.context.get('logger')
        logger.info(x*10)
        return x*10
    
    with Flow(name="Example") as flow:
        a = xyz.map([1,2,3])
        b = xyz2.map(a)
        c = dummy(a, upstream_tasks=[b])       # this is a reduce
    
    flow.run()
    If you share more about your code and configuration, maybe we can identify what’s causing your issue?