Hi. I have a flow where I get a token for an api t...
# ask-community
t
Hi. I have a flow where I get a token for an API that goes stale after some known time. Is there a way to indicate that a depending task should be rerun? An example of the flow could be as follows:
from prefect import task, Flow, unmapped
import requests

@task
def task_that_goes_stale():
    # Fetching the token is expensive, so it should happen as rarely as possible
    token = code_to_get_token_at_some_cost()
    return token

@task
def task_using_token(user, token):
    r = requests.get(f'https://api.github.com/user/{user}', headers={'TOKEN': token})

with Flow("my_flow") as my_flow:
    all_users = [i for i in range(1000)]
    token = task_that_goes_stale()
    # unmapped() passes the same token to every mapped run
    task_using_token.map(all_users, unmapped(token))
I know I could just get a new token for each call, but this seems like the wrong way to do it. Is there a Prefect way to do it? I can see that there is output caching, but as I read it, this is meant for carrying results between flows: https://docs.prefect.io/core/concepts/persistence.html#output-caching
n
Hi @Thomas Hoeck - there are a few ways you could handle this; I think the easiest would be to turn the token fetching into a class method and check its expiration in your mapped task. Something like this:
from prefect import Task
import requests

class TaskUsingToken(Task):
    def get_token(self):
        token = code_to_get_token()
        return token

    def run(self, user, token):
        # token.is_stale stands in for whatever expiry check your token supports
        if token.is_stale:
            token = self.get_token()
        r = requests.get(f'https://api.github.com/user/{user}', headers={'TOKEN': token})
t
Will this work when running on a Dask cluster or a local cluster with processes? Won't this mean that after the first expiry, every mapped run will keep calling self.get_token?
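One possible way to limit repeated fetches after expiry, sketched here without Prefect so it can be run on its own: keep the token together with its expiry time in a small cache object, and refetch only when the known lifetime has elapsed. `ExpiringToken`, `fetch`, `ttl_seconds`, and `clock` are hypothetical names for illustration, not part of Prefect. This assumes the token lifetime is known up front, as in the question. How such a cache is shared with workers depends on the executor; in the likely case each worker process would hold its own copy and so refresh at most once per expiry window.

```python
import time
from typing import Callable, Optional


class ExpiringToken:
    """Caches a token and refetches it only after ttl_seconds have elapsed.

    Hypothetical helper for illustration; not thread-safe, so two threads
    may occasionally both refetch at the moment of expiry.
    """

    def __init__(
        self,
        fetch: Callable[[], str],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch          # the expensive call that returns a fresh token
        self._ttl = ttl_seconds      # known lifetime of a token
        self._clock = clock          # injectable so the expiry logic can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Refetch on first use or once the current token has expired
        if self._token is None or self._clock() >= self._expires_at:
            self._token = self._fetch()
            self._expires_at = self._clock() + self._ttl
        return self._token
```

Inside a task's `run`, the stale check would then become a call such as `token = token_cache.get()`, where `token_cache` is an `ExpiringToken` built around the real token-fetching function.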