
Jitesh Khandelwal

02/02/2021, 10:49 AM
Hello everyone, I have a question. Is it possible to rerun only a subset of the mapped tasks of a flow, for example only the ones that failed?

Peter Roelants

02/02/2021, 11:20 AM
Not that I know of. However, you might be able to use triggers to run a downstream task only when upstream tasks have failed.

Jitesh Khandelwal

02/02/2021, 11:26 AM
I didn't understand how triggers can be used here. Below is a toy program I am working with. Can you show an example of how a trigger can be used here?
import random

import prefect
from prefect import task, Parameter, Flow


@task
def say_world(x):
    if random.random() > .5:
        1 / 0
    return 'world {}'.format(x)

@task
def say_hello(x):
    return 'hello {}'.format(x)

@task
def pliss_print(p):
    logger = prefect.context.get("logger")
    logger.info('### {}'.format(p))


with Flow("param") as flow:
    p = Parameter("roots", default=['A', 'B', 'C'])
    r1 = say_world.map(p)
    r2 = say_hello.map(r1)
    pliss_print.map(r2)


if __name__ == '__main__':
    flow.run()

Peter Roelants

02/02/2021, 12:03 PM
What you could try:
@task(trigger=prefect.triggers.any_failed)
def pliss_print(p):
  ...

Jitesh Khandelwal

02/03/2021, 7:21 AM
This will ensure that pliss_print will run even if the upstream task failed, right?
My goal is to somehow rerun only the branch that failed. For example, in the flow below, if say_world(A) fails, I want to restart just that and not say_world(B) and say_world(C).
         __ say_world(A) __                   __ say_hello(r1A) __                  __ pliss_print(r2A)
       /                    \               /                      \              /
[A,B,C] --  say_world(B) --  [r1A, r1B, r1C] --  say_hello(r1B) -- [r2A, r2B, r2C]  -- pliss_print(r2B)
       \ __              __ /               \ __                __ /               \__            
            say_world(C)                         say_hello(r1C)                        pliss_print(r2C)
• I tried to use result caching to achieve this, but apparently caching doesn't work on mapped tasks?
• I am able to achieve this with some logic inside the task function that checks whether the task has already run and, if so, raises a success signal.
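
The "check whether this input already ran" idea from the second bullet can be sketched in plain Python, without any Prefect API. This is only an illustration under assumptions: the names run_with_markers, marker_dir, and flaky_say_world are hypothetical, and the marker files stand in for whatever record of past successes the real task would consult.

```python
import json
import tempfile
from pathlib import Path


def run_with_markers(items, work, marker_dir):
    """Run work(item) per item, skipping items that succeeded on an earlier run.

    Assumption: each item is a short string that is safe to use as a file name.
    A JSON marker file per item stores its result after a successful run.
    """
    marker_dir = Path(marker_dir)
    marker_dir.mkdir(parents=True, exist_ok=True)
    results, failed = {}, []
    for item in items:
        marker = marker_dir / f"{item}.json"
        if marker.exists():
            # Already succeeded before: reuse the stored result, do not rerun.
            results[item] = json.loads(marker.read_text())
            continue
        try:
            value = work(item)
        except Exception:
            # No marker is written, so this item runs again next time.
            failed.append(item)
            continue
        marker.write_text(json.dumps(value))
        results[item] = value
    return results, failed


calls = []


def flaky_say_world(x):
    """Hypothetical stand-in for say_world: fails the first time it sees 'A'."""
    calls.append(x)
    if x == 'A' and calls.count('A') == 1:
        raise ZeroDivisionError("simulated failure")
    return 'world {}'.format(x)


with tempfile.TemporaryDirectory() as tmp:
    first_results, first_failed = run_with_markers(['A', 'B', 'C'], flaky_say_world, tmp)
    second_results, second_failed = run_with_markers(['A', 'B', 'C'], flaky_say_world, tmp)
```

On the second call only 'A' executes again; 'B' and 'C' are served from their marker files.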

Peter Roelants

02/03/2021, 9:07 AM
I think I misunderstood you before. What about retrying the say_world task at the point of failure, e.g.:
from datetime import timedelta

@task(max_retries=5, retry_delay=timedelta(0))
def say_world(x):
    ...
This should only restart a single task for a single mapped input; it shouldn't restart all the mapped tasks, afaik.
Btw, it should be possible to combine result caching with mapped tasks: https://docs.prefect.io/core/advanced_tutorials/using-results.html#mapping
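
To make the per-input retry behaviour described above concrete, here is a minimal plain-Python sketch. It is not how Prefect implements retries; map_with_retries and the failures_left bookkeeping are hypothetical names used only to show that a failing input is retried on its own while the other inputs run once.

```python
def map_with_retries(fn, items, max_retries=5):
    """Apply fn to each item, retrying only the item that failed.

    Each item gets one initial attempt plus up to max_retries retries.
    Returns the results and the number of attempts made per item.
    """
    results, attempts = {}, {}
    for item in items:
        attempts[item] = 0
        while True:
            attempts[item] += 1
            try:
                results[item] = fn(item)
                break
            except Exception:
                if attempts[item] > max_retries:
                    results[item] = None  # give up on this item only
                    break
    return results, attempts


# Hypothetical failure schedule: 'A' fails twice, then succeeds.
failures_left = {'A': 2}


def say_world(x):
    if failures_left.get(x, 0) > 0:
        failures_left[x] -= 1
        raise ZeroDivisionError("simulated failure")
    return 'world {}'.format(x)


results, attempts = map_with_retries(say_world, ['A', 'B', 'C'])
```

Here 'A' takes three attempts, while 'B' and 'C' each run exactly once.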

Jitesh Khandelwal

02/03/2021, 10:04 AM
• My use case is one where the retries didn't help and someone will manually rerun later.
• Result caching of mapped tasks should be useful. I'll check, thanks! 👍