Andrew Lane
03/17/2022, 5:01 PMretry_delay
attribute of Task
objects. Based on the description of state signals here, I’d expect to be able to raise signals.RETRY()
and have the task wait retry_delay
before the task is run again. However, the retries appear to be being carried out immediately. Can someone explain where I’m going wrong in the wait_until_success
task in the trailing code snippet?from datetime import timedelta
from time import perf_counter
from typing import Callable
from prefect import task, Flow
from prefect.engine import signals
def get_status_func() -> Callable[[], str]:
"""Returned function mocks an API call that returns 'Running'
for the first three calls, and then returns 'Succeeded' for
every subsequent call.
"""
n_calls = 0
def get_status() -> str:
nonlocal n_calls
n_calls += 1
if n_calls < 3:
return "Running"
else:
return "Succeeded"
return get_status
get_status = get_status_func()
@task(max_retries=10, retry_delay=timedelta(seconds=5))
def wait_until_success() -> None:
status = get_status()
if status == "Succeeded":
return
else:
raise signals.RETRY()
with Flow("test-delay") as flow:
wait_until_success()
if __name__ == '__main__':
start_time = perf_counter()
flow.run()
end_time = perf_counter()
print(f"Run time (s): {end_time - start_time}")
retry_delay
) but the run is complete in under 1 second
$ python delay.py
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-delay'
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Starting task run...
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | RETRY signal raised: RETRY(None)
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Finished task run for task with final state: 'Retrying'
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-delay'
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Starting task run...
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | RETRY signal raised: RETRY(None)
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Finished task run for task with final state: 'Retrying'
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-delay'
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Starting task run...
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Finished task run for task with final state: 'Success'
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Run time (s): 0.187956337002106
Kevin Kho
03/17/2022, 5:05 PMraise FAIL
, it will respect the retry delay because that is a delay before getting queued in the retry state I think. If you raise RETRY
, you bypass that.Andrew Lane
03/17/2022, 5:12 PMsignal.RETRY()
with signal.FAIL()
in cases where something has gone wrong, but we want to retry. Is there a way to bypass the retry step and really fail if something that we know is irrecoverable has happened?state_handler
perhaps?Kevin Kho
03/17/2022, 5:22 PMENDRUN
signal for that. ENDRUN is on a task level, not Flow levelAndrew Lane
03/17/2022, 5:25 PM