Isaac
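10/08/2024, 10:17 PM
```python
from typing import Any

from prefect import task
from prefect.context import TaskRunContext
from prefect.tasks import exponential_backoff

# get_version, alert_slack_on_task_failure and run_prefect_deployment_check_successful
# are project-specific helpers defined elsewhere (not shown).


def cache_results_within_flow_run(
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    """Caches a task result within the context of the flow it is run in."""
    # Same flow run + same task -> same key, so a retry of that flow run can reuse the result
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


@task(
    name="example",
    tags=["pipelines"],
    version=get_version(),
    retries=2,
    retry_delay_seconds=exponential_backoff(backoff_factor=60),
    retry_jitter_factor=0.5,
    on_failure=[alert_slack_on_task_failure],
    cache_key_fn=cache_results_within_flow_run,
)
def trademark_etl() -> None:
    """Task for running the earnings calls etl Prefect deployment."""
    deployment_name = "example-flow/example-deployment"
    run_prefect_deployment_check_successful(deployment_name=deployment_name)
```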
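To see what a key function like this produces, here is a minimal sketch that calls the same logic with stand-in objects instead of a real Prefect `TaskRunContext`. The `SimpleNamespace` stand-ins, the `fake_context` helper, and the sample IDs are hypothetical; the point is only that two attempts inside the same flow run yield the same key, while a different flow run yields a different one.

```python
from types import SimpleNamespace
from typing import Any


def cache_results_within_flow_run(context: Any, parameters: dict[str, Any]) -> str:
    """Same logic as above: key = flow run ID + task key (parameters are ignored)."""
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


def fake_context(flow_run_id: str, task_key: str) -> SimpleNamespace:
    # Hypothetical stand-in exposing only the attributes the key function reads
    return SimpleNamespace(
        task_run=SimpleNamespace(flow_run_id=flow_run_id, task_key=task_key)
    )


first_attempt = cache_results_within_flow_run(fake_context("run-a", "trademark_etl"), {})
retry_attempt = cache_results_within_flow_run(fake_context("run-a", "trademark_etl"), {})
new_flow_run = cache_results_within_flow_run(fake_context("run-b", "trademark_etl"), {})

print(first_attempt)                   # run-a:trademark_etl
print(first_attempt == retry_attempt)  # True: same flow run, same key
print(first_attempt == new_flow_run)   # False: different flow run, different key
```

Because the key only changes with the flow run, anything that starts a brand-new flow run (rather than retrying the existing one) will not find results cached under the earlier run's keys.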
We have been overhauling our orchestration and aren't seeing the expected caching behavior. Most likely we are doing something incorrectly, but I'm not sure what. Our goal is to cache task results within the context of the flow run they ran in, so that if the flow fails because any of its tasks failed, we can retry the flow and only the tasks that have not yet run successfully (in the flow run being retried) will run. I implemented a cache key function that attempts to do this. However, this morning when one of our tasks failed and I retried the flow, every task started running as normal, regardless of having already completed in the same flow run. Could this be happening because we are not returning anything from our tasks?
Nate
10/08/2024, 10:36 PM
> Could it be that this is happening because we are not returning anything from our tasks?

Short answer, I think yes! Are you using 2.x or 3.x? Results are one of the few areas where there are some nuanced changes between the major versions.
Isaac
10/08/2024, 10:44 PM
Isaac
10/08/2024, 11:22 PM
… `None`, even though it is not explicitly stated. My expectation would be that this `None` value would be considered a result that could be cached. Needing to return something other than `None` to enable caching would only make sense if there were an explicit check for a non-`None` value, with caching happening only when one is found.
Nate
10/09/2024, 3:21 AM
```python
from prefect import flow, task
from prefect.context import FlowRunContext


@task(persist_result=True)
def some_side_effect() -> None:
    print("some side effect")


@flow(log_prints=True, retries=1)
def some_etl() -> None:
    some_side_effect()
    # Fail on the first attempt so the flow-level retry kicks in
    if (ctx := FlowRunContext.get()) and ctx.flow_run and ctx.flow_run.run_count == 1:
        raise ValueError("This is the first run")
    elif ctx and ctx.flow_run and ctx.flow_run.run_count == 2:
        print("Second run, above this log `some_side_effect` should be cached")


if __name__ == "__main__":
    some_etl()
```
This works because the default cache key computation considers the flow run ID:
```
22:19:35.142 | INFO | prefect.engine - Created flow run 'flying-nyala' for flow 'some-etl'
22:19:35.381 | INFO | Task run 'some_side_effect-24f' - Created task run 'some_side_effect-24f' for task 'some_side_effect'
22:19:35.394 | INFO | Task run 'some_side_effect-24f' - some side effect
22:19:35.398 | INFO | Task run 'some_side_effect-24f' - Finished in state Completed()
22:19:35.398 | ERROR | Flow run 'flying-nyala' - Encountered exception during execution: ValueError('This is the first run')
...
raise ValueError("This is the first run")
ValueError: This is the first run
22:19:35.540 | INFO | Flow run 'flying-nyala' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
22:19:35.809 | INFO | Task run 'some_side_effect-0fa' - Created task run 'some_side_effect-0fa' for task 'some_side_effect'
22:19:35.821 | INFO | Task run 'some_side_effect-0fa' - Finished in state Cached(type=COMPLETED)
22:19:35.822 | INFO | Flow run 'flying-nyala' - Second run, above this log `some_side_effect` should be cached
22:19:35.954 | INFO | Flow run 'flying-nyala' - Finished in state Completed()
```
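In the run above, `some_side_effect` returns `None`, yet the retry reports it as `Cached(type=COMPLETED)`, so a `None` return can evidently be cached. The sketch below is a toy in-memory cache (hypothetical, not Prefect's implementation) illustrating the distinction Isaac raises: a lookup that checks whether a key is *present* treats a stored `None` as a hit, whereas an explicit "is not `None`" check would treat it as a miss.

```python
from typing import Any, Callable

_MISSING = object()  # sentinel meaning "no cache entry", distinct from a cached None


class ToyCache:
    """Hypothetical in-memory cache used only for illustration."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}
        self.calls = 0  # how many times a wrapped function actually ran

    def run(self, key: str, fn: Callable[[], Any]) -> Any:
        cached = self._store.get(key, _MISSING)
        if cached is not _MISSING:
            return cached  # hit, even when the cached value is None
        self.calls += 1
        result = fn()
        self._store[key] = result  # None is stored like any other value
        return result


def some_side_effect() -> None:
    print("some side effect")  # no return statement, so Python returns None


cache = ToyCache()
print(cache.run("run-a:some_side_effect", some_side_effect))  # prints the side effect, then None
print(cache.run("run-a:some_side_effect", some_side_effect))  # cache hit: prints only None
print(cache.calls)  # 1
```

Replacing the sentinel check with `if cached is not None` would make every `None` result a miss, which is exactly the kind of explicit check Isaac describes.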
In 2.x, you should be able to get the behavior you want with your cache key:
```python
from typing import Any

from prefect import flow, task
from prefect.context import FlowRunContext, TaskRunContext


def cache_results_within_flow_run(
    context: TaskRunContext, parameters: dict[str, Any]
) -> str:
    """Caches a task result within the context of the flow it is run in."""
    # Key is scoped to the flow run, so the flow-level retry below reuses the result
    return f"{context.task_run.flow_run_id}:{context.task_run.task_key}"


@task(persist_result=True, cache_key_fn=cache_results_within_flow_run)
def some_side_effect() -> None:
    print("some side effect")


@flow(log_prints=True, retries=1)
def some_etl() -> None:
    some_side_effect()
    # Fail on the first attempt so the flow-level retry kicks in
    if (ctx := FlowRunContext.get()) and ctx.flow_run and ctx.flow_run.run_count == 1:
        raise ValueError("This is the first run")
    elif ctx and ctx.flow_run and ctx.flow_run.run_count == 2:
        print("Second run, above this log `some_side_effect` should be cached")


if __name__ == "__main__":
    some_etl()
```
```
...
ValueError: This is the first run
22:20:59.129 | INFO | Flow run 'real-puffin' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
22:20:59.667 | INFO | Flow run 'real-puffin' - Created task run 'some_side_effect-0' for task 'some_side_effect'
22:20:59.668 | INFO | Flow run 'real-puffin' - Executing 'some_side_effect-0' immediately...
22:21:00.456 | INFO | Task run 'some_side_effect-0' - Finished in state Cached(type=COMPLETED)
22:21:00.470 | INFO | Flow run 'real-puffin' - Second run, above this log `some_side_effect` should be cached
22:21:00.766 | INFO | Flow run 'real-puffin' - Finished in state Completed('All states completed.')
```
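The retry behavior this thread is after can also be modeled without Prefect. In this minimal sketch (all names hypothetical, not Prefect APIs), results are keyed by flow run ID plus task name, the first attempt fails after `extract` completes, and the retry of the same flow run skips `extract` and re-runs only the task that failed.

```python
from typing import Any, Callable

results: dict[str, Any] = {}  # hypothetical persisted results, keyed by cache key
executed: list[str] = []      # records which task bodies actually ran


def run_task(flow_run_id: str, name: str, fn: Callable[[], Any]) -> Any:
    key = f"{flow_run_id}:{name}"  # same shape as cache_results_within_flow_run
    if key in results:
        return results[key]  # completed earlier in this flow run: reuse it
    executed.append(name)
    value = fn()  # if this raises, nothing is stored for the key
    results[key] = value
    return value


def flaky_load(attempt: int) -> None:
    if attempt == 1:
        raise ValueError("load failed on the first attempt")


def etl_flow(flow_run_id: str, attempt: int) -> None:
    run_task(flow_run_id, "extract", lambda: None)
    run_task(flow_run_id, "load", lambda: flaky_load(attempt))


flow_run_id = "flow-run-1"  # a retry reuses the same flow run ID
for attempt in (1, 2):      # retries=1 means at most two attempts
    try:
        etl_flow(flow_run_id, attempt)
        break
    except ValueError as exc:
        print(f"attempt {attempt} failed: {exc}")

print(executed)  # ['extract', 'load', 'load']: extract ran once, load ran twice
```

Changing `flow_run_id` between attempts would make every key miss, so both tasks would run again on the second attempt.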