# prefect-community

Jonah Benton

08/22/2019, 8:02 PM
Hi folks, I'm working locally with Prefect Core, stringing together a bunch of functions that hit Twitter's API: with a screen name, get the user object; get the user's followees; get tweets of the user's followees; get URLs mentioned in tweets; etc. Each function is task-decorated, and since these are fan-out operations, each step in the flow uses map on the results of the previous step. Since Twitter has strict rate limits, I want to cache the outputs of each task during testing, so I'm using the cache_for option in the task decorator with an hour as the duration... and this does not seem to be working. Every flow run hits the Twitter API rather than pulling results from a cache, and I see "cache is now invalid" in the logs:

```
[2019-08-22 19:31:25,037] INFO - prefect.TaskRunner | Task 'auth_name_to_id': Starting task run...
[2019-08-22 19:31:25,038] INFO - prefect.TaskRunner | Task 'auth_name_to_id[1]': Starting task run...
[2019-08-22 19:31:25,038] WARNING - prefect.TaskRunner | Task 'auth_name_to_id[1]': can't use cache because it is now invalid
[2019-08-22 19:31:25,200] INFO - prefect.TaskRunner | Task 'auth_name_to_id[1]': finished task run for task with final state: 'Cached'
[2019-08-22 19:31:25,200] INFO - prefect.TaskRunner | Task 'auth_name_to_id[0]': Starting task run...
[2019-08-22 19:31:25,200] WARNING - prefect.TaskRunner | Task 'auth_name_to_id[0]': can't use cache because it is now invalid
[2019-08-22 19:31:25,358] INFO - prefect.TaskRunner | Task 'auth_name_to_id[0]': finished task run for task with final state: 'Cached'
[2019-08-22 19:31:25,359] INFO - prefect.TaskRunner | Task 'auth_name_to_id': finished task run for task with final state: 'Mapped'
[2019-08-22 19:31:25,359] INFO - prefect.TaskRunner | Task 'auth_to_friend': Starting task run...
[2019-08-22 19:31:25,360] INFO - prefect.TaskRunner | Task 'auth_to_friend[0]': Starting task run...
[2019-08-22 19:31:25,360] WARNING - prefect.TaskRunner | Task 'auth_to_friend[0]': can't use cache because it is now invalid
[2019-08-22 19:31:25,605] INFO - prefect.TaskRunner | Unexpected error: TwitterError([{'message': 'Rate limit exceeded', 'code': 88}])
[2019-08-22 19:31:25,605] INFO - prefect.TaskRunner | Task 'auth_to_friend[1]': finished task run for task with final state: 'Retrying'
...
```

Should something like this just work? Without looking too closely at the code, the thought that comes to mind is that the functions themselves take dicts, pulling out particular keys, and return lists of dicts. I don't work a lot in Python and don't know how it handles equality tests of dicts: is the use of dicts in the task calls potentially screwing up caching?
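On the dict-equality question raised above: Python compares dicts by value, not identity, so two independently constructed dicts with the same keys and values are equal. By itself that wouldn't defeat an input-keyed cache, though note that dicts are unhashable, so any cache keyed on inputs would have to serialize or freeze them first. A quick check:

```python
# Python dicts compare by value, not identity.
a = {"screen_name": "alice", "id": 1}
b = {"screen_name": "alice", "id": 1}

print(a == b)   # → True  (same keys and values)
print(a is b)   # → False (distinct objects)
```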

Jeremiah

08/22/2019, 8:05 PM
@Jonah Benton are you running on 0.6.0? There was a bug that caused the log statement about invalid caches to print even when the cache was valid
0.6.1 corrects this
As for your question about dict equality — you can optionally have your cache invalidate based on inputs, so it’s possible that would result in the behavior you’re describing. However, the default cache is based simply on elapsed time so that shouldn’t be an issue.
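The default, elapsed-time-only behavior described here amounts to memoizing a result together with an expiry timestamp. A minimal plain-Python stand-in for the mechanism (this is an illustration, not Prefect's actual implementation — names like `cache_for` are borrowed for flavor):

```python
import time
from functools import wraps

def cache_for(seconds):
    """Memoize a function's result for a fixed duration.

    Keyed only on elapsed time, not on inputs -- mirroring a
    duration-only cache validator."""
    def decorator(fn):
        state = {"expires": None, "value": None}

        @wraps(fn)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            if state["expires"] is not None and now < state["expires"]:
                return state["value"]            # cache still valid
            state["value"] = fn(*args, **kwargs)  # miss: do the real work
            state["expires"] = now + seconds      # refresh expiry
            return state["value"]

        return wrapper
    return decorator

calls = []

@cache_for(3600)
def get_user(screen_name):
    calls.append(screen_name)   # stands in for a real API hit
    return {"screen_name": screen_name}

get_user("alice")
get_user("alice")   # served from cache, no second "API" call
print(len(calls))   # → 1
```

Note the cached value lives in the decorator's closure, i.e. in process memory, so a fresh process always starts cold.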

Jonah Benton

08/22/2019, 8:07 PM
Looks like 0.6.1:

```
$ pip3 list --user | grep prefect
prefect    0.6.1
```

Jeremiah

08/22/2019, 8:08 PM
Ah, I apologize then

Jonah Benton

08/22/2019, 8:08 PM
I want to cache, and it doesn't seem to be doing so, as I am running into rate limits

Jeremiah

08/22/2019, 8:10 PM
That’s certainly odd. Are all of your flow runs in the same Python process? Caches in Core are shared across the process.

Jonah Benton

08/22/2019, 8:13 PM
Is caching memory-based? I am running flows by running a script with `flow.run()` at the end, so there's a new process per flow execution.

Jeremiah

08/22/2019, 8:16 PM
Ok, that’s the likely culprit in this case. The caching mechanism as implemented in Core is meant principally for testing flows that run on a schedule with `flow.run()` in a single, long-running process, so results are cached in memory. Caching is really meant to be used with a distributed, stateful backend so that cached results can be retrieved as needed via API no matter where a deployed flow is running (which is how you’d use it in Prefect Cloud).
However, we want to keep expanding the utility of Core without a database as much as we can, so maybe we can work with you on expanding local caching to use the filesystem
Alternatively we could get you early access to Cloud 🙂

Jonah Benton

08/22/2019, 8:22 PM
Ok, got it, thank you. I'm not at the place where I could benefit from access to Cloud, but will definitely ask if we get there. Is the cache pluggable? Can I put in a quick filesystem-based get/put thingy?
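The kind of filesystem get/put being asked about can be sketched with `pickle` in a few lines — purely a plain-Python illustration, not a Prefect integration (the function names and cache layout here are made up):

```python
import os
import pickle
import tempfile
import time

CACHE_DIR = tempfile.mkdtemp()  # swap for a fixed path in real use

def cache_put(key, value, ttl_seconds):
    """Persist a value to disk with an absolute expiry timestamp."""
    path = os.path.join(CACHE_DIR, key + ".pkl")
    with open(path, "wb") as f:
        pickle.dump({"expires": time.time() + ttl_seconds, "value": value}, f)

def cache_get(key):
    """Return the cached value, or None if missing or expired."""
    path = os.path.join(CACHE_DIR, key + ".pkl")
    if not os.path.exists(path):
        return None
    with open(path, "rb") as f:
        entry = pickle.load(f)
    if time.time() >= entry["expires"]:
        return None             # expired: treat as a miss
    return entry["value"]

cache_put("followers_alice", [{"id": 1}, {"id": 2}], ttl_seconds=3600)
print(cache_get("followers_alice"))  # → [{'id': 1}, {'id': 2}]
```

Because the entries live on disk rather than in memory, they survive across separate script runs, which is exactly what the in-process cache above does not do.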

Jeremiah

08/22/2019, 8:33 PM
It’s pluggable, but a little bit intricately tied to some other (also pluggable) things like `ResultHandler`s. There’s already a filesystem result handler, so I suspect that @Chris White will tell us this is actually a lot easier than it seems
He’s on vacation until Monday, though
However he seems to be checking in from time to time so maybe we’ll hear from him soon 😉

Jonah Benton

08/22/2019, 8:39 PM
Cool, thank you

Chris White

08/22/2019, 8:49 PM
👋 I would actually recommend using a `state_handler` on the tasks which require caching; if the task is in a `Pending`-like state, you can extract the stored `Cached` state from your local filesystem and return it from the handler, allowing Prefect Core to determine whether the cache is still valid or not
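The shape of that suggestion can be sketched in plain Python. In Prefect 0.x a state handler receives `(task, old_state, new_state)` and returns the state to use; the stand-in `State` classes and cache directory below are assumptions for illustration, not Prefect's real `State` API:

```python
import os
import pickle
import tempfile

# Minimal stand-ins for Prefect-style states (not the real classes).
class State:
    def __init__(self, result=None):
        self.result = result

class Pending(State): pass
class Cached(State): pass
class Success(State): pass

CACHE_DIR = tempfile.mkdtemp()  # swap for a fixed path in real use

def _path_for(task_name):
    return os.path.join(CACHE_DIR, task_name + ".pkl")

def load_cached_state(task, old_state, new_state):
    """On entering a Pending-like state, substitute a Cached state read
    from disk (if one exists) so the engine can re-validate it; on
    Success, write the result out for future runs."""
    path = _path_for(task.name)
    if isinstance(new_state, Pending) and os.path.exists(path):
        with open(path, "rb") as f:
            return Cached(pickle.load(f))
    if isinstance(new_state, Success):
        with open(path, "wb") as f:
            pickle.dump(new_state.result, f)
    return new_state
```

A first run would pass through `Success` and persist the result; a later run, hitting `Pending` for the same task name, would get the stored `Cached` state back instead of re-calling the API.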

Jonah Benton

08/23/2019, 12:38 AM
Thank you, makes sense!