Thomas Hoeck
02/10/2022, 3:34 PM
Kevin Kho
Thomas Hoeck
02/10/2022, 3:44 PM
Kevin Kho
Thomas Hoeck
02/10/2022, 4:12 PM
from prefect import Flow, task
import prefect
from datetime import timedelta

retry_delay = timedelta(seconds=2)

@task
def task1():
    return 1

@task(max_retries=3, retry_delay=retry_delay)
def task2(input):
    logger = prefect.context.get("logger")
    retry_count = prefect.context.get("task_run_count")
    logger.info(f"Input: {input}")
    logger.info(f"Retry count: {retry_count}")
    if retry_count < 2:
        raise Exception

with Flow("myflow") as flow:
    result1 = task1()
    task2(result1)
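To make the retry gate in task2 easier to follow, here is a rough sketch that mimics it in plain Python, without Prefect. It assumes the run count starts at 1 on the first attempt. `SimulatedFailure`, `task2_body` and `run_with_retries` are hypothetical names made up for this illustration and are not part of Prefect.

```python
class SimulatedFailure(Exception):
    """Stands in for the bare Exception raised by task2."""


def task2_body(run_count: int) -> str:
    # Same gate as task2 above: fail while the run count is below 2.
    if run_count < 2:
        raise SimulatedFailure(f"run {run_count} failed")
    return f"run {run_count} succeeded"


def run_with_retries(max_retries: int) -> list:
    """Hypothetical helper: call task2_body until it succeeds or retries run out."""
    outcomes = []
    # Assumption: the run count starts at 1, so attempts are 1 .. max_retries + 1.
    for run_count in range(1, max_retries + 2):
        try:
            outcomes.append(task2_body(run_count))
            break  # stop at the first successful attempt
        except SimulatedFailure as exc:
            outcomes.append(str(exc))
    return outcomes


outcomes = run_with_retries(max_retries=3)
print(outcomes)  # ['run 1 failed', 'run 2 succeeded']
```

Under that assumption, the task fails exactly once and succeeds on its first retry, so only one of the three allowed retries is used.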
If you use the following instead, task2 will fail on its first run and then be put in a Pending state. The flow is then resubmitted to run later.
retry_delay = timedelta(minutes=15)
If you check the logs in cloud logging, you can see that a new Kubernetes job is created.
Kevin Kho
Thomas Hoeck
02/10/2022, 5:13 PM
Kevin Kho