# prefect-cloud
```python
from typing import Any

from prefect import task
from prefect.context import TaskRunContext
from prefect.tasks import exponential_backoff

# get_version, alert_slack_on_task_failure, and run_prefect_deployment_check_successful
# are helpers imported from elsewhere in our codebase.


def cache_results_within_flow_run(
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    """Cache a task result within the context of the flow run it belongs to."""
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


@task(
    name="example",
    tags=["pipelines"],
    version=get_version(),
    retries=2,
    retry_delay_seconds=exponential_backoff(backoff_factor=60),
    retry_jitter_factor=0.5,
    on_failure=[alert_slack_on_task_failure],
    cache_key_fn=cache_results_within_flow_run,
)
def trademark_etl() -> None:
    """Task for running the earnings calls ETL Prefect deployment."""
    deployment_name = "example-flow/example-deployment"

    run_prefect_deployment_check_successful(deployment_name=deployment_name)
```
We have been overhauling our orchestration and are not seeing the expected caching behavior. Most likely we are doing something incorrectly, but we are not sure what.

Our goal is to cache task results within the flow run they were produced in, so that if the flow fails because one of its tasks fails, we can retry the flow run and only the tasks that have not yet completed successfully will actually execute. I implemented the cache key function above to do this. However, this morning one of our tasks failed, and when I retried the flow run, every task ran again from the start, with no regard to having already completed in that same flow run.

Could this be happening because we are not returning anything from our tasks?
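For reference, here is a minimal sketch of the behavior we expect, assuming Prefect 2.x. The task and flow names (`step_one`, `step_two`, `pipeline`) are placeholders rather than our real pipeline; the idea is that when `step_two` fails and we retry the flow run, `step_one` should come back as Cached instead of executing again.

```python
from typing import Any

from prefect import flow, task
from prefect.context import TaskRunContext


def cache_within_flow_run(
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    # Scope the cache key to the parent flow run so results are only reused
    # within that run (including retries of it), not across unrelated flow runs.
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


@task(cache_key_fn=cache_within_flow_run)
def step_one() -> None:
    print("step one ran")


@task(cache_key_fn=cache_within_flow_run)
def step_two() -> None:
    # Simulate the failure we saw this morning.
    raise RuntimeError("simulated failure")


@flow
def pipeline() -> None:
    step_one()
    # Fails the flow run; on retrying this run from the UI we expect
    # step_one to resolve from cache rather than execute again.
    step_two()


if __name__ == "__main__":
    pipeline()
```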