Oscar Krantz

06/17/2022, 1:33 PM
Hi. Beginner question here for Prefect 1.0. How would you recommend implementing the following pattern? I will have a number of tasks, some of which should have error-handling tasks, and the final task should only run if the error handlers have either a) not been triggered or b) succeeded in correcting the error. I've attached a schematic where green arrows represent successes and red arrows represent failures. If a task has no error handler attached, its failure should fail the flow as well.

Anna Geller

06/17/2022, 1:36 PM
Thanks for the image; it helps a lot in understanding what you're trying to build.
Specifically, you'd need the any_successful and any_failed triggers.
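As a rough mental model (not Prefect's actual code), a trigger looks at the states of a task's upstream tasks and decides whether the task runs at all; if the condition isn't met, the task ends as TriggerFailed without running. The plain-Python sketch below models the two triggers with string states; `any_successful_model`, `any_failed_model`, and `run_with_trigger` are hypothetical names, not Prefect APIs.

```python
from typing import Callable, List

# States are plain strings here; TriggerFailed counts as a failure,
# mirroring how Prefect 1.0 treats it.
FAILED_STATES = {"Failed", "TriggerFailed"}


def any_successful_model(upstream: List[str]) -> bool:
    """Condition checked by any_successful: at least one upstream succeeded."""
    return any(s == "Success" for s in upstream)


def any_failed_model(upstream: List[str]) -> bool:
    """Condition checked by any_failed: at least one upstream failed."""
    return any(s in FAILED_STATES for s in upstream)


def run_with_trigger(
    trigger: Callable[[List[str]], bool],
    upstream: List[str],
    body_succeeds: bool = True,
) -> str:
    """Return the state a task would end in under the given trigger."""
    if not trigger(upstream):
        return "TriggerFailed"  # the task body never runs
    return "Success" if body_succeeds else "Failed"


# An error handler guarded by any_failed whose upstream task succeeded:
print(run_with_trigger(any_failed_model, ["Success"]))  # TriggerFailed
# The same handler when its upstream task failed:
print(run_with_trigger(any_failed_model, ["Failed"]))   # Success
```

The first case is the expected outcome whenever a handler's upstream task succeeds: the handler is TriggerFailed, which is not a bug in the handler itself.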

Oscar Krantz

06/17/2022, 1:40 PM
Thanks for the quick response. I tried any_failed on my t_fail() task, but when the upstream task succeeds, t_fail reports TriggerFailed, which causes the flow to report failure as well. Maybe I'm misunderstanding how it's meant to be used.

Kevin Kho

06/17/2022, 1:43 PM
The flow state depends on the reference tasks, so maybe you can set the reference task to be the final one.
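In Prefect 1.0 the flow's final state is derived from its reference tasks, which default to the terminal tasks (those with nothing downstream); `Flow.set_reference_tasks` lets you override that. The plain-Python sketch below models only that rule, using hypothetical task names from the schematic, to show why a TriggerFailed handler left as a terminal task fails the whole flow while making final_task the reference task does not. It is a simplified model, not Prefect's flow runner.

```python
from typing import Dict, Iterable, List, Optional, Set, Tuple

FAILED_STATES = {"Failed", "TriggerFailed"}


def terminal_tasks(tasks: Iterable[str], edges: List[Tuple[str, str]]) -> Set[str]:
    """Tasks with no downstream edge (Prefect 1.0's default reference tasks)."""
    has_downstream = {up for up, _ in edges}
    return {t for t in tasks if t not in has_downstream}


def flow_state(
    states: Dict[str, str],
    edges: List[Tuple[str, str]],
    reference: Optional[Set[str]] = None,
) -> str:
    """Model: the flow fails if any reference task ended in a failed state."""
    refs = reference if reference is not None else terminal_tasks(states, edges)
    return "Failed" if any(states[t] in FAILED_STATES for t in refs) else "Success"


# Hypothetical run: t1 succeeded, so its any_failed handler t_fail was TriggerFailed.
states = {"t1": "Success", "t_fail": "TriggerFailed", "final_task": "Success"}
edges = [("t1", "t_fail"), ("t1", "final_task")]

print(flow_state(states, edges))                            # Failed: t_fail is terminal
print(flow_state(states, edges, reference={"final_task"}))  # Success
```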

Oscar Krantz

06/17/2022, 1:45 PM
Ah, I see. With this flow, what would be an appropriate trigger for the final task? It would need to run when t3 succeeded and, for t1 and t2, when either the error handler or the task itself succeeded (t_fail, t_success). Right now I've set every box that connects to final_task() as one of its upstream_tasks.

Kevin Kho

06/17/2022, 1:59 PM
It’s very hard to do stuff like “if a succeeds but b fails” with the existing triggers. I am not 100% sure, but you might have an easier time using a state handler for the failure logic?

Oscar Krantz

06/17/2022, 2:11 PM
Can I use state handlers to launch a new task in response to failures? Something like
def state_handler(task, old_state, new_state):
    if new_state.is_failed():
        new = other_task(...)  # hoped-for: launch another task here
        return new.state
    else:
        return new_state  # continue on

Kevin Kho

06/17/2022, 2:29 PM
That you can't do. You can call task.run() inside the handler, but that's just normal Python; it isn't a task with retries.
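Prefect 1.0 state handlers are called as `handler(task, old_state, new_state)` and may return a replacement state. The plain-Python model below mimics that shape to illustrate the point: the fallback runs as an ordinary function call inside the handler, so it gets no retries, no state of its own, and no separate entry in the UI. `State`, `run_task`, `poll_file`, and `grab_old_file` are hypothetical stand-ins, not Prefect objects.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class State:
    """Minimal stand-in for a Prefect 1.0 state (not the real class)."""
    name: str  # e.g. "Running", "Success", "Failed"
    result: Any = None
    message: Optional[str] = None


def grab_old_file() -> str:
    """Hypothetical fallback: pretend we located an older delivery."""
    return "old_delivery.csv"


def fallback_handler(task: Callable, old_state: State, new_state: State) -> State:
    """Same shape as a Prefect 1.0 state handler: may return a replacement state."""
    if new_state.name == "Failed":
        try:
            # Plain function call: no retries, no state of its own, no UI entry.
            return State("Success", result=grab_old_file(), message="used fallback")
        except Exception as exc:
            return State("Failed", message=f"fallback failed: {exc}")
    return new_state  # pass every other state through unchanged


def run_task(fn: Callable[[], Any], handler: Callable) -> State:
    """Toy runner: run fn, then let the handler rewrite the resulting state."""
    try:
        new_state = State("Success", result=fn())
    except Exception as exc:
        new_state = State("Failed", message=str(exc))
    return handler(fn, State("Running"), new_state)


def poll_file() -> str:
    """Hypothetical poll that finds nothing."""
    raise FileNotFoundError("delivery not found")


final = run_task(poll_file, fallback_handler)
print(final.name, final.result)  # Success old_delivery.csv
```

The catch for the use case below is visibility: the UI would only show the polling task as successful, not that a fallback produced its result.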

Oscar Krantz

06/17/2022, 2:47 PM
Hm, okay. That might be a problem for us using Prefect, then. We typically need this type of flow for external file deliveries. The first operations would poll for files, and if a file isn't found, an error handler would grab an old file or similar. Some files, if not found, should terminate the flow (t3 here). The final operation (a database load, for instance) should only start if every file was either found or replaced with an old file by its error handler, and this needs to be transparent in the UI.
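Setting Prefect aside, the requirements above boil down to a per-file rule: use the new file if it exists, otherwise use the fallback if there is one, otherwise stop. The plain-Python sketch below encodes that rule, assuming (hypothetically) that an "old file" sits next to the expected one with a `.prev` suffix; `resolve_delivery`, `previous_delivery`, and `plan_load` are illustrative names, not Prefect APIs. In a flow, each poll and each fallback would presumably be its own task so both outcomes stay visible in the UI.

```python
import tempfile
from pathlib import Path
from typing import Callable, Dict, Optional

Fallback = Optional[Callable[[Path], Path]]


def resolve_delivery(path: Path, fallback: Fallback) -> Path:
    """Return the file to load: the new delivery, or whatever the fallback finds.

    A missing file with no fallback (like t3) raises, which should stop the flow.
    """
    if path.exists():
        return path
    if fallback is None:
        raise FileNotFoundError(f"required delivery missing: {path}")
    return fallback(path)  # may raise as well, which also blocks the load


def previous_delivery(path: Path) -> Path:
    """Hypothetical fallback: look for '<name>.prev' next to the expected file."""
    old = path.with_name(path.name + ".prev")
    if not old.exists():
        raise FileNotFoundError(f"no previous delivery for {path}")
    return old


def plan_load(deliveries: Dict[Path, Fallback]) -> Dict[Path, Path]:
    """The database load may start only if every delivery resolves."""
    return {p: resolve_delivery(p, fb) for p, fb in deliveries.items()}


with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    (d / "t1.csv").write_text("new")       # t1 arrived on time
    (d / "t2.csv.prev").write_text("old")  # t2 missing, but an old copy exists
    (d / "t3.csv").write_text("new")       # t3 has no fallback, so it must exist
    files = plan_load({
        d / "t1.csv": previous_delivery,
        d / "t2.csv": previous_delivery,
        d / "t3.csv": None,
    })
    print(sorted(p.name for p in files.values()))  # ['t1.csv', 't2.csv.prev', 't3.csv']
```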

Kevin Kho

06/17/2022, 2:52 PM
Let me think for a bit
I think you need an additional task like this:
from prefect import task, Flow
import prefect
from prefect.triggers import any_failed, any_successful

@task
def task_one(x):
    if not x:  # simulate a failure when called with False
        raise ValueError
    return x

@task(trigger=any_successful)
def task_one_success():
    prefect.context.logger.info("TASK ONE SUCCESS")
    return 1

@task(trigger=any_failed)
def task_one_failed():
    prefect.context.logger.info("TASK ONE FAILED")
    return 2

@task
def task_two(x):
    if not x:  # simulate a failure when called with False
        raise ValueError
    return x

@task(trigger=any_successful)
def task_two_success():
    prefect.context.logger.info("TASK TWO SUCCESS")
    return 1

@task(trigger=any_failed)
def task_two_failed():
    prefect.context.logger.info("TASK TWO FAILED")
    return 2

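@task(trigger=any_successful)
def merge():
    # Succeeds if either the success marker or the error handler succeeded
    return True

@task()
def final_task():
    # Default all_successful trigger: runs only if every merge succeeded
    prefect.context.logger.info("FINAL TASK")
    return 1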

with Flow("..") as flow:
    a = task_one(False)
    a_s = task_one_success(upstream_tasks=[a])
    a_f = task_one_failed(upstream_tasks=[a])

    b = task_two(True)
    b_s = task_two_success(upstream_tasks=[b])
    b_f = task_two_failed(upstream_tasks=[b])

    a_res = merge(upstream_tasks=[a_s, a_f])
    b_res = merge(upstream_tasks=[b_s, b_f])
    # final_task is the only terminal task, so by default it determines the flow state
    final_task(upstream_tasks=[a_res, b_res])

flow.run()
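To see why the merge task works, the plain-Python model below walks states through one branch of the flow above (task, success marker, failure handler, merge) and into final_task. It models the trigger rules only and is not Prefect code. It also wires a t3-style task with no handler straight into final_task; that part is an assumption based on the schematic, not something in the example above.

```python
from typing import Callable, List

FAILED = {"Failed", "TriggerFailed"}


def all_successful(ups: List[str]) -> bool:
    return all(s == "Success" for s in ups)


def any_successful(ups: List[str]) -> bool:
    return any(s == "Success" for s in ups)


def any_failed(ups: List[str]) -> bool:
    return any(s in FAILED for s in ups)


def run(trigger: Callable[[List[str]], bool], ups: List[str], ok: bool = True) -> str:
    """State a task ends in: TriggerFailed if its trigger rejects the upstream states."""
    if not trigger(ups):
        return "TriggerFailed"
    return "Success" if ok else "Failed"


def branch(task_ok: bool, handler_ok: bool = True) -> str:
    """One task + success marker + error handler + merge; returns merge's state."""
    t = "Success" if task_ok else "Failed"
    s = run(any_successful, [t])             # e.g. task_one_success
    f = run(any_failed, [t], ok=handler_ok)  # e.g. task_one_failed (the error handler)
    return run(any_successful, [s, f])       # merge


def final_state(merges: List[str], unhandled_ok: List[bool]) -> str:
    """final_task with the default all_successful trigger."""
    ups = merges + ["Success" if ok else "Failed" for ok in unhandled_ok]
    return run(all_successful, ups)


print(final_state([branch(False), branch(True)], []))      # Success
print(final_state([branch(False, handler_ok=False)], []))  # TriggerFailed
print(final_state([branch(True)], [False]))                # TriggerFailed: t3 failed
```

In the real flow, a task like t3 would presumably just go into final_task's upstream_tasks; since final_task is the only terminal task, its TriggerFailed is what makes the flow fail.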

Oscar Krantz

06/17/2022, 3:59 PM
That seems to work very well. Nice thinking with the merge task! I'll try a solution using case as well. Thanks :)