# prefect-community
a
I’m struggling to wrap my head around how state signals interact with the `retry_delay` attribute of `Task` objects. Based on the description of state signals here, I’d expect to be able to raise `signals.RETRY()` and have the task wait `retry_delay` before it is run again. However, the retries appear to be carried out immediately. Can someone explain where I’m going wrong in the `wait_until_success` task in the code snippet below?
```python
from datetime import timedelta
from time import perf_counter
from typing import Callable

from prefect import task, Flow
from prefect.engine import signals

def get_status_func() -> Callable[[], str]:
    """Returned function mocks an API call that returns 'Running'
    for the first two calls, and then returns 'Succeeded' for
    every subsequent call.
    """
    n_calls = 0
    def get_status() -> str:
        nonlocal n_calls
        n_calls += 1
        if n_calls < 3:
            return "Running"
        else:
            return "Succeeded"
    return get_status


get_status = get_status_func()


@task(max_retries=10, retry_delay=timedelta(seconds=5))
def wait_until_success() -> None:
    status = get_status()
    if status == "Succeeded":
        return
    else:
        raise signals.RETRY()


with Flow("test-delay") as flow:
    wait_until_success()


if __name__ == '__main__':
    start_time = perf_counter()
    flow.run()
    end_time = perf_counter()
    print(f"Run time (s): {end_time - start_time}")
```
I’d expect the minimum run time to be 10 seconds (2 retries with a 5-second `retry_delay`), but the run completes in under 1 second:
```
$ python delay.py
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-delay'
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Starting task run...
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | RETRY signal raised: RETRY(None)
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Finished task run for task with final state: 'Retrying'
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-delay'
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Starting task run...
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | RETRY signal raised: RETRY(None)
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Finished task run for task with final state: 'Retrying'
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-delay'
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Starting task run...
[2022-03-17 16:42:17+0000] INFO - prefect.TaskRunner | Task 'wait_until_success': Finished task run for task with final state: 'Success'
[2022-03-17 16:42:17+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Run time (s): 0.187956337002106
```
k
If you raise `FAIL`, it will respect the retry delay, because I think that delay is applied before the task is queued in the Retrying state. If you raise `RETRY`, you bypass that.
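A rough way to picture the explanation above is the toy model below. It is not Prefect's code: the `Retry` and `Fail` classes, the `start_time` parameter, and the `next_start_time` function are all hypothetical stand-ins, and the model simply assumes that the delay is added only when the runner itself turns a failure into a retry.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class Retry(Exception):
    """Hypothetical stand-in for an explicitly raised retry signal."""

    def __init__(self, start_time: Optional[datetime] = None) -> None:
        super().__init__("retry requested")
        # Hypothetical parameter of this toy class: when the retry may start.
        self.start_time = start_time


class Fail(Exception):
    """Hypothetical stand-in for a failure signal or an ordinary exception."""


def next_start_time(exc: Exception, now: datetime, retry_delay: timedelta) -> datetime:
    """Return when this toy runner would schedule the next attempt."""
    if isinstance(exc, Retry):
        # Explicit retry: only the start time carried by the signal counts,
        # so without one the task is eligible to run again immediately.
        return exc.start_time if exc.start_time is not None else now
    # Failure with retries remaining: the runner builds the retry itself,
    # and (by assumption) this is the step where retry_delay is added.
    return now + retry_delay


now = datetime(2022, 3, 17, 16, 42, 17, tzinfo=timezone.utc)
delay = timedelta(seconds=5)
print(next_start_time(Fail(), now, delay) - now)   # wait applied by the runner
print(next_start_time(Retry(), now, delay) - now)  # wait skipped
```

Under that assumption, raising the failure stand-in is what makes the delay take effect, which is consistent with the immediate retries seen in the log when `RETRY` is raised directly.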
a
Right, so we replace `signals.RETRY()` with `signals.FAIL()` in cases where something has gone wrong but we want to retry. Is there a way to bypass the retry step and really fail if something that we know is irrecoverable has happened? A custom `state_handler`, perhaps?
k
You can use the `ENDRUN` signal for that. `ENDRUN` works at the task level, not the Flow level.
a
Ah, perfect. Thank you!