# ask-community
j
Hi, has anyone tried caching with task workers? I have not been able to make it work. My setup has two processes. The task worker:
from prefect.task_worker import serve
from prefect import task
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS)
def simple(msg: str) -> str:
    print(f'simple {msg}')
    return msg

if __name__ == "__main__":
    serve(simple)
and the flow definition:
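from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

# NOTE: `simple` is the task defined in the task worker file above. It has to be
# imported here from wherever that module lives (the module name is not given).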

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner)
def hello_world():
    f = simple.delay('foo')
    f.wait()
    print(f'simple {f.result()}')
    f = simple.delay('foo')
    f.wait()
    print(f'simple {f.result()}')

if __name__ == "__main__":
    hello_world.serve(name="hello-world-deployment",
                      tags=["onboarding"],
                      parameters={})
Whenever I run the flow, the simple task is always executed (never served from the cache). I have set PREFECT_RESULTS_PERSIST_BY_DEFAULT in all processes, and I have tried different cache policies as well as cache key functions. Tasks that run locally are cached correctly. The only thing I have noticed is that the task worker places the cache files in its current directory and does not seem to follow the PREFECT_LOCAL_STORAGE_PATH setting.
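For illustration only, here is a stdlib-only sketch of why the storage location matters for an inputs-based cache. It is not Prefect's implementation: `inputs_key` and `run_cached` are hypothetical helpers, and the two temporary directories stand in for "the directory the result was written to" and "a different directory". Whether this explains the miss reported above is not established; it only shows that a lookup in a different directory from the one that was written to cannot hit.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def inputs_key(task_name: str, inputs: dict) -> str:
    """Hypothetical inputs-based cache key: hash the task name and its arguments."""
    payload = json.dumps({"task": task_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_cached(storage_dir: Path, task_name: str, fn, **inputs):
    """Return (result, was_cached), reading and writing records under storage_dir."""
    record = storage_dir / inputs_key(task_name, inputs)
    if record.exists():
        # A record for these inputs already exists in this directory: cache hit.
        return json.loads(record.read_text()), True
    # No record here: run the function and persist its result.
    result = fn(**inputs)
    record.write_text(json.dumps(result))
    return result, False


def simple(msg: str) -> str:
    return msg


with tempfile.TemporaryDirectory() as a, tempfile.TemporaryDirectory() as b:
    dir_a, dir_b = Path(a), Path(b)
    first = run_cached(dir_a, "simple", simple, msg="foo")   # miss, writes to dir_a
    second = run_cached(dir_a, "simple", simple, msg="foo")  # hit, same directory
    other = run_cached(dir_b, "simple", simple, msg="foo")   # miss, different directory

print(first, second, other)
```

The same key is computed all three times; only the directory being searched changes the outcome, which is why it is worth checking where each process actually reads and writes its results.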
n
hi @Juan Alvarado thank you for raising this! i will take a look at this as soon as possible, in the meantime if you're willing to create an issue that would be very helpful
j
n
thank you!