https://prefect.io logo

roady

11/25/2022, 10:24 AM
With Prefect 2, how can I add dependencies between mapped tasks? I want to skip a mapped downstream task if the corresponding mapped upstream task fails, but without a direct link between the tasks (i.e. without passing the upstream result as an argument). This is what I have so far:
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger

@task
def add_one(x):
    if x==1:
        raise Exception("Raised exception")
    return x+1

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return

@flow
def mapped_flow_not_dependent():
    a = list([0,2,3])
    b = add_one.map(a, return_state=True)
    c = add_one.map(b, return_state=True)
    d = do_something.map(a, return_state=True, wait_for = [c])
    
    print(c)
    print(d)
    
    return "Flow completes"

if __name__ == "__main__":
    mapped_flow_not_dependent()
If any state in c is Failed, none of the following do_something tasks run, whereas I would like every do_something task to run except those whose corresponding state in c failed. I can get the desired behaviour by linking the tasks explicitly: changing the argument of do_something from a to c (and removing the wait_for kwarg).
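To make the difference concrete, here is a toy, Prefect-free model of the two behaviours. It only encodes what is reported in this thread: with wait_for=[c], every mapped do_something run waits on the whole of c, while the behaviour asked for pairs run i with c[i]. The state names are plain strings standing in for Prefect states, and gate_all and gate_pairwise are hypothetical helpers, not Prefect functions.

```python
from typing import List

# Plain strings standing in for Prefect state names (not Prefect objects)
COMPLETED, FAILED, NOT_READY = "Completed", "Failed", "NotReady"


def gate_all(upstream: List[str], n_downstream: int) -> List[str]:
    """Reported behaviour of .map(..., wait_for=[c]): every mapped run waits on
    the whole list, so a single failure leaves all of them NotReady.
    (Assumes do_something itself always succeeds.)"""
    ok = all(state == COMPLETED for state in upstream)
    return [COMPLETED if ok else NOT_READY] * n_downstream


def gate_pairwise(upstream: List[str]) -> List[str]:
    """Desired behaviour: mapped run i depends only on upstream element i."""
    return [COMPLETED if state == COMPLETED else NOT_READY for state in upstream]


# For a = [0, 2, 3]: b = [1, 3, 4], and add_one(1) raises, so c looks like this
c_states = [FAILED, COMPLETED, COMPLETED]
print(gate_all(c_states, n_downstream=3))  # ['NotReady', 'NotReady', 'NotReady']
print(gate_pairwise(c_states))             # ['NotReady', 'Completed', 'Completed']
```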

Tim-Oliver

11/25/2022, 10:27 AM
I would like to know this too!

Pekka

11/25/2022, 11:17 AM
Do I understand correctly: you're trying to infer from c how do_something should proceed with a?

roady

11/25/2022, 11:22 AM
Yes, I only want to run do_something for the elements of a (or some other unrelated variable) if the corresponding state in c is Completed.

Pekka

11/25/2022, 11:25 AM
I don't remember the details, but is there an alternative to map, such as submit with a parallel task runner?
Would that give you the option to parameterize each function call?
Or maybe you could change the tasks to flows?

roady

11/25/2022, 11:32 AM
I'm not sure about submit; I would like to do it with map if possible, and the same goes for changing the tasks to flows. If you replace
d = do_something.map(a, return_state=True, wait_for = [c])
with
d = do_something.map(c, return_state=True)
then I get the desired behaviour. But I specifically want to be able to enforce a dependency between mapped tasks that would otherwise not be dependent.

Khuyen Tran

11/30/2022, 5:15 PM
Currently, the only way to force this is to pass c as an argument to do_something. If you want Prefect to support this, I encourage you to create a GitHub issue on the Prefect GitHub page.

roady

12/01/2022, 8:35 AM
Thanks @Khuyen Tran. So wait_for only works for tasks which aren't mapped?

Khuyen Tran

12/01/2022, 3:47 PM
It worked when you used task.map(), didn't it? I'm not sure I understood your question.

roady

12/01/2022, 4:25 PM
Sorry for the confusion, @Khuyen Tran. There are three different cases that I think help explain what I mean:
1. For unmapped tasks, you can use wait_for to stop a downstream task from running if an upstream task does not reach a Completed state, even if the downstream task does not take the upstream state as an argument.
2. For mapped tasks that do take an upstream state as an argument, only the downstream runs whose corresponding upstream run does not reach a Completed state are skipped.
3. For mapped tasks that use wait_for, none of the downstream runs execute if any one of the upstream runs enters a Failed state.
Here's an example of the three cases, which I hope will make this clearer:
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger

@task
def add_one(x):
    if x==1:
        raise Exception("Raised exception")
    return x+1

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return

@flow
def wait_for_mwe():
    # No mapping, using wait_for but not argument
    a = 1
    b = add_one(a, return_state=True)
    c_1 = do_something(a, return_state=True, wait_for=[b])

    # Mapping, using argument
    a = list([1,2,3])
    b = add_one.map(a, return_state=True)
    c_2 = do_something.map(b, return_state=True)

    # Mapping, using wait_for but not argument
    a = list([1,2,3])
    b = add_one.map(a, return_state=True)
    c_3 = do_something.map(a, return_state=True, wait_for = b)

    return c_1, c_2, c_3

if __name__ == "__main__":
    c_1, c_2, c_3 = wait_for_mwe()

    # State is NotReady
    print("Expecting NotReady state:")
    print(c_1)

    # Two completed states and one NotReady state
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_2)

    # All states are NotReady! :(
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_3)

Anna Geller

12/01/2022, 7:05 PM
Have you tried allow_failure, e.g. wait_for=[allow_failure(c)]? Example:
from prefect import task, flow, get_run_logger, allow_failure


@task
def ingest_data():
    return 42


@task
def transform_data(x: int) -> int:
    if True:
        raise ValueError("Non-deterministic error has occurred.")
    else:
        return x * 42


@task
def clean_up_task():
    logger = get_run_logger()
    logger.info("Cleaning up 🧹")


@flow
def allow_flaky_transformation_to_pass():
    data = ingest_data.submit()
    result = transform_data.submit(data)
    clean_up_task.submit(wait_for=[allow_failure(result)])


if __name__ == "__main__":
    allow_flaky_transformation_to_pass()

roady

12/02/2022, 11:07 AM
I tried allow_failure in my MWE, but then all of the mapped downstream tasks run, even when the corresponding upstream task failed. 😞
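A toy model (plain Python, not Prefect's implementation) of why this happens: allow_failure lets a downstream run proceed even if the wrapped upstream runs failed, but each mapped run still lists the whole of c in wait_for, so every run gets the same answer. Without it, one failure blocks every run; with it, every run proceeds. The downstream_state helper and the string states are hypothetical stand-ins.

```python
from typing import List

# Plain strings standing in for Prefect state names (not Prefect objects)
COMPLETED, FAILED, NOT_READY = "Completed", "Failed", "NotReady"


def downstream_state(upstream: List[str], allow_failure: bool = False) -> str:
    """Toy rule for one downstream run whose wait_for covers `upstream`.

    Without allow_failure, any upstream state other than Completed leaves the
    run NotReady. With allow_failure, failed upstreams no longer block it.
    (Assumes every upstream has finished and the run itself succeeds.)
    """
    if allow_failure or all(s == COMPLETED for s in upstream):
        return COMPLETED
    return NOT_READY


c_states = [FAILED, COMPLETED, COMPLETED]

# Each mapped run waits on the whole of c, so every run gets the same answer
without = [downstream_state(c_states) for _ in c_states]
with_af = [downstream_state(c_states, allow_failure=True) for _ in c_states]
print(without)  # ['NotReady', 'NotReady', 'NotReady']
print(with_af)  # ['Completed', 'Completed', 'Completed']
```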

Peyton Runyan

12/02/2022, 1:40 PM
@Tim-Oliver I answered the question here: https://prefect-community.slack.com/archives/CL09KU1K7/p1669805071115219

Khuyen Tran

12/02/2022, 4:12 PM
This one should work as expected:
from prefect import flow, task, get_run_logger


@task
def add_one(x):
    if x == 2:
        raise Exception("Raised exception")
    return x + 1

@task 
def add_two(x):
    if x == 2:
        raise Exception("Raised exception")
    return x + 2 

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return


@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):

    b = add_one.map(a)
    c = add_two.map(b)
    d = [
        do_something.submit(item)
        for future, item in zip(c, a)
        if future.wait().is_completed()
    ]
    return "Flow completes"


if __name__ == "__main__":
    mapped_flow_not_dependent()
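For comparison outside Prefect, the same pattern (block on each upstream result in turn and only submit downstream work where it succeeded) can be sketched with the standard library's concurrent.futures. This is an analogy, not Prefect's API: Future.exception() stands in for future.wait() plus the state check, the add_one/add_two chain is collapsed into a single hypothetical upstream step, and the function names are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def add_one(x: int) -> int:
    if x == 2:
        raise ValueError("Raised exception")
    return x + 1


def do_something(item: int) -> str:
    return f"did something with {item}"


def mapped_not_dependent(a: List[int]) -> List[str]:
    with ThreadPoolExecutor() as pool:
        upstream = [pool.submit(add_one, x) for x in a]
        # Future.exception() blocks until the upstream job finishes and returns
        # None on success, so downstream work is submitted only where it succeeded
        downstream = [
            pool.submit(do_something, item)
            for future, item in zip(upstream, a)
            if future.exception() is None
        ]
        return [f.result() for f in downstream]


print(mapped_not_dependent([1, 2, 3]))
# ['did something with 1', 'did something with 3']
```

One trade-off of this shape: the comprehension waits on the upstream futures in order, so a slow early element delays submitting downstream work for later ones, and skipped elements produce no downstream run at all.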
Or you can do this based on @Anna Geller's suggestion:
@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):

    b = add_one.map(a)
    c = add_two.map(b)
    d = [
        do_something.submit(item, return_state=True, wait_for=[future])
        for item, future in zip(a, c)
    ]
    return "Flow completes"


if __name__ == "__main__":
    mapped_flow_not_dependent()
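This per-element wait_for version can be approximated with the standard library too: every downstream job is submitted up front and waits only on its own upstream future, reporting a NotReady-like result if that future raised. Again a stdlib sketch under assumptions, not Prefect code; gated_do_something and the string results are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def add_one(x: int) -> int:
    if x == 2:
        raise ValueError("Raised exception")
    return x + 1


def do_something(item: int) -> None:
    pass  # placeholder for the real work


def gated_do_something(upstream: "Future[int]", item: int) -> str:
    # Wait only on this element's upstream future, mirroring wait_for=[future]
    if upstream.exception() is not None:
        return "NotReady"
    do_something(item)
    return "Completed"


def mapped_not_dependent(a: List[int]) -> List[str]:
    # Separate pools so downstream jobs blocking on upstream futures can never
    # take every worker thread away from the upstream jobs
    with ThreadPoolExecutor() as up_pool, ThreadPoolExecutor() as down_pool:
        upstream = [up_pool.submit(add_one, x) for x in a]
        downstream = [
            down_pool.submit(gated_do_something, fut, item)
            for fut, item in zip(upstream, a)
        ]
        return [f.result() for f in downstream]


print(mapped_not_dependent([1, 2, 3]))  # ['Completed', 'NotReady', 'Completed']
```

Unlike the previous sketch, nothing blocks while the downstream jobs are being submitted, and every element gets a recorded outcome.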

Anna Geller

12/02/2022, 4:16 PM
Some more examples:
from prefect import task, flow


@task
def upstream_task(item):
    if item == "c":
        raise Exception("this upstream task failed")
    return str(item) + "+1"


@task
def downstream_task(item):
    return str(item) + "+2"


@flow
def demo():
    items = ["a", "b", "c", "d"]
    first = upstream_task.map(items)
    downstream_task.map(first)  # runs only for a, b, and d. c is in NotReady state


if __name__ == "__main__":
    demo()
from prefect import flow, task, get_run_logger, allow_failure


@task
def extract():
    return [1, 2, 3]


@task
def add_one(x):
    if x == 2:
        raise Exception("Something is not right")
    return x + 1


@task
def add_two(x):
    return x + 2


@task
def cleanup_task():
    get_run_logger().info("Cleaning up e.g. removing temp Ray cluster")


@flow
def map_with_cleanup_task():
    a = extract()
    b = add_one.map(a)
    c = add_two.map(b)
    cleanup_task.submit(wait_for=[allow_failure(c)])


if __name__ == "__main__":
    map_with_cleanup_task()
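In this example cleanup_task.submit is called once, with the whole list of mapped futures wrapped in allow_failure, so a single cleanup run waits for all of them. A rough stdlib analogue of that shape (concurrent.futures, not Prefect; map_with_cleanup is a hypothetical name) shows why there is exactly one cleanup regardless of how many upstream elements completed:

```python
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Tuple


def add_one(x: int) -> int:
    if x == 2:
        raise ValueError("Something is not right")
    return x + 1


def map_with_cleanup(a: List[int]) -> Tuple[List[str], int]:
    cleanup_runs = 0
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(add_one, x) for x in a]
        # One wait over the whole collection. A future that raised still counts
        # as done, so failures do not block the cleanup step
        wait(futures)
        cleanup_runs += 1  # a single call, so cleanup happens exactly once
    outcomes = ["Failed" if f.exception() is not None else "Completed" for f in futures]
    return outcomes, cleanup_runs


print(map_with_cleanup([1, 2, 3]))  # (['Completed', 'Failed', 'Completed'], 1)
```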

roady

12/05/2022, 10:03 AM
Thanks, everyone! Edit: I didn't realise at first, but those last two suggestions don't give the desired behaviour: in the first, the downstream task takes the upstream task's output as an argument, which I was trying to avoid, and in the second, the cleanup task only seems to be submitted once, even though there are two completed upstream tasks.