https://prefect.io logo
#prefect-community
Title
# prefect-community
a

Alex Prokop

03/28/2022, 8:39 PM
Hi there, I'm trying to implement some custom logic to handle retries. When a task times out or fails, I'd like to be able to spawn new tasks using the arguments passed to the original task. And if those tasks succeed, I'd like to be able to change the original task's state from failed to successful. Anyone have any advice? I've read the docs on state handlers and played around with it and haven't quite found what I'm looking for. Sample code here:
k

Kevin Kho

03/28/2022, 8:49 PM
Hey @Alex Prokop, could you move the code to the thread when you get the chance? You can’t spawn a new task in a sense that it gets added to the Flow because there is a pre-defined DAG you can’t deviate from. For this specifically, you don’t have access to the parameter inputs during the state handler, unless you explicitly pass it somehow like you make your own error:
Copy code
class NewError(ValueError):
    def __init__(self, message, input):
        self.input = input
        super().__init__(message)
and then inside the task
Copy code
raise NewError("This task always fails", input)
and then inside the state handler, access it with
Copy code
new_state.result.input
But I think the problem might be framed wrong here. Did you consider making a task that triggers when the upstream tasks fail?
a

Alex Prokop

03/28/2022, 8:50 PM
Thanks for the reminder, sorry! Code here:
Copy code
def retry_fn(task, state):
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Retry function triggered:")
    <http://logger.info|logger.info>(state)
    if isinstance(old_state, state.Failed):
        # I want to access the parameters passed to the failing task ("other_context"). How can I do that?
        task_that_fails_if_passed_true.run(False, [1])
        task_that_fails_if_passed_true.run(False, [2])
        task_that_fails_if_passed_true.run(False, [3])
    # I want to be able to change the task state from failed to successful here. How can I do that?
    return state.Success()

@task(on_failure=retry_fn)
def task_that_fails_if_passed_true(val, other_context):
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Task that fails if passed true running...")

    if val == True:
        <http://logger.info|logger.info>("Fail")
        raise ValueError("This task always fails")
    else:
        <http://logger.info|logger.info>("Pass")

@task(timeout=5, on_failure=retry_fn)
def task_that_times_out(val, other_context):
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>("Task that times out running...")
    i = 0
    while i < 10:
        2 + 2

with Flow("test_retry_logic", executor=LocalExecutor()) as flow:
    task_that_fails_if_passed_true(True, [1,2,3])
    task_that_times_out(True, [1,2,3])

    
flow.register(project_name="MyProject")
But I think the problem might be framed wrong here. Did you consider making a task that triggers when the upstream tasks fail?
No, I didn't consider that. Are you saying that this task is defined in the flow, but is only triggered if the upstream task fails?
Er, I just restated what you said, but it was for my own comprehension. I will look into that, that makes sense about the DAG!
k

Kevin Kho

03/28/2022, 8:54 PM
Yes exactly. A task with trigger any_failed
a

Alex Prokop

03/28/2022, 8:54 PM
That sounds closer to what I actually need, thank you.
6 Views