Tom Shaffner

01/21/2022, 12:04 AM
Is it possible to add a single task at the END of a complex mapped task set? For example, in the complex mapped pipelines example (https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines), I'd like a single task that runs after the apply_map tasks, i.e. it runs only once, and only when all the mapped tasks are complete. I tried just capturing the result of apply_map and setting that as upstream, but doing so placed the new task before the mapped tasks in the schematic.
Kevin Kho

01/21/2022, 12:44 AM
I gave this a shot and it seemed to work. Here is my code:
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

@task
def something():
    return "end"

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

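with Flow("apply-map example") as flow:
    # apply_map returns whatever inc_or_negate returns (here, the mapped merge task)
    result = apply_map(inc_or_negate, range(4))
    x = something()
    # Run something() once, only after the mapped merge task has finished
    x.set_upstream(result)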
and schematic generated from a flow run
Tom Shaffner

01/21/2022, 3:27 PM
That did it. Verifying that it worked helped me debug: it turns out the result I was passing came from earlier in my equivalent of the inc_or_negate function, so the new task was being placed after that returned item rather than after the whole mapped pipeline. I reworked it to return the last thing in my mapping and it works fine now; thanks!
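To make the pitfall above concrete, here is a toy model in plain Python. It is not Prefect's API: the dictionary of upstream tasks and the `ancestors` helper are hypothetical, written only to illustrate the ordering. Each name stands for a whole mapped task from the example flow. It shows that a final task only waits for the tasks upstream of whatever it depends on, so depending on an early item leaves the later steps unordered relative to it.

```python
def ancestors(graph, node):
    """Return every task that must finish before `node` can start.

    `graph` maps each task name to the set of its direct upstream tasks.
    This is a hypothetical helper for illustration, not part of Prefect.
    """
    seen = set()
    stack = list(graph.get(node, ()))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(graph.get(current, ()))
    return seen


# Dependencies inside the mapped function from the example above.
base = {
    "is_even": set(),
    "inc": {"is_even"},
    "negate": {"is_even"},
    "merge": {"inc", "negate"},
}

# Pitfall: the final task depends on something returned early in the function.
early = {**base, "something": {"is_even"}}

# Fix: the final task depends on the last step of the mapped function.
last = {**base, "something": {"merge"}}

waits_for_early = ancestors(early, "something")
waits_for_last = ancestors(last, "something")

print(sorted(waits_for_early))
print(sorted(waits_for_last))
```

In the first graph the final task waits only for `is_even`, so nothing forces it to run after `inc`, `negate`, or `merge`. In the second it waits for every step, which matches returning the last item from the mapped function.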