# prefect-getting-started
i
Was hoping for a little help with a task caching issue...
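from typing import Any

from prefect import task
from prefect.context import TaskRunContext
from prefect.tasks import exponential_backoff

# get_version, alert_slack_on_task_failure, and
# run_prefect_deployment_check_successful are project helpers defined elsewhere.


def cache_results_within_flow_run(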
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    """Caches a task result within the context of the flow it is run in."""
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


@task(
    name="example",
    tags=["pipelines"],
    version=get_version(),
    retries=2,
    retry_delay_seconds=exponential_backoff(backoff_factor=60),
    retry_jitter_factor=0.5,
    on_failure=[alert_slack_on_task_failure],
    cache_key_fn=cache_results_within_flow_run,
)
def trademark_etl() -> None:
    """Task for running the earnings calls etl Prefect deployment."""
    deployment_name = "example-flow/example-deployment"

    run_prefect_deployment_check_successful(deployment_name=deployment_name)
We have been overhauling our orchestration and aren't seeing the expected caching behavior. Most likely we are doing something incorrectly, but we're not sure what. Our goal is to cache task results within the flow run they were run in, so that if the flow fails because one of its tasks failed, we can retry the flow and only the tasks that have not yet completed successfully in that flow run will run again. I implemented a cache key function that attempts to do this. However, this morning, when one of our tasks failed and I retried the flow, every task ran as normal, regardless of whether it had already completed in the same flow run. Could it be that this is happening because we are not returning anything from our tasks?
n
hi @Isaac
> Could it be that this is happening because we are not returning anything from our tasks?
short answer, I think yes! are you using 2.x or 3.x? results are one of the few cases where there are some nuanced changes between the major versions
i
this was with 2.x. I'd rather not return anything if it can be avoided. Are you aware of whether we would need to return something with Prefect 3.x?
What confuses me, though, is that the function under the task does in fact return `None`, even though it is not explicitly stated. My expectation would be that this `None` value would be considered a result that could be cached. Needing to return something other than `None` in order to enable caching would only make sense if there were an explicit check for a non-`None` value, with caching happening only if one is found.
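To make that distinction concrete, here is a minimal sketch in plain Python. It is not Prefect's actual result handling; `store`, `lookup_buggy`, and `lookup_with_membership` are hypothetical names used only for illustration. A function without a `return` statement returns `None`, and whether that `None` counts as a cache hit depends on how the cache checks for an entry.

```python
from typing import Any


def side_effect_only() -> None:
    """A function with no explicit return statement; Python returns None."""
    pass


# Hypothetical in-memory cache: key -> stored result.
store: dict[str, Any] = {}
store["run-1:side_effect_only"] = side_effect_only()  # stores None


def lookup_buggy(key: str) -> bool:
    """Treats a stored None as a miss, because it inspects the value."""
    return store.get(key) is not None


def lookup_with_membership(key: str) -> bool:
    """Treats a stored None as a hit, because it checks for the key."""
    return key in store


print(lookup_buggy("run-1:side_effect_only"))            # False
print(lookup_with_membership("run-1:side_effect_only"))  # True
```

Only a cache written like `lookup_buggy` would force tasks to return something other than `None`.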
n
in 3.x I think this works the way you want by default, without a special cache_key_fn
from prefect import flow, task
from prefect.context import FlowRunContext


@task(persist_result=True)
def some_side_effect() -> None:
    print("some side effect")


@flow(log_prints=True, retries=1)
def some_etl() -> None:
    some_side_effect()
    if (ctx := FlowRunContext.get()) and ctx.flow_run and ctx.flow_run.run_count == 1:
        raise ValueError("This is the first run")
    elif ctx and ctx.flow_run and ctx.flow_run.run_count == 2:
        print("Second run, above this log `some_side_effect` should be cached")


if __name__ == "__main__":
    some_etl()
because the default cache key computation considers the flow run id
22:19:35.142 | INFO    | prefect.engine - Created flow run 'flying-nyala' for flow 'some-etl'
22:19:35.381 | INFO    | Task run 'some_side_effect-24f' - Created task run 'some_side_effect-24f' for task 'some_side_effect'
22:19:35.394 | INFO    | Task run 'some_side_effect-24f' - some side effect
22:19:35.398 | INFO    | Task run 'some_side_effect-24f' - Finished in state Completed()
22:19:35.398 | ERROR   | Flow run 'flying-nyala' - Encountered exception during execution: ValueError('This is the first run')
...
    raise ValueError("This is the first run")
ValueError: This is the first run
22:19:35.540 | INFO    | Flow run 'flying-nyala' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
22:19:35.809 | INFO    | Task run 'some_side_effect-0fa' - Created task run 'some_side_effect-0fa' for task 'some_side_effect'
22:19:35.821 | INFO    | Task run 'some_side_effect-0fa' - Finished in state Cached(type=COMPLETED)
22:19:35.822 | INFO    | Flow run 'flying-nyala' - Second run, above this log `some_side_effect` should be cached
22:19:35.954 | INFO    | Flow run 'flying-nyala' - Finished in state Completed()
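The log above is what you would expect when the cache key includes the flow run id. A rough model of that idea in plain Python follows. It is not Prefect's engine: `run_task`, `results`, `executions`, and the id strings are hypothetical, and the real default key may depend on more than the flow run id.

```python
from typing import Any, Callable

# Hypothetical result store keyed by (flow_run_id, task_name).
results: dict[tuple[str, str], Any] = {}
executions: list[str] = []  # records each time a task body actually runs


def run_task(flow_run_id: str, task_name: str, fn: Callable[[], Any]) -> str:
    """Run fn unless a result for this flow run and task is already stored."""
    key = (flow_run_id, task_name)
    if key in results:
        return "Cached"
    results[key] = fn()
    executions.append(f"{flow_run_id}/{task_name}")
    return "Completed"


def some_side_effect() -> None:
    print("some side effect")


first_attempt = run_task("flow-run-a", "some_side_effect", some_side_effect)
retry = run_task("flow-run-a", "some_side_effect", some_side_effect)
new_run = run_task("flow-run-b", "some_side_effect", some_side_effect)
print(first_attempt, retry, new_run)  # Completed Cached Completed
```

A retry reuses the same flow run id and so hits the stored entry, while a fresh flow run gets a new id and runs the task again.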
in 2.x you should be able to get the behavior you want with your cache key
from typing import Any

from prefect import flow, task
from prefect.context import FlowRunContext, TaskRunContext


def cache_results_within_flow_run(
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    """Caches a task result within the context of the flow it is run in."""
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


@task(persist_result=True, cache_key_fn=cache_results_within_flow_run)
def some_side_effect() -> None:
    print("some side effect")


@flow(log_prints=True, retries=1)
def some_etl() -> None:
    some_side_effect()
    if (ctx := FlowRunContext.get()) and ctx.flow_run and ctx.flow_run.run_count == 1:
        raise ValueError("This is the first run")
    elif ctx and ctx.flow_run and ctx.flow_run.run_count == 2:
        print("Second run, above this log `some_side_effect` should be cached")


if __name__ == "__main__":
    some_etl()
...
ValueError: This is the first run
22:20:59.129 | INFO    | Flow run 'real-puffin' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
22:20:59.667 | INFO    | Flow run 'real-puffin' - Created task run 'some_side_effect-0' for task 'some_side_effect'
22:20:59.668 | INFO    | Flow run 'real-puffin' - Executing 'some_side_effect-0' immediately...
22:21:00.456 | INFO    | Task run 'some_side_effect-0' - Finished in state Cached(type=COMPLETED)
22:21:00.470 | INFO    | Flow run 'real-puffin' - Second run, above this log `some_side_effect` should be cached
22:21:00.766 | INFO    | Flow run 'real-puffin' - Finished in state Completed('All states completed.')
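One possible caveat with a key built only from `flow_run_id` and `task_key`: two calls to the same task with different arguments inside one flow run would presumably share a key. A hedged variant that also folds the parameters into the key is sketched below. `cache_within_flow_run_by_parameters` is a hypothetical name, the hashing scheme is an assumption rather than anything Prefect prescribes, and the context is only duck-typed here so the snippet runs without Prefect installed.

```python
import hashlib
import json
from types import SimpleNamespace
from typing import Any


def cache_within_flow_run_by_parameters(
    context: Any, parameters: dict[str, Any]
) -> str:
    """Key on flow run id, task key, and a hash of the call parameters."""
    # default=str is a crude fallback for values json cannot serialize.
    encoded = json.dumps(parameters, sort_keys=True, default=str).encode()
    digest = hashlib.sha256(encoded).hexdigest()[:16]
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}:{digest}"


# Stand-in for a TaskRunContext, exposing only the attributes used above.
fake_context = SimpleNamespace(
    task_run=SimpleNamespace(flow_run_id="flow-run-a", task_key="some_side_effect")
)
key_a = cache_within_flow_run_by_parameters(fake_context, {"table": "a"})
key_b = cache_within_flow_run_by_parameters(fake_context, {"table": "b"})
print(key_a != key_b)  # True
```

For a task with no parameters, like `some_side_effect` above, this yields one stable key per flow run, so the simpler key from the thread behaves the same way.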