Adi Gandra
02/27/2022, 4:55 PM
Anna Geller
from datetime import timedelta
import prefect
from prefect import Flow, task
from prefect.tasks.notifications import SlackTask
def take_action_after_threee_failed_task_runs(task, old_state, new_state):
    """State handler that sends a Slack message after the third failed task run.

    The action fires when the incoming state reports ``is_retrying()`` and
    its ``run_count`` equals 3. The handler always hands back ``new_state``
    unchanged, so the state transition itself is not altered: the pending
    retry still takes place after the notification.

    Args:
        task: The task whose state is changing (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered; must provide ``is_retrying()``
            and, when retrying, a ``run_count`` attribute.

    Returns:
        ``new_state``, unmodified.
    """
    # After three failed task runs, do something extra before the next retry.
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
    return new_state
@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_threee_failed_task_runs],
)
def retry_test():
    """Task that always fails, used to exercise retries and the state handler.

    Logs the current task run count taken from the Prefect context, then
    raises unconditionally so that every run ends in a failure.

    Raises:
        Exception: Always.
    """
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    # The Slack export had turned this call into link markup
    # (<http://logger.info|logger.info>); it is a plain logger call.
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")
# Build a flow whose only task is the always-failing retry_test.
with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    # Run the flow in-process when the file is executed as a script.
    flow.run()
The output when running this:
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:02+0100] INFO - prefect.retry_test | 1. TaskRun
[2022-02-27 18:27:02+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:02+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:03.683739+00:00
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:03+0100] INFO - prefect.retry_test | 2. TaskRun
[2022-02-27 18:27:03+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:03+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:04.732207+00:00
[2022-02-27 18:27:04+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:04+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:04+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 18:27:04+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:05+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:05.763641+00:00
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:05+0100] INFO - prefect.retry_test | 4. TaskRun
[2022-02-27 18:27:05+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Failed'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Adi Gandra
02/27/2022, 6:06 PM
Adi Gandra
02/27/2022, 6:07 PM
Adi Gandra
02/27/2022, 6:07 PM
Anna Geller
from datetime import timedelta
import prefect
from prefect import Flow, task
from prefect.engine.state import Success
from prefect.tasks.notifications import SlackTask
def take_action_after_threee_failed_task_runs(task, old_state, new_state):
    """State handler that stops retrying after the third failed task run.

    When the incoming state reports ``is_retrying()`` and its ``run_count``
    equals 3, a Slack message is sent and a ``Success`` state is returned in
    place of the retrying state, so the task run ends as successful instead
    of being retried again. In every other case ``new_state`` is returned
    unchanged.

    Args:
        task: The task whose state is changing (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered; must provide ``is_retrying()``
            and, when retrying, a ``run_count`` attribute.

    Returns:
        A ``Success`` state after the third failed run, otherwise
        ``new_state``.
    """
    # After three failed task runs, give up retrying and do something else.
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
        # Returning a different state from a handler replaces the new state.
        return Success(message="We now consider this task run successful")
    return new_state
@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_threee_failed_task_runs],
)
def retry_test():
    """Task that always fails, used to exercise retries and the state handler.

    Logs the current task run count taken from the Prefect context, then
    raises unconditionally so that every run ends in a failure.

    Raises:
        Exception: Always.
    """
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    # The Slack export had turned this call into link markup
    # (<http://logger.info|logger.info>); it is a plain logger call.
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")
# Build a flow whose only task is the always-failing retry_test.
with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    # Run the flow in-process when the file is executed as a script.
    flow.run()
Anna Geller
[2022-02-27 19:35:59+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 19:35:59+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 26, in retry_test
raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 19:35:59+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Success'
[2022-02-27 19:35:59+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Adi Gandra
02/27/2022, 8:41 PM
Adi Gandra
02/27/2022, 8:41 PM
Anna Geller
Adi Gandra
02/27/2022, 8:52 PM