Constantino Schillebeeckx
07/08/2025, 6:33 PM
aaabbbaaa
from prefect import task, flow
from typing import Any
from prefect.cache_policies import INPUTS
from prefect.context import TaskRunContext
from prefect.runtime import flow_run, task_run
def cache_key_fn(context: TaskRunContext, parameters: dict[str, Any]) -> str:
    """Build a cache key shaped ``<flow run name>/<task name>/<inputs key>.json``.

    The inputs key is whatever ``INPUTS.compute_key`` returns for this task
    run's parameters; the flow run name and task name are read from the
    ``flow_run`` and ``task_run`` runtime objects.
    """
    # NOTE(review): if compute_key yields None for a task with no parameters,
    # the key ends in "None.json" — confirm that is intended.
    inputs_hash = INPUTS.compute_key(task_ctx=context, inputs=parameters, flow_parameters=None)  # type: ignore[arg-type]
    run_name = flow_run.name
    task_name = task_run.task_name
    return f"{run_name}/{task_name}/{inputs_hash}.json"
# Task whose body only raises ValueError, so every call fails.
# Its cache key is produced by the custom cache_key_fn.
@task(cache_key_fn=cache_key_fn)
def a():
    raise ValueError
# Task that takes no arguments and returns True.
# Its cache key is produced by the custom cache_key_fn.
@task(cache_key_fn=cache_key_fn)
def b():
    return True
# Task whose body only raises ValueError; declared with no task options
# (in particular, no cache_key_fn).
@task()
def aa():
    raise ValueError
# Task that takes no arguments and returns True; declared with no task
# options (in particular, no cache_key_fn).
@task()
def bb():
    return True
@flow(name="foo", alert_on_failure=False)
def foo():
    A = a.submit()
    AA = aa.submit()
    b(wait_for=[A])
    bb(wait_for=[AA])
Marvin
07/08/2025, 6:33 PM
Marvin
07/08/2025, 6:33 PM
abaabb
python
from prefect import task, flow
# Task whose body only raises ValueError.
# NOTE(review): the claim that "auto" caches on inputs automatically is not
# supported by anything in this snippet, and a plain string may not be
# accepted where a cache key function is expected — confirm against the
# Prefect task API before relying on it.
@task(cache_key_fn="auto")  # claimed: caches based on inputs automatically (unverified)
def a():
    raise ValueError
@task(cache_key_fn="auto")
def b():
    return True
python
from prefect import task, flow
from prefect.context import TaskRunContext
def cache_key_fn(context: TaskRunContext, parameters: dict) -> str:
    """Return the id of the current task run, formatted as a string.

    ``parameters`` is part of the signature but is not used in the key.
    """
    current_run = context.task_run
    return f"{current_run.id}"
@task(cache_key_fn=cache_key_fn)
def a():
    raise ValueError
python
from prefect import task, flow
@task(version="1")  # Version will ensure consistent caching
def a():
    raise ValueError
aabb
python
from prefect import task, flow
# Task whose body only raises ValueError.
# NOTE(review): cache_key_fn="auto" is unverified here — confirm that Prefect
# accepts a string rather than a callable for this option.
@task(cache_key_fn="auto")  # described as "automatic caching" (unverified)
def a():
    raise ValueError
# Task that takes no arguments and returns True.
# NOTE(review): cache_key_fn="auto" is unverified here — confirm that Prefect
# accepts a string rather than a callable for this option.
@task(cache_key_fn="auto")
def b():
    return True
# Task whose body only raises ValueError.
# NOTE(review): cache_key_fn="auto" is unverified here — confirm that Prefect
# accepts a string rather than a callable for this option.
@task(cache_key_fn="auto")  # caching option added to aa and bb in this variant
def aa():
    raise ValueError
# Task that takes no arguments and returns True.
# NOTE(review): cache_key_fn="auto" is unverified here — confirm that Prefect
# accepts a string rather than a callable for this option.
@task(cache_key_fn="auto")
def bb():
    return True
@flow(name="foo", alert_on_failure=False)
def foo():
    A = a.submit()
    AA = aa.submit()
    b(wait_for=[A])
    bb(wait_for=[AA])
cache_key_fn="auto"
Constantino Schillebeeckx
07/08/2025, 6:37 PM
> The issue with your current implementation is that the cache key includes the flow run name, which changes with each run.
This is not true: the flow run name stays the same when rerun. The return value from my cache function does not change within the same flow run, since neither the flow run name, the task name, nor the inputs change.
Marvin
07/08/2025, 6:37 PM
Marvin
07/08/2025, 6:38 PM
python
from prefect import task, flow
from prefect.states import Completed
# Task whose body only raises ValueError, so every call fails.
@task
def a():
    raise ValueError
# Report whether the upstream object passed in carries a completed state.
# Returns True when `upstream_result.state.is_completed()` is true, otherwise
# False. The default of None (or any object without a `.state` attribute) is
# treated as "not completed" rather than raising AttributeError.
# NOTE(review): Prefect may resolve a future passed as an argument to its
# result before this body runs, in which case `.state` would be absent —
# confirm against the Prefect documentation.
@task
def b(upstream_result=None):
    upstream_state = getattr(upstream_result, "state", None)
    if upstream_state is not None and upstream_state.is_completed():
        print("Upstream task completed, continuing...")
        return True
    print("Upstream task not completed, skipping...")
    return False
@flow
def foo():
    A = a.submit()
    b(A)
python
from prefect import task, flow
# Task whose body only raises ValueError; declared with persist_result=True.
@task(persist_result=True)  # This will persist the result
def a():
    raise ValueError
# Task that takes no arguments and returns True.
@task
def b():
    return True
@flow(persist_result=True)
def foo():
    A = a.submit()
    b(wait_for=[A])
python
from prefect import task, flow
# Task whose body only raises ValueError; declared with retries=2 and
# retry_delay_seconds=0.
@task(retries=2, retry_delay_seconds=0)
def a():
    raise ValueError
# Task that takes no arguments and returns True.
@task
def b():
    return True
# Flow: submit task `a`, then call task `b` with wait_for on the submitted future.
@flow
def foo():
    upstream_future = a.submit()
    b(wait_for=[upstream_future])