gabe
03/18/2025, 2:23 PM
from prefect import flow, task, get_run_logger
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS)
def add(x, y):
    l = get_run_logger()
    l.info(f"running task with {x, y}")
    return x + y

@flow(
    name="my-flow",
)
def sub_flow():
    l = get_run_logger()
    l.info(f"entering sub flow")
    result = add(1, 3)
    l.info(f"exiting sub flow")
    return result

@flow(
    name="main-flow",
    persist_result=True,
)
def main_flow():
    l = get_run_logger()
    l.info(f"entering main flow")
    subflow_result = sub_flow()
    l.info(f"exiting main flow")
    return {"result": subflow_result}

if __name__ == "__main__":
    main_flow()
I see the following logs:
15:21:46.566 | INFO | Flow run 'dazzling-frog' - Beginning flow run 'dazzling-frog' for flow 'main-flow'
15:21:46.569 | INFO | Flow run 'dazzling-frog' - entering main flow
15:21:46.758 | INFO | Flow run 'romantic-cicada' - Beginning subflow run 'romantic-cicada' for flow 'my-flow'
15:21:46.759 | INFO | Flow run 'romantic-cicada' - entering sub flow
15:21:46.794 | INFO | Task run 'add-224' - Finished in state Cached(type=COMPLETED)
15:21:46.795 | INFO | Flow run 'romantic-cicada' - exiting sub flow
15:21:46.852 | INFO | Flow run 'romantic-cicada' - Finished in state Completed()
15:21:46.852 | INFO | Flow run 'dazzling-frog' - exiting main flow
15:21:46.919 | INFO | Flow run 'dazzling-frog' - Finished in state Completed()
This means the task's result was cached and reused correctly, but the same did not happen for the flows. persist_result doesn't seem to be doing the job. Are there other ways of caching flows?
Marvin
03/18/2025, 2:24 PM
The add task is correctly utilizing caching. However, persist_result=True for flows does not equate to caching; it merely ensures that the results are persisted in the backend for retrieval later.
Currently, Prefect does not support caching at the flow level directly. Flow runs are treated as separate entities, and the caching mechanisms you see with tasks (e.g., using cache_policy) do not apply to flows. The persist_result parameter simply saves the flow result but does not prevent the flow from running again.
For more details, you can refer to the Prefect documentation on task caching.
If you need flow-level caching, you might need to implement a custom solution, such as storing flow results in an external cache or database and checking this cache at the start of the flow to decide whether to proceed with execution.
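As a rough illustration of that idea, here is a minimal sketch using only the Python standard library. It is not a Prefect feature: cached_by_inputs, the .flow_cache directory, and sub_flow_body are hypothetical names made up for this example, and it assumes the arguments and the return value are JSON-serializable. A plain function stands in for the flow body so the caching logic can be seen on its own.

```python
import functools
import hashlib
import json
from pathlib import Path


def cached_by_inputs(cache_dir):
    """Hypothetical helper: skip the call when a result for the same inputs is stored."""
    cache_dir = Path(cache_dir)

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Build a cache key from the function name and its arguments.
            # Assumes the arguments are JSON-serializable.
            payload = json.dumps(
                {"fn": fn.__name__, "args": args, "kwargs": kwargs},
                sort_keys=True,
            )
            key = hashlib.sha256(payload.encode()).hexdigest()
            path = cache_dir / f"{key}.json"

            if path.exists():
                # Cache hit: return the stored result without calling fn.
                return json.loads(path.read_text())

            # Cache miss: run fn, then store its result for later calls.
            result = fn(*args, **kwargs)
            cache_dir.mkdir(parents=True, exist_ok=True)
            path.write_text(json.dumps(result))
            return result

        return wrapper

    return decorator


# Stand-in for the flow's work; ".flow_cache" is an arbitrary local directory.
@cached_by_inputs(".flow_cache")
def sub_flow_body(x, y):
    return x + y
```

With this sketch, the first call to sub_flow_body(1, 3) computes and stores the result, and later calls with the same arguments read it back from disk. There is no expiration or invalidation here; a real setup would likely need both, and might use a database or object store in place of local files.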