# prefect-community
o
Hi. Beginner question here for Prefect 1.0. How would you advise implementing the following pattern? I will have a number of tasks, some of which should have error-handling tasks, and the final task should only be invoked if the error handlers have either a) not been triggered, or b) succeeded in correcting the error. I've attached a schematic where green arrows represent successes and red arrows represent failures. If no error handler is attached to a task, then a failure of that task should fail the flow as well.
a
thanks for the image, it helps a lot understanding what you're trying to build
Specifically, you'd need the any_successful and any_failed triggers.
o
Thanks for the quick response. I tried any_failed for my t_fail() task, but when the task succeeds it reports a TriggerFailed, which causes the flow to report failure as well. Maybe I'm misunderstanding the use.
k
The flow state depends on the reference tasks, so maybe you can set the final task as the reference task.
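As a rough illustration of why the flow reports failure, here is a plain-Python model. It is not Prefect's engine: the State enum and the helper names are made up for this sketch. It assumes t_fail has nothing downstream of it, so it counts as a terminal task, and by default a flow's reference tasks are its terminal tasks.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"  # treated as a kind of failure


def is_failed(state):
    return state in (State.FAILED, State.TRIGGER_FAILED)


def run_with_any_failed(upstream_states):
    """Model a task with trigger=any_failed whose body always succeeds."""
    if not any(is_failed(s) for s in upstream_states):
        return State.TRIGGER_FAILED  # trigger not met, the task never runs
    return State.SUCCESS


def flow_state(task_states, reference_tasks):
    """The flow fails if any reference task ends in a failed state."""
    if any(is_failed(task_states[name]) for name in reference_tasks):
        return State.FAILED
    return State.SUCCESS


# t1 succeeds, so the error handler t_fail is never triggered.
states = {"t1": State.SUCCESS}
states["t_fail"] = run_with_any_failed([states["t1"]])
states["final_task"] = State.SUCCESS

# Default: every terminal task is a reference task.
default_result = flow_state(states, ["t_fail", "final_task"])
# Alternative: only the final task decides the flow state.
pinned_result = flow_state(states, ["final_task"])
print(default_result.value, pinned_result.value)
```

In Prefect 1.0 the second case corresponds to calling flow.set_reference_tasks() with a list containing only the final task.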
o
Ah I see. With this flow what would be an appropriate trigger for the final task? I would need it to trigger when t3 was successful and for t1, t2 when either the error handler or the task was successful (t_fail, t_success). I've set all the boxes which connect to final_task() as upstream_tasks right now
k
It’s very hard to do stuff like “if a succeeds but b fails” with the existing triggers. I am not 100% sure, but you might have an easier time using a state handler for the failure logic?
o
Can I use state handlers to launch a new task in response to failures? Something like
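def state_handler(task, old_state, new_state):
    if new_state.is_failed():
        # Hypothetical: launch another task from inside the handler.
        new = other_task(...)
        return new.state
    # Otherwise continue with the state unchanged.
    return new_state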
k
That you can't do. You can call task.run(), but it's just normal Python. It's not a task with retries.
o
Hm, okay. That might be a problem for us using Prefect then. Typically we'd need this type of flow for external file deliveries. The first operations here would be polling for files, and if a file is not found, we run an error handler which can grab an old file or similar. Some files, if not found, should terminate the flow (t3 here). The final operation (a database load, for instance) should only start if all files were either found or the error handlers successfully grabbed an old file, and this needs to be transparent in the UI.
k
Let me think for a bit
I think you need an additional task like this:
from prefect import task, Flow
import prefect
from prefect.triggers import any_failed, any_successful

@task
def task_one(x):
    if x == False:
        raise ValueError
    return x

@task(trigger=any_successful)
def task_one_success():
    prefect.context.logger.info("TASK ONE SUCCESS")
    return 1

@task(trigger=any_failed)
def task_one_failed():
    prefect.context.logger.info("TASK ONE FAILED")
    return 2

@task
def task_two(x):
    if x == False:
        raise ValueError
    return x

@task(trigger=any_successful)
def task_two_success():
    prefect.context.logger.info("TASK TWO SUCCESS")
    return 1

@task(trigger=any_failed)
def task_two_failed():
    prefect.context.logger.info("TASK TWO FAILED")
    return 2

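@task(trigger=any_successful)
def merge():
    # Succeeds if either the success branch or the error handler succeeded.
    return True

# Default trigger (all_successful): runs only if every upstream merge succeeded.
@task()
def final_task():
    prefect.context.logger.info("FINAL TASK")
    return 1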

with Flow("..") as flow:
    a = task_one(False)
    a_s = task_one_success(upstream_tasks=[a])
    a_f = task_one_failed(upstream_tasks=[a])

    b = task_two(True)
    b_s = task_two_success(upstream_tasks=[b])
    b_f = task_two_failed(upstream_tasks=[b])

    a_res = merge(upstream_tasks=[a_s, a_f])
    b_res = merge(upstream_tasks=[b_s, b_f])
    final_task(upstream_tasks=[a_res, b_res])

flow.run()
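To see why the merge tasks make this work, here is a plain-Python model of the flow above. It is only a sketch and not how Prefect evaluates triggers internally: the State enum, run(), branch(), and final_state() are names invented here, and the handler_ok flags are an added assumption to show what happens if an error handler itself fails (the handlers in the flow above always succeed).

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"  # counts as a failure downstream


# Simplified stand-ins for the triggers used in the flow above.
def any_successful(upstream):
    return any(s is State.SUCCESS for s in upstream)


def any_failed(upstream):
    return any(s is not State.SUCCESS for s in upstream)


def all_successful(upstream):
    return all(s is State.SUCCESS for s in upstream)


def run(trigger, upstream, body_ok=True):
    """Run one task: check its trigger first, then its body."""
    if not trigger(upstream):
        return State.TRIGGER_FAILED
    return State.SUCCESS if body_ok else State.FAILED


def branch(task_ok, handler_ok=True):
    """One task with its success branch, error handler, and merge."""
    task = run(all_successful, [], task_ok)
    on_success = run(any_successful, [task])
    on_failure = run(any_failed, [task], handler_ok)
    return run(any_successful, [on_success, on_failure])  # merge


def final_state(a_ok, b_ok, a_handler_ok=True, b_handler_ok=True):
    merged = [branch(a_ok, a_handler_ok), branch(b_ok, b_handler_ok)]
    return run(all_successful, merged)  # final_task, default trigger


print(final_state(False, True).value)
print(final_state(False, True, a_handler_ok=False).value)
```

The first call mirrors the flow as written (task_one fails, its handler recovers) and the final task runs. In the second call the handler also fails, so the merge task's trigger is not met and the final task is not run.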
o
That seems to work very well. Nice thinking with the merge task! I'll try a solution using case as well. Thanks :)