Alex Prokop
03/28/2022, 8:39 PMKevin Kho
class NewError(ValueError):
def __init__(self, message, input):
self.input = input
super().__init__(message)
and then inside the task
raise NewError("This task always fails", input)
and then inside the state handler, access it with
new_state.result.input
Alex Prokop
03/28/2022, 8:50 PMdef retry_fn(task, state):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Retry function triggered:")
<http://logger.info|logger.info>(state)
if isinstance(old_state, state.Failed):
# I want to access the parameters passed to the failing task ("other_context"). How can I do that?
task_that_fails_if_passed_true.run(False, [1])
task_that_fails_if_passed_true.run(False, [2])
task_that_fails_if_passed_true.run(False, [3])
# I want to be able to change the task state from failed to successful here. How can I do that?
return state.Success()
@task(on_failure=retry_fn)
def task_that_fails_if_passed_true(val, other_context):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Task that fails if passed true running...")
if val == True:
<http://logger.info|logger.info>("Fail")
raise ValueError("This task always fails")
else:
<http://logger.info|logger.info>("Pass")
@task(timeout=5, on_failure=retry_fn)
def task_that_times_out(val, other_context):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("Task that times out running...")
i = 0
while i < 10:
2 + 2
with Flow("test_retry_logic", executor=LocalExecutor()) as flow:
task_that_fails_if_passed_true(True, [1,2,3])
task_that_times_out(True, [1,2,3])
flow.register(project_name="MyProject")
But I think the problem might be framed wrong here. Did you consider making a task that triggers when the upstream tasks fail?
Kevin Kho
Alex Prokop
03/28/2022, 8:54 PM