I'm working on a Flow which calls a rate-limited A...
# prefect-community
s
I'm working on a Flow which calls a rate-limited API, which will occasionally through a 429 exception. Within the 429 exception, it will be explicit in how long it wants the client to wait until it reties, but we already have the retry logic specified in the
@task
decorator. So it gets stuck in a perpetual loop, because the server wants the client to wait longer than what was specified in the
@task
decorator. The Task Concurrency Limiting feature would reduce the frequency this happens, but would not catch all 429 exceptions. Sometimes this specific server gets overloaded by other traffic, and will dynamically rate-limit all traffic until it has scaled up to handle the additional traffic. I'm guessing that I'd need to write some custom retry logic within the
task
to handle the 429 exceptions, but curious if anyone else has a way to pipe the
Retry-After
from a 429 into the prefect engine's
retry_delay
parameter for similar rate-limited API calls?
e
I did a quick check, and modifying self.retry_delay in a task worked:
Copy code
class DelayTask(Task):
    def run(self):
        print(self.retry_delay)
        self.retry_delay = datetime.timedelta(minutes=10)
        raise ValueError(self.retry_delay)

with Flow("delay_example") as flow:
    DelayTask(retry_delay=datetime.timedelta(seconds=10), max_retries=3)()

flow.run()
Next available task run had to wait 10 minutes, which is dynamically set in task.run()
I’m shocked that it works though 😅
s
I was using the
@task
decorator, instead of the class approach .. but if the class approach works, it's easy enough to modify! Thanks emre!