Alex Prokop
03/29/2022, 3:14 PM
Alex Prokop
03/29/2022, 3:14 PM
@task()
def get_data_batches():
    """Return ten single-element batches, ``[1]`` through ``[10]``."""
    return [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]
@task()
def task_that_fails(input_array):
    """Raise ``ValueError`` unconditionally.

    Args:
        input_array: Accepted but never read by the body.

    Raises:
        ValueError: Always.
    """
    raise ValueError('This task always fails.')
@task(trigger=any_failed)
def recovery_task():
    """Log that the recovery path is running and return ``True``.

    The task is declared with ``trigger=any_failed``.
    """
    logger = prefect.context.get("logger")
    logger.info("Retry task running...")
    # NOTE(review): `state` is not defined anywhere in this snippet, so this
    # line would raise NameError as written -- confirm what should be logged.
    logger.info(state)
    # Can I access the input of the original task here to help with the recovery work?
    return True
with Flow("test_recovery_flow", executor=LocalExecutor()) as flow:
    data_batches = get_data_batches()
    failing_tasks = task_that_fails.map(input_array=data_batches)
    # How do I run a recovery task for *each* failing task (it seems like .map does not work on the recovery_task)
    # and capture the input of the original failing task and pass it to the recovery task?
    # `upstream_tasks` is given a list of tasks, so the mapped task is wrapped in one.
    recovery_task(upstream_tasks=[failing_tasks])
Kevin Kho
Kevin Kho
from prefect import Flow, task
from prefect.triggers import any_failed, all_successful
import prefect
@task
def get_inputs():
    """Return the integers 1 through 5 as a list."""
    return [1, 2, 3, 4, 5]
@task
def get_other_inputs():
    """Return the letters ``"a"`` through ``"e"`` as a list."""
    return ["a", "b", "c", "d", "e"]
@task
def add_one(x):
    """Return ``x + 1``, failing for the inputs 2 and 3.

    Args:
        x: Value to increment.

    Raises:
        ValueError: If ``x`` equals 2 or 3.
    """
    if x == 2 or x == 3:
        raise ValueError()
    return x + 1
@task(trigger=all_successful)
def success_task(x, letter):
    """Log a success message for one ``(x, letter)`` pair.

    The task is declared with ``trigger=all_successful``.

    Args:
        x: Numeric value; the message reports ``x - 1``.
        letter: Value appended to the message after a hyphen.
    """
    prefect.context.logger.info(f"Succeeded for value {x-1}-{letter}")
    return
@task(trigger=any_failed)
def failure_task(x, letter):
    """Log a failure message for one ``(x, letter)`` pair.

    The task is declared with ``trigger=any_failed``.

    Args:
        x: Value reported in the message as-is.
        letter: Value appended to the message after a hyphen.
    """
    prefect.context.logger.info(f"Failed for value {x}-{letter}")
    return
with Flow("test") as flow:
    numbers = get_inputs()
    letters = get_other_inputs()
    added = add_one.map(numbers)  # two value errors
    s = success_task.map(added, letters)
    # The original inputs are mapped alongside `added`, which is attached only
    # as an upstream dependency.
    f = failure_task.map(numbers, letters, upstream_tasks=[added])  # i used original input here

# Run the flow once its definition is complete.
flow.run()
Alex Prokop
03/29/2022, 3:28 PMAlex Prokop
03/29/2022, 3:28 PMKevin Kho