Alex Prokop

03/29/2022, 3:14 PM
Still learning Prefect concepts here and would love some advice. I'm trying to create a flow where I map a bunch of tasks, then, when some of them fail, run a recovery task for each failing task, passing it the input of the original task so it can take an alternative action. At first it seemed like state handlers would help, then Kevin pointed me towards the "any_failed" trigger instead. But that seems built for a scenario where you have X tasks and want to run 1 recovery task, whereas I have X tasks and want to run Y recovery tasks (Y somewhat less than X). It seems from the docs that Prefect caches the inputs to a task, but I don't see how to access those inputs in a subsequent task. Is this possible? Thanks!
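Sample code:
import prefect
from prefect import Flow, task
from prefect.executors import LocalExecutor
from prefect.triggers import any_failed

@task()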
def get_data_batches():
    return [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]

@task()
def task_that_fails(input_array):
    raise ValueError('This task always fails.')

@task(trigger=any_failed)
def recovery_task():
    logger = prefect.context.get("logger")
    logger.info("Retry task running...")
    # logger.info(state)  # `state` is not defined in this scope
    # Can I access the input of the original task here to help with the recovery work?
    return True

with Flow("test_recovery_flow", executor=LocalExecutor()) as flow:
    data_batches = get_data_batches()
    failing_tasks = task_that_fails.map(input_array=data_batches)
    # How do I run a recovery task for *each* failing task (it seems like .map does not work on the recovery_task)
    # and capture the input of the original failing task and pass it to the recovery task?
    recovery_task(upstream_tasks=[failing_tasks])

Kevin Kho

03/29/2022, 3:15 PM
I don’t think it’s meant only for the scenario where you have X tasks and you want to run 1 recovery task. Let me make a quick example
Try this and see the logs:
from prefect import Flow, task
from prefect.triggers import any_failed, all_successful
import prefect

@task
def get_inputs():
    return [1,2,3,4,5]

@task
def get_other_inputs():
    return ["a","b","c","d","e"]

@task
def add_one(x):
    if x == 2 or x == 3:
        raise ValueError()
    return x+1

@task(trigger=all_successful)
def success_task(x, letter):
    prefect.context.logger.info(f"Succeeded for value {x-1}-{letter}")
    return

@task(trigger=any_failed)
def failure_task(x, letter):
    prefect.context.logger.info(f"Failed for value {x}-{letter}")
    return

with Flow("test") as flow:
    numbers = get_inputs()
    letters = get_other_inputs()
    added = add_one.map(numbers)    # two value errors
    s = success_task.map(added, letters)
    f = failure_task.map(numbers, letters, upstream_tasks=[added]) # I used the original inputs here

flow.run()

Alex Prokop

03/29/2022, 3:28 PM
Oh wow - I think I see what I was doing wrong. The map function is smarter than I thought: I just have to pass it all of the original inputs AND the failed tasks, it looks like. Thanks!
This should make things very easy, appreciate it.

Kevin Kho

03/29/2022, 3:30 PM
Of course!
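
For anyone reading this thread later, here is a rough plain-Python model of what Kevin's example does. It is only a sketch of the idea, not Prefect code or Prefect's implementation: each mapped child is paired by index with its original input, and recovery runs only at the indices where the upstream child failed. The helper names `run_mapped` and `recover_failed`, and the "Success"/"Failed" tuples, are made up for illustration.

```python
from typing import Any, Callable, List, Optional, Tuple

State = Tuple[str, Any]  # hypothetical stand-in: ("Success", result) or ("Failed", exception)


def add_one(x: int) -> int:
    # Same failure condition as in Kevin's example: 2 and 3 raise.
    if x == 2 or x == 3:
        raise ValueError(f"cannot process {x}")
    return x + 1


def run_mapped(fn: Callable[[Any], Any], inputs: List[Any]) -> List[State]:
    # Run fn once per element and record a separate outcome for each one,
    # so one failing element does not hide the results of the others.
    states: List[State] = []
    for item in inputs:
        try:
            states.append(("Success", fn(item)))
        except Exception as exc:
            states.append(("Failed", exc))
    return states


def recover_failed(
    upstream: List[State],
    numbers: List[int],
    letters: List[str],
) -> List[Optional[str]]:
    # Walk the upstream outcomes and the ORIGINAL inputs together, index by index.
    # Recovery runs only where the upstream outcome at that index is "Failed".
    recovered: List[Optional[str]] = []
    for (status, _), number, letter in zip(upstream, numbers, letters):
        if status == "Failed":
            recovered.append(f"recovered {number}-{letter}")
        else:
            recovered.append(None)  # nothing to recover at this index
    return recovered


numbers = [1, 2, 3, 4, 5]
letters = ["a", "b", "c", "d", "e"]

added = run_mapped(add_one, numbers)
recovered = recover_failed(added, numbers, letters)
print(recovered)
```

The point of the model is the `zip`: the recovery step never needs to dig the inputs out of the failed task, because the same input list is handed to it alongside the per-element outcomes.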