# ask-community
t
I have a mapped task that needs to wait until a previous mapped task is 100% complete before starting. If I add upstream_tasks=[fp_validation_predicted_locations], it kicks off task 1 as soon as task 1 of fp_validation_predicted_locations finishes. How can I make it wait until they have all finished? Can I do upstream_tasks=[unmapped(fp_validation_predicted_locations)]?
k
Will look into this, @Trevor Kramer.
Can you tell me a bit more about your use case for needing breadth-first execution?
t
It is essentially the reduce step in map-reduce.
The last step in the pipeline summarizes the previous steps, and that has to be done over all mapped runs at once.
k
I think you can apply the reduce at the end and it will only execute when the mapped tasks are done.
```python
from prefect import task, Flow
import prefect

@task
def xyz(x):
    return x + 1

@task
def dummy(list_x):
    # Receives the full list of mapped results
    return sum(list_x)

@task
def xyz2(x):
    logger = prefect.context.get('logger')
    logger.info(x*10)
    return x*10


with Flow(name="Example") as flow:
    a = xyz.map([1,2,3])   # one xyz run per element
    b = xyz2.map(a)        # one xyz2 run per result of a
    c = dummy(b)           # this is a reduce

flow.run()
```
Then all of b will finish before the reduce runs.
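A rough mental model of why the reduce waits while a mapped downstream task does not may help here. The plain-Python sketch below is an assumption for illustration, not Prefect's actual scheduler code, and `child_ready` is a hypothetical helper: over a mapped edge, child i waits only on upstream child i (which matches what the original question describes for upstream_tasks on a mapped task), while an unmapped edge such as a reduce waits on every upstream child.

```python
from typing import List


def child_ready(i: int, upstream_done: List[bool], mapped_edge: bool) -> bool:
    """Hypothetical, simplified readiness rule (not Prefect's scheduler).

    upstream_done[j] is True once child j of the upstream mapped task finished.
    """
    if mapped_edge:
        # Mapped edge: child i only needs the matching upstream child i
        return upstream_done[i]
    # Unmapped edge (e.g. a reduce): needs every upstream child
    return all(upstream_done)


# Suppose only the first of three mapped xyz2 children has finished
b_done = [True, False, False]

# A mapped downstream task can already start its first child (depth-first)...
print(child_ready(0, b_done, mapped_edge=True))   # True
# ...but a reduce over b must wait for the other two children
print(child_ready(0, b_done, mapped_edge=False))  # False
```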
t
The issue is that, in our case, the input to the summary step is "a" in your example, but it needs to wait until "b" is done:
```python
from prefect import task, Flow
import prefect

@task
def xyz(x):
    return x + 1

@task
def dummy(list_x):
    return sum(list_x)

@task
def xyz2(x):
    logger = prefect.context.get('logger')
    logger.info(x*10)
    return x*10


with Flow(name="Example") as flow:
    a = xyz.map([1,2,3])
    b = xyz2.map(a)
    # Summarize a, but only start once b has finished
    c = dummy(a, upstream_tasks=[b])       # this is a reduce

flow.run()
```
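Outside Prefect, the same "summarize a, but only after all of b is done" requirement can be sketched with the standard library's concurrent.futures: wait on every b future, then reduce over the a results. This is a plain-Python analogue for illustration, not how Prefect schedules tasks; step_a, step_b, summarize and run_pipeline are hypothetical stand-ins for xyz, xyz2, dummy and the flow.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List


def step_a(x: int) -> int:
    # Hypothetical stand-in for xyz
    return x + 1


def step_b(x: int) -> int:
    # Hypothetical stand-in for xyz2
    return x * 10


def summarize(values: List[int]) -> int:
    # Hypothetical stand-in for dummy: the reduce step
    return sum(values)


def run_pipeline(inputs: List[int]) -> int:
    with ThreadPoolExecutor() as pool:
        # Map step a over every input and collect its results
        a_results = [f.result() for f in [pool.submit(step_a, x) for x in inputs]]
        # Map step b over the results of a
        b_futures = [pool.submit(step_b, x) for x in a_results]
        # Barrier: wait() blocks until every b run has finished
        # (its default return_when is ALL_COMPLETED)
        wait(b_futures)
        # Surface any exception raised by a b run before summarizing
        for f in b_futures:
            f.result()
        # Reduce over a, even though it does not consume b's outputs
        return summarize(a_results)


print(run_pipeline([1, 2, 3]))  # 9
```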
k
Gotcha! Thanks for clarifying. Will think.
You can pass b into the reduce task without actually using it, like this:
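```python
from prefect import task, Flow

@task
def dummy(list_x, list_y):
    # list_y is never used: passing b in only makes dummy
    # wait until every mapped xyz2 run has finished
    return sum(list_x)

# xyz and xyz2 are defined as in the examples above
with Flow(name="Example") as flow:
    a = xyz.map([1,2,3])
    b = xyz2.map(a)
    c = dummy(a, b)

flow.run()
```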
I know that’s a bit hacky, so I’ll think about it some more.
Sorry, I’m really not seeing anything else.
Btw, I tested your example with upstream_tasks=[b] and it seems to work the way you want. Could you try running this?
```python
from prefect import task, Flow
import prefect

@task
def xyz(x):
    return x + 1

@task
def dummy(list_x):
    # Log the summary so you can see when the reduce actually runs
    logger = prefect.context.get('logger')
    logger.info(sum(list_x))
    return sum(list_x)

@task
def xyz2(x):
    logger = prefect.context.get('logger')
    logger.info(x*10)
    return x*10

with Flow(name="Example") as flow:
    a = xyz.map([1,2,3])
    b = xyz2.map(a)
    c = dummy(a, upstream_tasks=[b])       # this is a reduce

flow.run()
```
If you share more about your code and configuration, maybe we can identify what’s causing your issue?