Hey, I would like to give a talk about Prefect (on a...
# ask-community
f
Hey, I would like to give a talk about Prefect (on a French Discord), but I have an issue with retry delays: the `retry_delay_seconds` argument is ignored in 3rc1 and also in the latest stable release.
```python
from typing import Annotated
import random

from prefect import flow, get_run_logger, task
from pydantic import StringConstraints

DomainName = Annotated[
    str,
    StringConstraints(
        pattern=r"^(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$"
    ),
]


@task(retries=20, retry_delay_seconds=5)
def buy_domain(domain: DomainName):
    logger = get_run_logger()
    logger.info(f"Buying domain {domain}...")

    # some_external_service_buy_domain(domain)
    # in a real example, this would be a call to a domain registrar API

    if random.randint(1, 100) > 98:  # simulate a 2% failure rate
        raise ValueError("Failed to buy domain :(")

    logger.info(f"Bought domain {domain}!")


@task(retries=300, retry_delay_seconds=15)
def wait_for_delivery(domain: DomainName):
    logger = get_run_logger()
    logger.info(f"Waiting for domain {domain} to be available...")

    if random.randint(1, 100) > 5:  # simulate a 95% failure rate
        logger.info(f"Domain {domain} is not yet available...")
        raise ValueError("Domain not available ...")


@task(retries=5, retry_delay_seconds=10)
def configure_dns(domain: DomainName):
    logger = get_run_logger()
    logger.info(f"Configuring DNS for domain {domain}...")

    # in a real example, this would be a call to a DNS provider API
    # some_external_service_configure_dns(domain)

    if random.randint(1, 100) > 95:  # simulate a 5% failure rate
        raise ValueError("Failed to configure DNS :(")

    logger.info(f"DNS configured for domain {domain}!")


@task(retries=5, retry_delay_seconds=10)
def configure_reverse_proxy(domain: DomainName):
    logger = get_run_logger()
    logger.info(f"Configuring reverse proxy for domain {domain}...")

    # configure_nginx_or_caddy_or_traefik_or_whatever_technology(domain)
    if random.randint(1, 100) > 99:  # simulate a 1% failure rate
        raise ValueError("Failed to configure reverse proxy :(")


@task(retries=5, retry_delay_seconds=10)
def configure_ssl(domain: DomainName):
    logger = get_run_logger()
    logger.info(f"Configuring SSL for domain {domain}...")

    if random.randint(1, 100) > 95:  # simulate a 5% failure rate
        raise ValueError("Failed to configure SSL :(")


@flow
def buy_domain_flow(domain: DomainName):
    logger = get_run_logger()
    logger.info(f"Buying domain {domain}...")

    buy_domain(domain)
    wait_for_delivery(domain)
    configure_dns(domain)
    configure_reverse_proxy(domain)
    configure_ssl(domain)

    logger.info(f"Domain {domain} is ready to use!")


if __name__ == "__main__":
    buy_domain_flow("example.com")
```
As an example, here the task is retried back to back, with no delay between attempts:
```
10:50:55.227 | INFO    | Flow run 'auspicious-goat' - Buying domain example.com...
10:50:55.541 | INFO    | prefect.engine - Created task run 'buy_domain-0' for task 'buy_domain'
10:50:55.638 | INFO    | Task run 'buy_domain-0' - Buying domain example.com...
10:50:55.639 | INFO    | Task run 'buy_domain-0' - Bought domain example.com!
10:50:55.672 | INFO    | Task run 'buy_domain-0' - Finished in state Completed()
10:50:55.705 | INFO    | prefect.engine - Created task run 'wait_for_delivery-0' for task 'wait_for_delivery'
10:50:55.769 | INFO    | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.770 | INFO    | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.812 | INFO    | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.813 | INFO    | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.852 | INFO    | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.853 | INFO    | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.891 | INFO    | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.892 | INFO    | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.931 | INFO    | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.931 | INFO    | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.970 | INFO    | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
```
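To put a number on this, the gaps between consecutive "Waiting for domain" lines in the log above can be computed directly. This is a minimal sketch that uses only the timestamps copied from that log; the variable names are illustrative.

```python
from datetime import datetime

# Timestamps of the "Waiting for domain ..." lines, copied from the log above.
attempt_times = [
    "10:50:55.769",
    "10:50:55.812",
    "10:50:55.852",
    "10:50:55.891",
    "10:50:55.931",
    "10:50:55.970",
]

# retry_delay_seconds configured on wait_for_delivery.
CONFIGURED_DELAY_SECONDS = 15

parsed = [datetime.strptime(t, "%H:%M:%S.%f") for t in attempt_times]
gaps = [(b - a).total_seconds() for a, b in zip(parsed, parsed[1:])]

for gap in gaps:
    print(f"gap between attempts: {gap * 1000:.0f} ms")

print(f"largest gap: {max(gaps):.3f} s, configured delay: {CONFIGURED_DELAY_SECONDS} s")
```

Every gap is a few tens of milliseconds, nowhere near the configured 15 seconds.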
Another example:
```python
from prefect import flow, task


@task(retries=5, retry_delay_seconds=5000000)
def foo():
    import time
    print(time.time())
    raise ValueError("foo")


@flow
def get_data_flow():
    foo()

if __name__ == "__main__":
    get_data_flow()
```
```
rye run python .\test.py
11:09:20.684 | INFO    | prefect.engine - Created flow run 'pistachio-cheetah' for flow 'get-data-flow'
11:09:20.686 | INFO    | prefect.engine - View at http://localhost:4200/flow-runs/flow-run/4a6c1c36-2ade-4088-a2f5-b61fff2e2ca2
11:09:21.043 | INFO    | prefect.engine - Created task run 'foo-0' for task 'foo'
1717492161.1064706
1717492161.1481755
1717492161.1873064
1717492161.2281282
11:09:21.256 | ERROR   | Task run 'foo-0' - Finished in state Failed('Task run encountered an exception ValueError: foo')
11:09:21.257 | ERROR   | Flow run 'pistachio-cheetah' - Encountered exception during execution:
Traceback (most recent call last):
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flow_engine.py", line 654, in run_flow_sync
    result = cast(R, flow.fn(*call_args, **call_kwargs))  # type: ignore
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\test.py", line 13, in get_data_flow
    foo()
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\tasks.py", line 713, in __call__
    return run_task(
           ^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 715, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 598, in run_task_sync
    return run.result()
           ^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 335, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\client\schemas\objects.py", line 248, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\states.py", line 61, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 363, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 240, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\.rye\py\cpython@3.12.2\install\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 223, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 353, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\states.py", line 81, in _get_state_result
    raise await get_state_exception(state)
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 580, in run_task_sync
    result = task.fn(*call_args, **call_kwargs)  # type: ignore
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\test.py", line 8, in foo
    raise ValueError("foo")
ValueError: foo
11:09:21.295 | ERROR   | Flow run 'pistachio-cheetah' - Finished in state Failed('Flow run encountered an exception: ValueError: foo')
Traceback (most recent call last):
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\test.py", line 16, in <module>
    get_data_flow()
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flows.py", line 1256, in __call__
    return run_flow_sync(**run_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flow_engine.py", line 670, in run_flow_sync
    return run.result()
           ^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flow_engine.py", line 197, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)  # type: ignore
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\client\schemas\objects.py", line 248, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\states.py", line 61, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 363, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 240, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\admin\.rye\py\cpython@3.12.2\install\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 223, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 353, in ctx_call
    result = await async_fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[truncated due to Slack limit]
```
I'm out of ideas.
```python
from datetime import datetime
from prefect import flow, task


@task(retries=5, retry_delay_seconds=30, log_prints=True)
def task_foo():
    print(datetime.now())
    raise ValueError("foo")


@flow(retries=5, retry_delay_seconds=30, log_prints=True)
def get_data_flow():
    print(datetime.now())
    task_foo()
    # foo.submit().wait()

if __name__ == "__main__":
    get_data_flow()
```
=> Flow runs are retried with the correct delay, but task runs are not.
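For reference, the behaviour expected from `retries` plus `retry_delay_seconds` on a task can be sketched in plain Python. This is not Prefect's implementation: `retry_with_delay` and its parameters are hypothetical names used only for illustration, and the delay is shortened so the sketch runs quickly.

```python
import time
from functools import wraps


def retry_with_delay(retries, delay_seconds):
    """Hypothetical helper: rerun a function up to `retries` extra times, sleeping between attempts."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # no retries left, surface the last error
                    time.sleep(delay_seconds)  # wait before the next attempt

        return wrapper

    return decorator


attempt_times = []


@retry_with_delay(retries=3, delay_seconds=0.2)
def always_fails():
    attempt_times.append(time.monotonic())
    raise ValueError("foo")


try:
    always_fails()
except ValueError:
    pass

gaps = [b - a for a, b in zip(attempt_times, attempt_times[1:])]
print(f"{len(attempt_times)} attempts, smallest gap {min(gaps):.2f} s")
```

With a working delay, each gap is at least roughly `delay_seconds`, which is what the flow-level retries show and the task-level retries do not.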
c
Thank you @flapili -- this does appear to be a bug! We'll resolve for the next rc release
f
I would like to help, but I have no clue how to do it.
I'm running into an issue with get_result (I set the state to awaiting retry).
@Chris White I believe the task retry delay has never been implemented since v2?
I did not find any implementation in the git history.
c
thank you, no worries we can fix this and cut a release this week - I'll let you know when we've done so. I don't think I understand your follow-up questions, could you ask them again?
f
```python
import pendulum
from prefect.states import AwaitingRetry, Retrying


# Patched version of handle_retry (a method, hence the self parameter).
def handle_retry(self, exc: Exception) -> bool:
    """
    If the task has retries left, and the retry condition is met, set the task to retrying.
    - If the task has no retries left, or the retry condition is not met, return False.
    - If the task has retries left, and the retry condition is met, return True.
    """
    if self.retries < self.task.retries and self.can_retry:
        if self.task.retry_delay_seconds:
            # Schedule the next attempt instead of retrying immediately.
            scheduled_time = pendulum.now("UTC").add(
                seconds=self.task.retry_delay_seconds
            )
            self.set_state(AwaitingRetry(scheduled_time=scheduled_time))
        else:
            self.set_state(Retrying(), force=True)
        self.retries = self.retries + 1
        return True
    return False
```
I monkey-patched it like that,
but get_result needs to be patched too,
probably Future.get_result.
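The idea behind the patch, and why the code that fetches the result also has to change, can be modelled without Prefect. In this sketch `ToyEngine`, `ToyState` and `run_until_done` are hypothetical stand-ins, not Prefect classes. A failed attempt moves to an "AwaitingRetry" state that carries a scheduled time, and the caller has to wait for that time before running the task again instead of reading the failed result right away.

```python
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class ToyState:
    name: str
    scheduled_time: Optional[datetime] = None


class ToyEngine:
    """Hypothetical stand-in for a task run engine; not Prefect's class."""

    def __init__(self, max_retries, retry_delay_seconds):
        self.max_retries = max_retries
        self.retry_delay_seconds = retry_delay_seconds
        self.retries = 0
        self.state = ToyState("Running")

    def handle_retry(self) -> bool:
        """Schedule a retry if any are left; return False otherwise."""
        if self.retries >= self.max_retries:
            return False
        if self.retry_delay_seconds:
            when = datetime.now(timezone.utc) + timedelta(
                seconds=self.retry_delay_seconds
            )
            self.state = ToyState("AwaitingRetry", scheduled_time=when)
        else:
            self.state = ToyState("Retrying")
        self.retries += 1
        return True


def run_until_done(engine, fn):
    """Run fn, waiting until scheduled_time between attempts."""
    while True:
        try:
            result = fn()
            engine.state = ToyState("Completed")
            return result
        except Exception:
            if not engine.handle_retry():
                raise
            if engine.state.scheduled_time is not None:
                remaining = engine.state.scheduled_time - datetime.now(timezone.utc)
                time.sleep(max(remaining.total_seconds(), 0))


calls = []


def flaky():
    # Fails on the first two calls, then succeeds.
    calls.append(time.monotonic())
    if len(calls) < 3:
        raise ValueError("foo")
    return "ok"


engine = ToyEngine(max_retries=5, retry_delay_seconds=0.1)
outcome = run_until_done(engine, flaky)
print(outcome, engine.retries, engine.state.name)
```

Setting the state alone is not enough in this model: without the wait in `run_until_done`, the loop would rerun the function immediately, which matches the symptom in the logs earlier in the thread.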
c
yea we can handle that in a more first class way, we'll cut a release later today and i'll get a fix included
f
(I'm no longer at my computer)
@Chris White I think I got it to work after digging into Prefect's source.
Does it look good to you?