Chris White
Brett Naul
07/10/2019, 5:48 PMAdam Roderick
07/11/2019, 2:57 AMChris White
Adam Roderick
07/11/2019, 12:59 PMAdam Roderick
07/11/2019, 12:59 PMAdam Roderick
07/11/2019, 1:01 PMDavid Ojeda
07/11/2019, 4:19 PMDavid Ojeda
07/11/2019, 4:27 PM
cache_for task parameter)
3. when are the i/p/r cached?
I came up with these questions because I am trying to leverage the cache for my case and I cannot manage to see a cache hit. I have some map tasks that run sequentially, one after the other, with about 500 elements per map. After waiting until about 200 tasks of the first map have finished, I purposely kill my workers to see them pick the work up again and profit from the cache, but they still redo each mapped task one by one.
I ended up understanding what my problem was as I finished writing this: we were rolling our own kind of "cache" with our data backend and we raised a state that derives from Skipped, but result caching does not occur when the resulting state is skipped.
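A toy sketch of the behaviour described above, assuming a cache that only stores genuinely successful states. The classes and the `store_if_cacheable` helper are hypothetical stand-ins written for illustration; this is not Prefect's actual implementation.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class State:
    """Hypothetical stand-in for the final state of a task run."""
    result: Any = None


class Success(State):
    """A run that finished normally."""


class Skipped(Success):
    """A run that was skipped (for example by a home-made cache check)."""


def store_if_cacheable(cache: Dict[str, List[State]], key: str, state: State) -> bool:
    """Store the state only if it is a success that is not a skip."""
    if isinstance(state, Success) and not isinstance(state, Skipped):
        cache.setdefault(key, []).append(state)
        return True
    return False


cache: Dict[str, List[State]] = {}
stored_success = store_if_cacheable(cache, "my_task", Success(result=42))
stored_skipped = store_if_cacheable(cache, "my_task", Skipped(result=None))
```

Under this assumption, a task that ends in a Skipped-derived state never leaves an entry behind, so a later run has nothing to hit.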
Even though I solved my problem, I am still posting these questions to be sure I understand the mechanism.
Adam Roderick
07/11/2019, 8:15 PMKalani Murakami
07/12/2019, 6:22 AMYoni Davidson
07/12/2019, 11:06 PMMichael Reeves
07/12/2019, 11:46 PMYoni Davidson
07/13/2019, 12:29 AMjeetu
07/14/2019, 11:12 AMjeetu
07/14/2019, 11:17 AMJohn Ramirez
07/15/2019, 2:12 PMJohn Ramirez
07/15/2019, 5:53 PMSherman
07/15/2019, 11:15 PMJeff Quinn
07/17/2019, 2:35 PMJason Damiani
07/17/2019, 3:52 PMAlex Cano
07/17/2019, 10:11 PMHendrik Pauthner
07/18/2019, 2:35 PMJoe Schmid
07/18/2019, 5:57 PMJoe Schmid
07/18/2019, 5:59 PMDavid Ojeda
07/18/2019, 6:18 PM
2019-07-18 20:12:28 ixion.local prefect.TaskRunner[22036] WARNING Task 'SlackTask': can't use cache because it is now invalid
… in my opinion, the message is misleading (there is no invalid cache; I have not set up any cache_for for this task), and the warning level may be too high for this case.
While I was reading the related code:
if self.task.cache_for is not None:
    candidate_states = prefect.context.caches.get(
        self.task.cache_key or self.task.name, []
    )
    sanitized_inputs = {key: res.value for key, res in inputs.items()}
    for candidate in candidate_states:
        if self.task.cache_validator(
            candidate, sanitized_inputs, prefect.context.get("parameters")
        ):
            candidate._result = candidate._result.to_result()
            return candidate

self.logger.warning(
    "Task '{name}': can't use cache because it "
    "is now invalid".format(
        name=prefect.context.get("task_full_name", self.task.name)
    )
)
I wondered if the problem is just as simple as this: the logging instruction was supposed to be indented one level deeper, so that it only fires when 1) the cache is enabled and 2) there are cache candidates but all of them fail to hit.
Romain
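A self-contained sketch of the two conditions named in that message, written with plain Python rather than Prefect internals. The function `check_cache`, its arguments, and the `same_inputs` validator are hypothetical names chosen for illustration. Note that simply indenting the warning under the `cache_for` check would still warn when there are no candidates at all; the extra `if candidates` guard below is an assumption added to match condition 2.

```python
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple

logger = logging.getLogger("cache_sketch")

Candidate = Dict[str, Any]


def same_inputs(candidate: Candidate, inputs: Dict[str, Any]) -> bool:
    """Hypothetical validator: the cached inputs must equal the current ones."""
    return candidate["inputs"] == inputs


def check_cache(
    cache_for: Optional[float],
    candidates: List[Candidate],
    inputs: Dict[str, Any],
    validator: Callable[[Candidate, Dict[str, Any]], bool],
) -> Tuple[Optional[Candidate], bool]:
    """Return (cache hit or None, whether a warning was logged)."""
    if cache_for is None:
        # Caching is disabled for this task: nothing to look up, nothing to warn about.
        return None, False
    for candidate in candidates:
        if validator(candidate, inputs):
            return candidate, False
    if candidates:
        # Caching is enabled and candidates exist, but none of them validated.
        logger.warning("can't use cache because it is now invalid")
        return None, True
    return None, False
```

With this arrangement a task without any cache setting, such as the 'SlackTask' in the log line above, would stay silent.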
07/19/2019, 8:17 AM
conditions = is_true.map(input_data)
a_results = do_a.map(input_data)
b_results = do_b.map(input_data)
ifelse(conditions, a_results, b_results)
It does not work because the ifelse function expects the condition to be a single boolean, not a list of booleans. Is there a way to do this differently?
Jie Lou
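One possible way around this, sketched with plain Python functions instead of Prefect tasks: move the branch inside a single per-element function, so the choice between the two paths is made for each item rather than for the whole list. The bodies of `is_true`, `do_a` and `do_b` are made up for the example, and `branch` is a hypothetical name. In Prefect this would presumably become one task mapped over `input_data`, at the cost of no longer having separate tasks for each path.

```python
from typing import List


def is_true(item: int) -> bool:
    """Hypothetical condition: the item is even."""
    return item % 2 == 0


def do_a(item: int) -> str:
    """Hypothetical work for items that satisfy the condition."""
    return f"a({item})"


def do_b(item: int) -> str:
    """Hypothetical work for items that do not satisfy the condition."""
    return f"b({item})"


def branch(item: int) -> str:
    """Decide per element which path to take."""
    return do_a(item) if is_true(item) else do_b(item)


input_data: List[int] = [1, 2, 3, 4]
results = [branch(item) for item in input_data]
```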
07/19/2019, 8:08 PMJie Lou
07/19/2019, 8:09 PMJie Lou
07/19/2019, 8:09 PM