Hi folks, I'm working locally with Prefect Core, stringing together a bunch of functions that hit Twitter's API: given a screen name, get the user object; get the user's followees; get tweets from those followees; get URLs mentioned in the tweets, and so on. Each function is task-decorated, and since these are fan-out operations, each step in the flow uses map on the results of the previous step. Twitter has strict rate limits, so I want to cache the output of each task during testing. I'm using the cache_for option in the task decorator with a duration of one hour... and it doesn't seem to be working.
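To pin down the expected behaviour, here is a plain-Python sketch (not Prefect code, and not how Prefect implements cache_for) of a one-hour cache over a fanned-out step. It keys each call on its input, so every mapped element gets its own entry. TimedCache and the auth_name_to_id stub are hypothetical names for illustration, the stub makes no real API call, and the inputs are assumed to be JSON-serializable.

```python
import json
import time
from datetime import timedelta


class TimedCache:
    """Hypothetical helper: remembers each call's output for `ttl`,
    keyed on the function name and its (JSON-serializable) input."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl.total_seconds()
        self.clock = clock  # injectable so expiry can be tested with a fake clock
        self._store = {}  # key -> (time stored, value)

    @staticmethod
    def _key(name, arg):
        # Dicts are unhashable, so serialize with sorted keys: equal dicts
        # produce the same string regardless of key insertion order.
        return name, json.dumps(arg, sort_keys=True)

    def cached(self, fn):
        def wrapper(arg):
            key = self._key(fn.__name__, arg)
            now = self.clock()
            hit = self._store.get(key)
            if hit is not None and now - hit[0] < self.ttl:
                return hit[1]  # still inside the window: reuse the stored output
            value = fn(arg)  # miss or expired: call through and store the result
            self._store[key] = (now, value)
            return value

        return wrapper


api_calls = []  # records each simulated API hit
cache = TimedCache(timedelta(hours=1))


@cache.cached
def auth_name_to_id(user):
    # Hypothetical stub standing in for a Twitter API call.
    api_calls.append(user["screen_name"])
    return {"screen_name": user["screen_name"], "id": len(api_calls)}


users = [{"screen_name": "alice"}, {"screen_name": "bob"}]
first_run = [auth_name_to_id(u) for u in users]   # fan out: one call per input
second_run = [auth_name_to_id(u) for u in users]  # within the hour: served from the cache
```

After both runs, api_calls holds only ["alice", "bob"]. Because the store in this sketch is an ordinary dict, it lasts only as long as the Python process; a fresh interpreter starts with an empty cache.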
Every flow run hits the Twitter API instead of pulling results from the cache, and I see "can't use cache because it is now invalid" in the logs:
[2019-08-22 19:31:25,037] INFO - prefect.TaskRunner | Task 'auth_name_to_id': Starting task run...
[2019-08-22 19:31:25,038] INFO - prefect.TaskRunner | Task 'auth_name_to_id[1]': Starting task run...
[2019-08-22 19:31:25,038] WARNING - prefect.TaskRunner | Task 'auth_name_to_id[1]': can't use cache because it is now invalid
[2019-08-22 19:31:25,200] INFO - prefect.TaskRunner | Task 'auth_name_to_id[1]': finished task run for task with final state: 'Cached'
[2019-08-22 19:31:25,200] INFO - prefect.TaskRunner | Task 'auth_name_to_id[0]': Starting task run...
[2019-08-22 19:31:25,200] WARNING - prefect.TaskRunner | Task 'auth_name_to_id[0]': can't use cache because it is now invalid
[2019-08-22 19:31:25,358] INFO - prefect.TaskRunner | Task 'auth_name_to_id[0]': finished task run for task with final state: 'Cached'
[2019-08-22 19:31:25,359] INFO - prefect.TaskRunner | Task 'auth_name_to_id': finished task run for task with final state: 'Mapped'
[2019-08-22 19:31:25,359] INFO - prefect.TaskRunner | Task 'auth_to_friend': Starting task run...
[2019-08-22 19:31:25,360] INFO - prefect.TaskRunner | Task 'auth_to_friend[0]': Starting task run...
[2019-08-22 19:31:25,360] WARNING - prefect.TaskRunner | Task 'auth_to_friend[0]': can't use cache because it is now invalid
[2019-08-22 19:31:25,605] INFO - prefect.TaskRunner | Unexpected error: TwitterError([{'message': 'Rate limit exceeded', 'code': 88}])
[2019-08-22 19:31:25,605] INFO - prefect.TaskRunner | Task 'auth_to_friend[1]': finished task run for task with final state: 'Retrying'
....
Should something like this just work?
Without looking too closely at the code, one thought that comes to mind: the functions take dicts, pull out particular keys, and return lists of dicts. I don't work much in Python and don't know how it handles equality tests on dicts. Could passing dicts into the task calls be what's breaking the caching?
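On the Python side of that question, a small sketch of how dicts behave. == on dicts compares keys and values (not identity or insertion order), and lists of dicts compare element by element. Dicts are not hashable, though, so anything that caches by hashing its inputs, such as the standard library's functools.lru_cache, rejects them. Whether Prefect's cache validators compare inputs with ==, hash them, or ignore them entirely is an assumption not checked here. The lookup function is a hypothetical example.

```python
import functools

a = {"screen_name": "alice", "id": 1}
b = {"id": 1, "screen_name": "alice"}  # same content, different insertion order

# Equality compares keys and values, so these match even though they are
# two distinct objects built in a different order.
same_value = a == b
same_object = a is b

# Lists of dicts compare element by element using dict equality.
lists_equal = [a, {"x": 1}] == [b, {"x": 1}]


# Hash-based memoization needs hashable arguments, which dicts are not.
@functools.lru_cache(maxsize=None)
def lookup(user):
    return user["id"]


try:
    lookup(a)
    hash_error = None
except TypeError as exc:
    hash_error = str(exc)  # the message mentions the unhashable dict type
```

So two dicts with the same content are "equal" in Python; the catch only arises with caches that rely on hashing.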