Juan Alvarado
09/17/2024, 10:39 PM
from prefect.task_worker import serve
from prefect import task
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS)
def simple(msg: str) -> str:
    print(f'simple {msg}')
    return msg

if __name__ == "__main__":
    serve(simple)
and the flow definition:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# simple is the task defined above; its import is not shown in the original message.

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner)
def hello_world():
    # First call with 'foo'
    f = simple.delay('foo')
    f.wait()
    print(f'simple {f.result()}')
    # Second call with the same input, expected to be served from the cache
    f = simple.delay('foo')
    f.wait()
    print(f'simple {f.result()}')

if __name__ == "__main__":
    hello_world.serve(name="hello-world-deployment",
                      tags=["onboarding"],
                      parameters={})
Whenever I run the flow, the simple task always executes; its result is never served from the cache. I have set PREFECT_RESULTS_PERSIST_BY_DEFAULT in all processes, and I have tried different cache policies as well as cache key functions. Tasks that run locally are cached correctly. The only thing I have noticed is that the task worker writes its cache files to the current directory and does not seem to honor the PREFECT_LOCAL_STORAGE_PATH setting.
Nate
09/18/2024, 3:38 PM
Nate
09/18/2024, 3:38 PM
Juan Alvarado
09/18/2024, 10:25 PM
Nate
09/18/2024, 10:25 PM
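One way to check the observation above, that the task worker writes cache files to its current directory, is to print what each process actually sees. The sketch below is a minimal diagnostic under stated assumptions: `describe_storage_env` is a hypothetical helper (not part of Prefect), and it only reads environment variables. Prefect can also pick up settings from a profile, so a missing value here does not prove the setting is unset.

```python
import os

# Setting names are taken from the message above. Reading them from the
# environment is an assumption: Prefect may also load them from a profile.
SETTINGS = ("PREFECT_RESULTS_PERSIST_BY_DEFAULT", "PREFECT_LOCAL_STORAGE_PATH")


def describe_storage_env(environ=None) -> dict:
    """Hypothetical helper: snapshot the storage-related values this process sees."""
    env = os.environ if environ is None else environ
    # None means the variable is not present in this process's environment.
    report = {name: env.get(name) for name in SETTINGS}
    # Relative result paths would resolve against this directory.
    report["cwd"] = os.getcwd()
    report["pid"] = os.getpid()
    return report


if __name__ == "__main__":
    for key, value in describe_storage_env().items():
        print(f"{key}={value!r}")
```

Calling it once in the task worker script (before `serve(simple)`) and once in the flow script (before `hello_world.serve(...)`) would show whether the two processes differ in working directory or in the environment values they receive.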