flapili
06/04/2024, 8:52 AM
The `retry_delay_seconds` argument is ignored in 3.0rc1, and also in the latest stable release.
flapili
06/04/2024, 8:53 AMfrom typing import Annotated
import random
from prefect import flow, get_run_logger, task
from pydantic import StringConstraints
# One or more dot-terminated labels (1-63 chars of a-z, 0-9 or "-", starting
# and ending with a letter or digit) followed by a final label of 2-63 chars.
_DOMAIN_NAME_PATTERN = r"^(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$"

# String type constrained to match the domain-name pattern above.
DomainName = Annotated[str, StringConstraints(pattern=_DOMAIN_NAME_PATTERN)]
@task(retries=20, retry_delay_seconds=5)
def buy_domain(domain: DomainName):
    """Simulate buying ``domain``, failing randomly about 2% of the time.

    Args:
        domain: The domain name to buy.

    Raises:
        ValueError: When the simulated purchase fails.
    """
    logger = get_run_logger()
    logger.info(f"Buying domain {domain}...")
    # some_external_service_buy_domain(domain)
    # in a real example, this would be a call to a domain registrar API
    if random.randint(1, 100) > 98:  # simulate a 2% failure rate
        raise ValueError("Failed to buy domain :(")
    logger.info(f"Bought domain {domain}!")
@task(retries=300, retry_delay_seconds=15)
def wait_for_delivery(domain: DomainName):
    """Simulate polling until ``domain`` is available; fails about 95% of the time.

    Args:
        domain: The domain name being waited on.

    Raises:
        ValueError: When the simulated check reports the domain as not yet
            available, so that the task's retry policy triggers another attempt.
    """
    logger = get_run_logger()
    logger.info(f"Waiting for domain {domain} to be available...")
    if random.randint(1, 100) > 5:  # simulate a 95% failure rate
        logger.info(f"Domain {domain} is not yet available...")
        raise ValueError("Domain not available ...")
@task(retries=5, retry_delay_seconds=10)
def configure_dns(domain: DomainName):
    """Simulate configuring DNS for ``domain``, failing randomly about 5% of the time.

    Args:
        domain: The domain name to configure.

    Raises:
        ValueError: When the simulated DNS configuration fails.
    """
    logger = get_run_logger()
    logger.info(f"Configuring DNS for domain {domain}...")
    # in a real example, this would be a call to a DNS provider API
    # some_external_service_configure_dns(domain)
    if random.randint(1, 100) > 95:  # simulate a 5% failure rate
        raise ValueError("Failed to configure DNS :(")
    logger.info(f"DNS configured for domain {domain}!")
@task(retries=5, retry_delay_seconds=10)
def configure_reverse_proxy(domain: DomainName):
    """Simulate configuring a reverse proxy for ``domain``; fails about 1% of the time.

    Args:
        domain: The domain name to configure.

    Raises:
        ValueError: When the simulated reverse-proxy configuration fails.
    """
    logger = get_run_logger()
    logger.info(f"Configuring reverse proxy for domain {domain}...")
    # configure_nginx_or_caddy_or_traefik_or_whatever_technology(domain)
    if random.randint(1, 100) > 99:  # simulate a 1% failure rate
        raise ValueError("Failed to configure reverse proxy :(")
@task(retries=5, retry_delay_seconds=10)
def configure_ssl(domain: DomainName):
    """Simulate configuring SSL for ``domain``, failing randomly about 5% of the time.

    Args:
        domain: The domain name to configure.

    Raises:
        ValueError: When the simulated SSL configuration fails.
    """
    logger = get_run_logger()
    logger.info(f"Configuring SSL for domain {domain}...")
    if random.randint(1, 100) > 95:  # simulate a 5% failure rate
        raise ValueError("Failed to configure SSL :(")
@flow
def buy_domain_flow(domain: DomainName):
    """Run the domain setup steps for ``domain`` one after another.

    The steps are: buy the domain, wait for delivery, configure DNS,
    configure the reverse proxy, then configure SSL.

    Args:
        domain: The domain name to buy and set up.
    """
    logger = get_run_logger()
    logger.info(f"Buying domain {domain}...")
    buy_domain(domain)
    wait_for_delivery(domain)
    configure_dns(domain)
    configure_reverse_proxy(domain)
    configure_ssl(domain)
    logger.info(f"Domain {domain} is ready to use!")
if __name__ == "__main__":
    # Run the demo flow for a sample domain when executed as a script.
    buy_domain_flow("example.com")
As an example, here the task retries are spammed (the retry delay is not applied).
flapili
06/04/2024, 8:53 AM10:50:55.227 | INFO | Flow run 'auspicious-goat' - Buying domain example.com...
10:50:55.541 | INFO | prefect.engine - Created task run 'buy_domain-0' for task 'buy_domain'
10:50:55.638 | INFO | Task run 'buy_domain-0' - Buying domain example.com...
10:50:55.639 | INFO | Task run 'buy_domain-0' - Bought domain example.com!
10:50:55.672 | INFO | Task run 'buy_domain-0' - Finished in state Completed()
10:50:55.705 | INFO | prefect.engine - Created task run 'wait_for_delivery-0' for task 'wait_for_delivery'
10:50:55.769 | INFO | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.770 | INFO | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.812 | INFO | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.813 | INFO | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.852 | INFO | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.853 | INFO | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.891 | INFO | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.892 | INFO | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.931 | INFO | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
10:50:55.931 | INFO | Task run 'wait_for_delivery-0' - Domain example.com is not yet available...
10:50:55.970 | INFO | Task run 'wait_for_delivery-0' - Waiting for domain example.com to be available...
flapili
06/04/2024, 9:07 AMflapili
06/04/2024, 9:09 AMfrom prefect import flow, task
@task(retries=5, retry_delay_seconds=5000000)
def foo():
    """Print the current Unix timestamp, then always fail.

    The printed timestamps make the spacing between attempts visible.

    Raises:
        ValueError: Always, so that every attempt triggers the retry policy.
    """
    import time

    print(time.time())
    raise ValueError("foo")
