# ask-community
a
Hey, I was trying to figure out whether it's possible to set up a task so that, if it fails, it retries 3 times (I figured this part out). But if it still fails, I want to kick off another task so the rest of the flow can continue. So basically, I'm trying to swap a failed task for a backup task so that my entire flow can continue.
a
That's an interesting question. Here is an example using a state handler for that use case. Here, after 3 unsuccessful task runs, we send a Slack message; you can replace that with any action you would like to take:
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.tasks.notifications import SlackTask


def take_action_after_three_failed_task_runs(task, old_state, new_state):
    # allowing 3 retries - after the 3rd failed task run, we take some action
    # and then let the last retry proceed:
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
    return new_state


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_three_failed_task_runs],
)
def retry_test():
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")


with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    flow.run()
The output when running this:
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:02+0100] INFO - prefect.retry_test | 1. TaskRun
[2022-02-27 18:27:02+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:02+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:03.683739+00:00
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:03+0100] INFO - prefect.retry_test | 2. TaskRun
[2022-02-27 18:27:03+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:03+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:04.732207+00:00
[2022-02-27 18:27:04+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:04+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:04+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 18:27:04+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:05+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:05.763641+00:00
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:05+0100] INFO - prefect.retry_test | 4. TaskRun
[2022-02-27 18:27:05+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Failed'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
a
But wouldn't the flow fail because the task failed?
How can I have the rest of the flow continue after the state handler takes care of it?
Can I manipulate the new state that is returned?
a
Absolutely, you can just return any state you want:
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.engine.state import Success
from prefect.tasks.notifications import SlackTask


def take_action_after_three_failed_task_runs(task, old_state, new_state):
    # allowing 3 retries - after the 3rd failed task run, we give up retrying
    # and do something else:
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
        # replacing the Retrying state with Success stops further retries
        return Success(message="We now consider this task run successful")
    return new_state


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_three_failed_task_runs],
)
def retry_test():
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")


with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    flow.run()
The end of the output when running this:
[2022-02-27 19:35:59+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 19:35:59+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 26, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 19:35:59+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Success'
[2022-02-27 19:35:59+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
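To make the difference between the two handlers easier to see, here is a small library-free sketch of the idea. It is only a toy model under stated assumptions: `State`, `run_with_retries`, `passthrough` and `succeed_after_three` are hypothetical names invented for illustration, not Prefect's engine or API. The run counting is chosen to mirror the logs above, where the handler fires right after "3. TaskRun".

```python
from dataclasses import dataclass


@dataclass
class State:
    name: str       # "Retrying", "Failed" or "Success"
    run_count: int  # task runs completed when the state was proposed


def run_with_retries(fn, max_retries, handler):
    """Toy retry loop (hypothetical, NOT Prefect's engine)."""
    run_count = 0
    while True:
        run_count += 1
        try:
            fn()
            proposed = State("Success", run_count)
        except Exception:
            # Propose another retry while attempts remain, otherwise fail.
            name = "Retrying" if run_count <= max_retries else "Failed"
            proposed = State(name, run_count)
        # The handler sees the proposed state and may replace it.
        final = handler(proposed)
        if final.name != "Retrying":
            return final


def always_fails():
    raise Exception("Failing in order to test retries...")


def passthrough(new_state):
    # Like the first example: act if you want, but keep the proposed state.
    return new_state


def succeed_after_three(new_state):
    # Like the second example: swap the 3rd Retrying state for Success.
    if new_state.name == "Retrying" and new_state.run_count == 3:
        return State("Success", new_state.run_count)
    return new_state


print(run_with_retries(always_fails, 3, passthrough))
print(run_with_retries(always_fails, 3, succeed_after_three))
```

In this model the first call ends in a Failed state after 4 runs, and the second ends in a Success state after 3 runs, because the loop stops as soon as the handler returns anything other than a retrying state.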
a
Perfect, this makes sense! One last question: if I'm in the new_state.is_retrying() and new_state.run_count == 3 block, does that mean the task is running again?
Or does returning Success mean the task won't run again?
a
You're correct, returning Success means it won't run again.
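If the main goal is just that downstream work receives some value, a different technique is to do the fallback inside a single function instead of in a state handler. The sketch below is plain Python, not a Prefect API, and every name in it (`run_with_backup`, `flaky_extract`, `cached_extract`) is hypothetical. It also skips the delay between attempts for brevity.

```python
def run_with_backup(primary, backup, max_retries=3):
    """Call primary up to max_retries + 1 times, then fall back to backup."""
    last_error = None
    for _ in range(max_retries + 1):
        try:
            return primary()
        except Exception as exc:
            last_error = exc
    # Every attempt failed: hand the last error to the backup.
    return backup(last_error)


attempts = []


def flaky_extract():
    # Hypothetical primary step that always fails.
    attempts.append(1)
    raise RuntimeError("source unavailable")


def cached_extract(error):
    # Hypothetical backup step that returns a stand-in value.
    return {"rows": [], "source": "cache", "reason": str(error)}


result = run_with_backup(flaky_extract, cached_extract)
print(len(attempts), result["source"])
```

The trade-off is visibility: a scheduler would see one successful step here, whereas the state-handler approach keeps each failed attempt in the run's logs and state history.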
a
Thanks!