Jitesh Khandelwal
02/02/2021, 10:49 AMPeter Roelants
02/02/2021, 11:20 AMJitesh Khandelwal
02/02/2021, 11:26 AMimport random
import prefect
from prefect import task, Parameter, Flow
@task
def say_world(x):
if random.random() > .5:
1 / 0
return 'world {}'.format(x)
@task
def say_hello(x):
return 'hello {}'.format(x)
@task
def pliss_print(p):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>('### {}'.format(p))
with Flow("param") as flow:
p = Parameter("roots", default=['A', 'B', 'C'])
r1 = say_world.map(p)
r2 = say_hello.map(r1)
pliss_print.map(r2)
if __name__ == '__main__':
flow.run()
Peter Roelants
02/02/2021, 12:03 PM@task(trigger=prefect.triggers.any_failed)
def pliss_print(p):
...
Jitesh Khandelwal
02/03/2021, 7:21 AMpliss_print
will run even if the upstream task failed right ?say_world(A)
fails, I want to restart just that and not say_world(B) and say_world(C)
__ say_world(A) __ __ say_hello(r1A) __ __ pliss_print(r2A)
/ \ / \ /
[A,B,C] -- say_world(B) -- [r1A, r1B, r1C] -- say_hello(r1B) -- [r2A, r2B, r2C] -- pliss_print(r2B)
\ __ __ / \ __ __ / \__
say_world(C) say_hello(r1C) pliss_print(r2C)
• I tried to use results caching to achieve this, but apparently caching doesn't work on mapped tasks I guess ?
• I am able to achieve this using some logic inside the task function, to check if the task was already run before and if yes then raise a success signal.Peter Roelants
02/03/2021, 9:07 AMsay_world
task at the point of failure?, e.g.:
@task(max_retries=5, retry_delay=timedelta(0))
def say_world(x):
...
This should only restart a single task for a single mapped input, it shouldn't restart the all mapped tasks afaik.Jitesh Khandelwal
02/03/2021, 10:04 AM