@flow
def get_data_flow():
    """Call the always-failing ``foo`` task once."""
    foo()
if __name__ == "__main__":
    # Run the reproduction flow when executed as a script.
    get_data_flow()
flapili
06/04/2024, 9:09 AMrye run python .\test.py
11:09:20.684 | INFO | prefect.engine - Created flow run 'pistachio-cheetah' for flow 'get-data-flow'
11:09:20.686 | INFO | prefect.engine - View at <http://localhost:4200/flow-runs/flow-run/4a6c1c36-2ade-4088-a2f5-b61fff2e2ca2>
11:09:21.043 | INFO | prefect.engine - Created task run 'foo-0' for task 'foo'
1717492161.1064706
1717492161.1481755
1717492161.1873064
1717492161.2281282
11:09:21.256 | ERROR | Task run 'foo-0' - Finished in state Failed('Task run encountered an exception ValueError: foo')
11:09:21.257 | ERROR | Flow run 'pistachio-cheetah' - Encountered exception during execution:
Traceback (most recent call last):
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flow_engine.py", line 654, in run_flow_sync
result = cast(R, flow.fn(*call_args, **call_kwargs)) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\test.py", line 13, in get_data_flow
foo()
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\tasks.py", line 713, in __call__
return run_task(
^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 715, in run_task
return run_task_sync(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 598, in run_task_sync
return run.result()
^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 335, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\client\schemas\objects.py", line 248, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\states.py", line 61, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 363, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 240, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\.rye\py\cpython@3.12.2\install\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 223, in coroutine_wrapper
return await task
^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 353, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\states.py", line 81, in _get_state_result
raise await get_state_exception(state)
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\task_engine.py", line 580, in run_task_sync
result = task.fn(*call_args, **call_kwargs) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\test.py", line 8, in foo
raise ValueError("foo")
ValueError: foo
11:09:21.295 | ERROR | Flow run 'pistachio-cheetah' - Finished in state Failed('Flow run encountered an exception: ValueError: foo')
Traceback (most recent call last):
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\test.py", line 16, in <module>
get_data_flow()
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flows.py", line 1256, in __call__
return run_flow_sync(**run_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flow_engine.py", line 670, in run_flow_sync
return run.result()
^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\flow_engine.py", line 197, in result
_result = self.state.result(raise_on_failure=raise_on_failure, fetch=True) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\client\schemas\objects.py", line 248, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\states.py", line 61, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 363, in coroutine_wrapper
return run_coro_as_sync(ctx_call())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 240, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 312, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 182, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\admin\.rye\py\cpython@3.12.2\install\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\_internal\concurrency\calls.py", line 383, in _run_async
result = await coro
^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 223, in coroutine_wrapper
return await task
^^^^^^^^^^
File "C:\Users\admin\Documents\dev\prefect_kube-demo\demo-prefect\.venv\Lib\site-packages\prefect\utilities\asyncutils.py", line 353, in ctx_call
result = await async_fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[truncated due to Slack limit]
flapili
06/04/2024, 9:23 AMflapili
06/04/2024, 9:23 AMflapili
06/04/2024, 9:31 AMfrom datetime import datetime
from prefect import flow, task
@task(retries=5, retry_delay_seconds=30, log_prints=True)
def task_foo():
    """Print the current local time, then always fail.

    Raises:
        ValueError: Always, so that every attempt triggers the retry policy.
    """
    print(datetime.now())
    raise ValueError("foo")
@flow(retries=5, retry_delay_seconds=30, log_prints=True)
def get_data_flow():
    """Print the current local time, then call the always-failing ``task_foo``."""
    print(datetime.now())
    task_foo()
    # foo.submit().wait()
if __name__ == "__main__":
    # Run the reproduction flow when executed as a script.
    get_data_flow()
=> Flow runs are delayed and retried with the correct delay, but task runs are not.
Chris White
flapili
06/04/2024, 4:22 PMflapili
06/04/2024, 4:23 PMflapili
06/04/2024, 4:24 PMflapili
06/04/2024, 4:24 PMChris White
flapili
06/04/2024, 4:26 PMdef handle_retry(self, exc: Exception) -> bool:
"""
If the task has retries left, and the retry condition is met, set the task to retrying.
- If the task has no retries left, or the retry condition is not met, return False.
- If the task has retries left, and the retry condition is met, return True.
"""
if self.retries < self.task.retries and self.can_retry:
if self.task.retry_delay_seconds:
self.set_state(AwaitingRetry(scheduled_time=pendulum.now("UTC").add(seconds=self.task.retry_delay_seconds)))
else:
self.set_state(Retrying(), force=True)
self.retries = self.retries + 1
return True
return False
flapili
06/04/2024, 4:27 PMflapili
06/04/2024, 4:27 PMflapili
06/04/2024, 4:29 PMChris White
flapili
06/04/2024, 4:29 PMflapili
06/05/2024, 12:32 AMflapili
06/05/2024, 12:34 AMflapili
06/05/2024, 12:35 AM