Adi Gandra

02/27/2022, 4:55 PM
Hey, I was trying to figure out whether it’s possible to set up a task so that if it fails, it retries 3 times (I figured this part out). But if it still fails after that, I want to kick off another task so the rest of the flow can continue. So basically I’m trying to swap in a backup task when my task fails, so my entire flow can continue on.

Anna Geller

02/27/2022, 5:27 PM
That’s an interesting question. Here is an example using a state handler for that use case. Here, after 3 unsuccessful task runs, we send a Slack message as part of the 3rd retry - you can replace it with any action you would like to take:
from datetime import timedelta
import prefect
from prefect import Flow, task
from prefect.tasks.notifications import SlackTask


def take_action_after_three_failed_task_runs(task, old_state, new_state):
    # allowing 3 retries - after 3 failed task runs, we give up retrying and do something else:
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
    return new_state


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_three_failed_task_runs],
)
def retry_test():
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")


with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    flow.run()
The output when running this:
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:02+0100] INFO - prefect.retry_test | 1. TaskRun
[2022-02-27 18:27:02+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:02+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:03.683739+00:00
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:03+0100] INFO - prefect.retry_test | 2. TaskRun
[2022-02-27 18:27:03+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:03+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:04.732207+00:00
[2022-02-27 18:27:04+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:04+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:04+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 18:27:04+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:05+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:05.763641+00:00
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:05+0100] INFO - prefect.retry_test | 4. TaskRun
[2022-02-27 18:27:05+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Failed'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Adi Gandra

02/27/2022, 6:06 PM
But wouldn’t the flow fail because the task failed?
How can I have the rest of the flow continue on after the state handler takes care of it?
Can I manipulate the new state that is returned?

Anna Geller

02/27/2022, 6:36 PM
Absolutely, you can just return any state you want:
from datetime import timedelta
import prefect
from prefect import Flow, task
from prefect.engine.state import Success
from prefect.tasks.notifications import SlackTask


def take_action_after_three_failed_task_runs(task, old_state, new_state):
    # allowing 3 retries - after 3 failed task runs, we give up retrying and do something else:
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
        return Success(message="We now consider this task run successful")
    return new_state


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_three_failed_task_runs],
)
def retry_test():
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")


with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    flow.run()
The relevant part of the output - note the final state:
[2022-02-27 19:35:59+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 19:35:59+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 26, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 19:35:59+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Success'
[2022-02-27 19:35:59+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Adi Gandra

02/27/2022, 8:41 PM
Perfect, this makes sense! One last question: if I’m in the is_retrying and new_state.run_count == 3 block, does that mean the task is going to run again?
Or does returning Success mean the task won’t run again?

Anna Geller

02/27/2022, 8:45 PM
You're correct: returning Success means it won't run again.
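And if the "something else" is your actual backup logic, you could run it directly inside the handler and attach its result to the Success state so downstream tasks can pick it up. A minimal, untested sketch - run_backup, primary_task, and downstream are just placeholder names for your own logic:
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.engine.state import Success


def run_backup():
    # placeholder for whatever your backup task would do
    return "backup result"


def fall_back_after_three_failed_task_runs(task, old_state, new_state):
    # after 3 failed task runs, stop retrying and run the backup logic instead
    if new_state.is_retrying() and new_state.run_count == 3:
        backup_result = run_backup()
        # returning Success (with the backup's result attached) lets the rest of the flow continue
        return Success(message="Primary task failed - used backup", result=backup_result)
    return new_state


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[fall_back_after_three_failed_task_runs],
)
def primary_task():
    raise Exception("Failing in order to trigger the backup...")


@task
def downstream(value):
    logger = prefect.context.get("logger")
    logger.info("Got: %s", value)


with Flow("retry-tester-with-backup") as flow:
    downstream(primary_task())

if __name__ == "__main__":
    flow.run()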

Adi Gandra

02/27/2022, 8:52 PM
Thanks!