# ask-community
r
With prefect 2 how can I add dependence between mapped tasks? I want to skip any mapped downstream tasks if the corresponding mapped upstream task fails but without a direct link between the tasks. This is what I have so far:
```python
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger

@task
def add_one(x):
    if x == 1:
        raise Exception("Raised exception")
    return x + 1

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return

@flow
def mapped_flow_not_dependent():
    a = [0, 2, 3]
    b = add_one.map(a, return_state=True)  # b = [1, 3, 4]
    c = add_one.map(b, return_state=True)  # add_one(1) raises, so c[0] fails
    d = do_something.map(a, return_state=True, wait_for=[c])

    print(c)
    print(d)

    return "Flow completes"

if __name__ == "__main__":
    mapped_flow_not_dependent()
```
If any one state in `c` fails, none of the following `do_something` tasks run, whereas I would like all of the `do_something` tasks to run except those whose corresponding `c` state failed. I can get the desired behaviour by linking the tasks explicitly: changing the argument of `do_something` from `a` to `c` (and removing the `wait_for` kwarg).
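The two behaviours can be modelled without Prefect. The sketch below only illustrates the semantics described in this thread and is not Prefect's implementation; the string states and the helpers `gate_all` and `gate_elementwise` are hypothetical. With `a = [0, 2, 3]`, `b` is `[1, 3, 4]`, so only the first `c` run fails.

```python
from typing import List

COMPLETED = "COMPLETED"
NOT_READY = "NotReady"


def gate_all(items: List[int], upstream: List[str]) -> List[str]:
    """Model of `task.map(items, wait_for=[upstream])`: every mapped run waits
    on the whole upstream list, so a single failure blocks every run."""
    if all(state == COMPLETED for state in upstream):
        return [f"ran({item})" for item in items]
    return [NOT_READY for _ in items]


def gate_elementwise(items: List[int], upstream: List[str]) -> List[str]:
    """Model of the desired behaviour: run i waits only on upstream[i]."""
    return [
        f"ran({item})" if state == COMPLETED else NOT_READY
        for item, state in zip(items, upstream)
    ]


if __name__ == "__main__":
    a = [0, 2, 3]
    c = ["FAILED", COMPLETED, COMPLETED]  # add_one(1) raises for the first item
    print(gate_all(a, c))          # ['NotReady', 'NotReady', 'NotReady']
    print(gate_elementwise(a, c))  # ['NotReady', 'ran(2)', 'ran(3)']
```

The question in this thread is how to get the second behaviour without passing `c` into `do_something`.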
t
I would like to know this too!
p
Do I understand correctly: you're trying to infer from `c` how `do_something` should proceed with `a`?
r
Yes, I only want to run `do_something` for the elements in `a` (or some other unrelated variable) if the corresponding `c` state is Completed.
p
I don't remember the details, but is there an alternative to `map`, like `submit` with a parallel task runner?
Would that give the option to parameterize each function call?
Or maybe change the tasks to flows?
r
I'm not sure about `submit`; I would like to do it with `map` if possible. Likewise for changing the tasks to flows! If you replace `d = do_something.map(a, return_state=True, wait_for = [c])` with `d = do_something.map(c, return_state=True)`, then the desired behaviour is achieved. But I specifically want to be able to enforce dependence for mapped tasks which would otherwise not be dependent.
k
Currently, the only way to force this is to have `c` as an argument of `do_something`. If you want this to be something that Prefect supports, I encourage you to create a GitHub issue for it on the Prefect GitHub page.
r
Thanks @Khuyen Tran. So wait_for only works for tasks which aren't mapped?
k
It worked when you used `task.map()`, didn't it? I'm not sure I understood your question.
r
Sorry @Khuyen Tran for the confusion. There are three different cases that I think help explain what I mean:
1. For unmapped tasks, you can use `wait_for` to stop a downstream task from running if an upstream one does not reach a Completed state, even if the downstream task does not take the upstream state as an argument.
2. For mapped tasks which do take an upstream state as an argument, only the downstream runs whose corresponding upstream task did not reach a Completed state are skipped.
3. For mapped tasks which use `wait_for`, none of the downstream tasks run if any one of the upstream tasks enters a Failed state.
Here's an example of the three cases which I hope will help you understand what I mean:
```python
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger

@task
def add_one(x):
    if x == 1:
        raise Exception("Raised exception")
    return x + 1

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return

@flow
def wait_for_mwe():
    # No mapping, using wait_for but not argument
    a = 1
    b = add_one(a, return_state=True)
    c_1 = do_something(a, return_state=True, wait_for=[b])

    # Mapping, using argument
    a = [1, 2, 3]
    b = add_one.map(a, return_state=True)
    c_2 = do_something.map(b, return_state=True)

    # Mapping, using wait_for but not argument
    a = [1, 2, 3]
    b = add_one.map(a, return_state=True)
    c_3 = do_something.map(a, return_state=True, wait_for=[b])

    return c_1, c_2, c_3

if __name__ == "__main__":
    c_1, c_2, c_3 = wait_for_mwe()

    # State is NotReady
    print("Expecting NotReady state:")
    print(c_1)

    # Two completed states and one NotReady state
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_2)

    # All states are NotReady! :(
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_3)
```
a
Have you tried `allow_failure`?
```python
wait_for=[allow_failure(c)]
```
example:
```python
from prefect import task, flow, get_run_logger, allow_failure


@task
def ingest_data():
    return 42


@task
def transform_data(x: int) -> int:
    if True:  # always fails, to simulate a flaky step
        raise ValueError("Non-deterministic error has occurred.")
    else:
        return x * 42


@task
def clean_up_task():
    logger = get_run_logger()
    logger.info("Cleaning up 🧹")


@flow
def allow_flaky_transformation_to_pass():
    data = ingest_data.submit()
    result = transform_data.submit(data)
    # allow_failure lets clean_up_task run even though transform_data failed
    clean_up_task.submit(wait_for=[allow_failure(result)])


if __name__ == "__main__":
    allow_flaky_transformation_to_pass()
```
r
I tried
allow_failure
in my mwe, but it means that all of the mapped downstream tasks run, even if there was a failure in a corresponding upstream task. ๐Ÿ˜ž
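That matches what `allow_failure` is for: it lets a run proceed even if the wrapped upstream failed, so it removes the all-or-nothing gate rather than making it per-item. A Prefect-free sketch of the gate with and without it follows; the helper `mapped_wait_for` and the string states are hypothetical and only mirror the behaviour reported in this thread.

```python
from typing import List

COMPLETED = "COMPLETED"
NOT_READY = "NotReady"


def mapped_wait_for(items: List[str], upstream: List[str],
                    allow_failure: bool = False) -> List[str]:
    """Model of `task.map(items, wait_for=[...])`.

    Every mapped run waits on the same upstream list, so all runs reach the
    same decision: either every run proceeds or none does.
    """
    proceed = allow_failure or all(state == COMPLETED for state in upstream)
    return [f"ran({item})" if proceed else NOT_READY for item in items]


if __name__ == "__main__":
    c = ["FAILED", COMPLETED, COMPLETED]
    print(mapped_wait_for(["x", "y", "z"], c))
    # ['NotReady', 'NotReady', 'NotReady']
    print(mapped_wait_for(["x", "y", "z"], c, allow_failure=True))
    # ['ran(x)', 'ran(y)', 'ran(z)']
```

Neither setting yields the per-item outcome (`['NotReady', 'ran(y)', 'ran(z)']`), because the flag changes the shared decision, not which upstream each run looks at.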
k
This one should work as expected:
```python
from prefect import flow, task, get_run_logger


@task
def add_one(x):
    if x == 2:
        raise Exception("Raised exception")
    return x + 1

@task
def add_two(x):
    if x == 2:
        raise Exception("Raised exception")
    return x + 2

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return


@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):

    b = add_one.map(a)
    c = add_two.map(b)
    # Wait on each upstream future in turn and submit do_something only for
    # the items whose corresponding c run completed
    d = [
        do_something.submit(item)
        for future, item in zip(c, a)
        if future.wait().is_completed()
    ]
    return "Flow completes"


if __name__ == "__main__":
    mapped_flow_not_dependent()
```
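Or you can do this based on @Anna Geller's suggestion:
```python
# Reuses add_one, add_two and do_something from the previous snippet
@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):

    b = add_one.map(a)
    c = add_two.map(b)
    # One do_something run per item, each waiting only on its own upstream future
    d = [
        do_something.submit(item, return_state=True, wait_for=[future])
        for item, future in zip(a, c)
    ]
    return "Flow completes"
```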
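The two versions above differ in what ends up in `d`. In the first, `do_something` is never submitted for items whose `c` run did not complete, and the flow blocks on each `future.wait()` in order before moving to the next item. In the second, every item gets a run, and runs whose upstream did not complete end up NotReady. A Prefect-free model of that difference follows; the helper names and string states are hypothetical. With `a = [1, 2, 3]`, `add_one(2)` raises and `add_two(2)` raises, so only the last `c` run completes.

```python
from typing import List

COMPLETED = "COMPLETED"
NOT_READY = "NotReady"


def filter_then_submit(items: List[int], upstream: List[str]) -> List[str]:
    """Model of the zip + `future.wait()` filter: only items whose upstream
    completed are submitted, so skipped items leave no downstream run."""
    return [f"ran({item})" for item, state in zip(items, upstream)
            if state == COMPLETED]


def submit_each_with_wait_for(items: List[int], upstream: List[str]) -> List[str]:
    """Model of one `submit(item, wait_for=[future])` per item: every item gets
    a run, and runs whose upstream did not complete are NotReady."""
    return [f"ran({item})" if state == COMPLETED else NOT_READY
            for item, state in zip(items, upstream)]


if __name__ == "__main__":
    a = [1, 2, 3]
    c = ["FAILED", NOT_READY, COMPLETED]
    print(filter_then_submit(a, c))         # ['ran(3)']
    print(submit_each_with_wait_for(a, c))  # ['NotReady', 'NotReady', 'ran(3)']
```

The filtered version keeps the run list shorter; the per-item `wait_for` version keeps `d` aligned index by index with `a`, which is convenient if the results are zipped back together later.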
a
some more examples:
```python
from prefect import task, flow


@task
def upstream_task(item):
    if item == "c":
        raise Exception("this upstream task failed")
    return str(item) + "+1"


@task
def downstream_task(item):
    return str(item) + "+2"


@flow
def demo():
    items = ["a", "b", "c", "d"]
    first = upstream_task.map(items)
    downstream_task.map(first)  # runs only for a, b, and d. c is in NotReady state


if __name__ == "__main__":
    demo()
```
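The comment on `downstream_task.map(first)` describes element-wise propagation: a failed upstream run only blocks the downstream run for the same item. A small Prefect-free model of that chain follows; `map_stage`, `Failed` and `NotReady` are hypothetical stand-ins for mapped runs and their states, not Prefect's API, and `upstream`/`downstream` mirror the two tasks above.

```python
from typing import Any, Callable, List


class Failed:
    """Stand-in for a run that raised an exception."""
    def __init__(self, exc: Exception) -> None:
        self.exc = exc

    def __repr__(self) -> str:
        return f"Failed({self.exc})"


class NotReady:
    """Stand-in for a run that never started because its input did not complete."""
    def __repr__(self) -> str:
        return "NotReady"


def map_stage(fn: Callable[[Any], Any], inputs: List[Any]) -> List[Any]:
    """Apply fn element-wise, propagating non-completed inputs as NotReady."""
    results: List[Any] = []
    for x in inputs:
        if isinstance(x, (Failed, NotReady)):
            results.append(NotReady())  # skip only this element
            continue
        try:
            results.append(fn(x))
        except Exception as exc:  # record the failure instead of stopping
            results.append(Failed(exc))
    return results


def upstream(item: str) -> str:
    if item == "c":
        raise Exception("this upstream task failed")
    return str(item) + "+1"


def downstream(item: str) -> str:
    return str(item) + "+2"


if __name__ == "__main__":
    first = map_stage(upstream, ["a", "b", "c", "d"])
    second = map_stage(downstream, first)
    print(second)  # ['a+1+2', 'b+1+2', NotReady, 'd+1+2']
```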
```python
from prefect import flow, task, get_run_logger, allow_failure


@task
def extract():
    return [1, 2, 3]


@task
def add_one(x):
    if x == 2:
        raise Exception("Something is not right")
    return x + 1


@task
def add_two(x):
    return x + 2


@task
def cleanup_task():
    get_run_logger().info("Cleaning up e.g. removing temp Ray cluster")


@flow
def map_with_cleanup_task():
    a = extract()
    b = add_one.map(a)
    c = add_two.map(b)
    cleanup_task.submit(wait_for=[allow_failure(c)])


if __name__ == "__main__":
    map_with_cleanup_task()
```
r
Thanks guys! Edit: Didn't realise at first but those last 2 suggestions don't result in the desired behaviour: in the first one the task takes an upstream task as an argument which I was trying to avoid, and in the second one the cleanup task seems to only be submitted once despite there being two completed upstream tasks.