John Shearer
10/14/2021, 11:07 AM
Anna Geller
You can use task_args if you're on Prefect Cloud. I have an example where it's used to set a dynamic number of retries; you could do something similar for retry delays:
"""
First set the key:
from prefect.backend import set_key_value
key_value_uuid = set_key_value(key="variable_nr_retries", value=5)
"""
from datetime import timedelta
import prefect
from prefect import Flow, task
from prefect.backend import get_key_value
@task(max_retries=2, retry_delay=timedelta(seconds=2))
def retry_test():
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing to test retries...")


with Flow("retry-tester") as flow:
    nr_retries = int(get_key_value(key="variable_nr_retries"))
    retry_test(task_args=dict(max_retries=nr_retries))

if __name__ == "__main__":
    flow.run()
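To see the pattern without a Prefect Cloud account, here is a minimal, self-contained sketch of the same idea: a plain dict stands in for the backend KV Store, and a small retry loop stands in for the task runner. All names here (kv_store, run_with_dynamic_retries, variable_retry_delay_s) are hypothetical, not part of Prefect's API.

```python
# Stand-in sketch of the KV Store retry pattern; runnable without Prefect Cloud.
# A plain dict plays the role of prefect.backend's KV Store.
import time

kv_store = {}


def set_key_value(key, value):
    kv_store[key] = value


def get_key_value(key):
    return kv_store[key]


# e.g. a rate-limited API told us (via a Retry-After header) how long to wait;
# a small delay is used here so the example runs quickly.
set_key_value("variable_retry_delay_s", 0.1)
set_key_value("variable_nr_retries", 3)


def retry_test(attempts_log):
    """Fails on every run, so each call exhausts all retries."""
    attempts_log.append(time.monotonic())
    raise Exception("Failing to test retries...")


def run_with_dynamic_retries(func):
    # Read the CURRENT values from the store, just as the Flow above reads
    # variable_nr_retries with get_key_value.
    max_retries = int(get_key_value("variable_nr_retries"))
    delay_s = float(get_key_value("variable_retry_delay_s"))
    attempts = []
    for attempt in range(max_retries + 1):
        try:
            func(attempts)
            return attempts
        except Exception:
            if attempt == max_retries:
                return attempts  # retries exhausted
            time.sleep(delay_s)


attempts = run_with_dynamic_retries(retry_test)
print(len(attempts))  # → 4 (1 initial run + 3 retries)
```

Because the delay is looked up at run time, updating the key between runs changes the backoff with no code change, which is the point of the KV Store approach.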
You would then have a task that sets a new integer value for the retry delay in the KV Store. In fact, for your use case, where a rate-limited API tells you how long you have to wait before you can make a subsequent request, I think this KV Store solution is much better and more flexible than exponential backoff in Airflow, because it allows you to set the EXACT retry delay that you need.
John Shearer
10/14/2021, 11:44 AM
Zanie