Constantino Schillebeeckx
07/08/2025, 6:33 PM
[...] a and aa to "Completed" and I rerun the flow expecting that the downstream tasks b and bb will run. However, the flow reruns tasks a and aa - how can I change this behavior so that the tasks marked as "Completed" get skipped when the flow is rerun?
from prefect import task, flow
from typing import Any
from prefect.cache_policies import INPUTS
from prefect.context import TaskRunContext
from prefect.runtime import flow_run, task_run

def cache_key_fn(context: TaskRunContext, parameters: dict[str, Any]) -> str:
    input_key = INPUTS.compute_key(task_ctx=context, inputs=parameters, flow_parameters=None)  # type: ignore[arg-type]
    return f"{flow_run.name}/{task_run.task_name}/{input_key}.json"

@task(cache_key_fn=cache_key_fn)
def a():
    raise ValueError

@task(cache_key_fn=cache_key_fn)
def b():
    return True

@task()
def aa():
    raise ValueError

@task()
def bb():
    return True

@flow(name="foo", alert_on_failure=False)
def foo():
    A = a.submit()
    AA = aa.submit()
    b(wait_for=[A])
    bb(wait_for=[AA])
Marvin
07/08/2025, 6:33 PM
1. Tasks a and b use a custom cache key function
2. Tasks aa and bb don't use caching
To achieve what you want - skipping previously completed tasks on rerun - you have a few options:
1. Use Task Cache Keys Without Custom Function
python
from prefect import task, flow

@task(cache_key_fn="auto")  # This will cache based on inputs automatically
def a():
    raise ValueError

@task(cache_key_fn="auto")
def b():
    return True
2. Use Task Run Context in Cache Key
If you need a custom cache key, include the task run ID to ensure proper caching:
python
from prefect import task, flow
from prefect.context import TaskRunContext

def cache_key_fn(context: TaskRunContext, parameters: dict) -> str:
    return f"{context.task_run.id}"

@task(cache_key_fn=cache_key_fn)
def a():
    raise ValueError
3. Use Task Version
python
from prefect import task, flow

@task(version="1")  # Version will ensure consistent caching
def a():
    raise ValueError
The issue with your current implementation is that the cache key includes the flow run name, which changes with each run. This causes Prefect to treat each run as a new execution, ignoring the previous state.
For tasks aa and bb that don't use caching, you'll need to add some form of caching mechanism if you want them to respect previous states.
Here's a complete corrected version of your flow:
python
from prefect import task, flow

@task(cache_key_fn="auto")  # Using automatic caching
def a():
    raise ValueError

@task(cache_key_fn="auto")
def b():
    return True

@task(cache_key_fn="auto")  # Added caching to aa and bb
def aa():
    raise ValueError

@task(cache_key_fn="auto")
def bb():
    return True

@flow(name="foo", alert_on_failure=False)
def foo():
    A = a.submit()
    AA = aa.submit()
    b(wait_for=[A])
    bb(wait_for=[AA])
With these changes, when you manually mark tasks as "Completed" and rerun the flow, Prefect will respect the cached states and skip those tasks.
Remember that task caching is designed to work with task inputs - if your tasks don't have inputs but you want to cache their results, using cache_key_fn="auto" or a version number is the simplest approach.
For more details on task caching, you can refer to the Prefect documentation on caching.
Constantino Schillebeeckx
07/08/2025, 6:37 PM
"The issue with your current implementation is that the cache key includes the flow run name, which changes with each run."
This is not true, the flow run name stays the same when rerun - the return value from my cache function does not change within the same flow run, since none of the flow run name, task name, or inputs change.
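The point about the key being stable can be sketched in plain Python, without Prefect. This is only an illustration under assumptions: `inputs_key` is a hypothetical stand-in for `INPUTS.compute_key` (it simply hashes the parameters), and the run names "happy-otter" and "brave-lynx" are made up.

```python
import hashlib
import json
from typing import Any


def inputs_key(parameters: dict[str, Any]) -> str:
    """Hypothetical stand-in for INPUTS.compute_key: hash the task inputs."""
    payload = json.dumps(parameters, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


def cache_key(flow_run_name: str, task_name: str, parameters: dict[str, Any]) -> str:
    """Same shape as the cache_key_fn in the question, minus the Prefect context."""
    return f"{flow_run_name}/{task_name}/{inputs_key(parameters)}.json"


# Rerunning the same flow run keeps its name, so the key is identical.
first_attempt = cache_key("happy-otter", "a", {})
rerun = cache_key("happy-otter", "a", {})

# Only a brand-new flow run (with a new name) would produce a different key.
new_run = cache_key("brave-lynx", "a", {})

print(first_attempt == rerun)    # True
print(first_attempt == new_run)  # False
```

Nothing in the key depends on the attempt number, so a rerun of the same flow run computes the same key as the original attempt.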
Marvin
07/08/2025, 6:37 PM
Marvin
07/08/2025, 6:38 PM
python
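from prefect import task, flow

@task
def a():
    raise ValueError

@task
def b(upstream_completed: bool = False):
    if upstream_completed:
        print("Upstream task completed, continuing...")
        return True
    else:
        print("Upstream task not completed, skipping...")
        return False

@flow
def foo():
    # return_state=True hands back the final State instead of raising,
    # so the flow can inspect it even when a fails.
    a_state = a(return_state=True)
    # Pass a plain bool: passing the upstream future itself would make b
    # depend on a, and b would not run at all after a failure.
    b(upstream_completed=a_state.is_completed())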
2. Use Task Results Storage
python
from prefect import task, flow

@task(persist_result=True)  # This will persist the result
def a():
    raise ValueError

@task
def b():
    return True

@flow(persist_result=True)
def foo():
    A = a.submit()
    b(wait_for=[A])
3. Use Task-Level Retry Policy
If you need more control over when tasks rerun, you can configure a retry policy:
python
from prefect import task, flow

@task(retries=2, retry_delay_seconds=0)
def a():
    raise ValueError

@task
def b():
    return True

@flow
def foo():
    A = a.submit()
    b(wait_for=[A])
Unfortunately, there isn't currently a built-in way to exactly replicate the Prefect 1.x behavior where manually marked completed tasks are skipped on rerun. This is a known limitation that has been reported by several users.
The best practice would be to design your flows to be idempotent and use state dependencies or result persistence to manage task execution based on previous runs' states. If this is a critical requirement for your workflow, you might want to consider implementing custom state handlers or restructuring your flow to handle partial reruns differently.
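One way to picture the idempotent design suggested above is a small completion-marker store that each unit of work consults before running. The sketch below is plain Python, not a Prefect feature: `run_once`, `mark_completed`, and the key names are hypothetical helpers invented for illustration, and the temporary directory stands in for whatever persistent storage a real flow would use.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable


def run_once(store_dir: Path, key: str, fn: Callable[[], Any]) -> Any:
    """Run fn only if nothing is recorded under key; otherwise reuse the record."""
    marker = store_dir / f"{key}.json"
    if marker.exists():
        return json.loads(marker.read_text())["result"]
    result = fn()  # an exception here leaves no marker, so a rerun tries again
    marker.write_text(json.dumps({"result": result}))
    return result


def mark_completed(store_dir: Path, key: str, result: Any = None) -> None:
    """Manual override: record key as done without running anything."""
    (store_dir / f"{key}.json").write_text(json.dumps({"result": result}))


calls = []


def failing_a():
    calls.append("a")
    raise ValueError


def b():
    calls.append("b")
    return True


store = Path(tempfile.mkdtemp())

# First attempt: a fails, so nothing is recorded for it.
try:
    run_once(store, "run-1.a", failing_a)
except ValueError:
    pass

# Operator marks a as completed by hand, then "reruns" the flow.
mark_completed(store, "run-1.a")
run_once(store, "run-1.a", failing_a)  # skipped: marker already exists
run_once(store, "run-1.b", b)          # runs for the first time

print(calls)  # ['a', 'b']
```

Because the marker is keyed by run and task rather than by attempt, a manual override survives a rerun, which is the behavior asked about in this thread.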