Nope, not what I was thinking of. If you take the following flow, register it in the cloud, and run it with a Kubernetes agent, you get the expected result: task2 fails, the flow keeps running, and then it retries task2 and succeeds.
from prefect import Flow, task
import prefect
from datetime import timedelta

retry_delay = timedelta(seconds=2)

@task
def task1():
    return 1

@task(max_retries=3, retry_delay=retry_delay)
def task2(input):
    logger = prefect.context.get("logger")
    retry_count = prefect.context.get("task_run_count")
    logger.info(f"Input: {input}")
    logger.info(f"Retry count: {retry_count}")
    # Fail on the first run so that a retry is triggered
    if retry_count < 2:
        raise Exception

with Flow("myflow") as flow:
    result1 = task1()
    task2(result1)
If you use the following retry_delay instead, task2 will fail on its first run and then be put in a Pending state. The flow run is then resubmitted to be run later.
retry_delay = timedelta(minutes=15)
If you check the cloud logs, you can see that a new Kubernetes job is created.
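To make the difference between the two cases concrete, here is a rough mental model as a plain-Python sketch. It does not call Prefect at all. The helper `retry_mode` and the 10-minute cutoff are my own assumptions for illustration; all I actually observed is that a 2-second delay retries inside the running job and a 15-minute delay leads to a new job.

```python
from datetime import timedelta

# Assumed cutoff, NOT confirmed in this thread: delays at or below it are
# waited out inside the running job, longer ones get the run resubmitted.
ASSUMED_IN_PROCESS_CUTOFF = timedelta(minutes=10)


def retry_mode(retry_delay, cutoff=ASSUMED_IN_PROCESS_CUTOFF):
    """Hypothetical helper: say which of the two observed behaviours to expect."""
    if retry_delay <= cutoff:
        return "retry in the same job"
    return "resubmit as a new job"


print(retry_mode(timedelta(seconds=2)))
print(retry_mode(timedelta(minutes=15)))
```

If the real cutoff is different, only the constant changes. The point is that the retry path seems to depend on how long the delay is.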