Constantino Schillebeeckx
06/30/2025, 1:47 PM
When I mark a previously failed task as `success` and then rerun the flow, that previously failed task is run again instead of skipped. This is new behavior compared to Prefect 1 and 2 - how can I get Prefect 3 to behave the same way? That is, not rerun the task that is now in `success` state?

Marvin
06/30/2025, 1:48 PM
In Prefect 3 you can use the `cache_key_fn` parameter when defining your task. This allows you to specify how Prefect should identify previously successful runs of the task.
Here's an example of how to implement caching to skip successful tasks:
```python
from prefect import task, flow
from datetime import timedelta

@task(
    # cache_key_fn receives the task run context and the call parameters
    cache_key_fn=lambda context, parameters: context.task.name,  # use the task name as the cache key
    cache_expiration=timedelta(days=1),  # cache results for 1 day
)
def my_task():
    return "some result"

@flow
def my_flow():
    return my_task()
```
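With that in place, calling the flow twice inside the cache window should execute `my_task` only once - a quick way to verify (a sketch; the second task run should finish in a `Cached` state in the logs):

```python
if __name__ == "__main__":
    print(my_flow())  # first call runs my_task and caches the result
    print(my_flow())  # second call should hit the cache and skip my_task
```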
If you're still seeing unexpected behavior where successfully completed tasks are being rerun, this might be a bug. Could you share:
1. How exactly are you setting the task state to `success`?
2. The code for your flow and tasks?
3. How you're triggering the rerun?
This would help me provide more specific guidance for your use case.

Constantino Schillebeeckx
06/30/2025, 1:54 PM
Even after I mark the task as `success`, it still gets rerun.

Marvin
06/30/2025, 1:58 PM
1. When you set the task run state manually, attach the result data to the new `Completed` state so Prefect has something to reuse:
```python
import asyncio
from prefect import get_client
from prefect.states import Completed

async def mark_task_run_completed():
    async with get_client() as client:
        # Replace with your actual result data
        result_data = {"your": "result", "data": "here"}
        new_state = Completed(data=result_data)
        await client.set_task_run_state(
            task_run_id="your-task-run-id",  # replace with the real task run ID
            state=new_state,
            force=True,  # bypass orchestration rules when overriding an existing state
        )

asyncio.run(mark_task_run_completed())
```
2. Alternatively, you can implement result persistence at the task level to ensure results are saved even if the task fails:
```python
from prefect import task

@task(
    persist_result=True,  # this ensures results are always persisted
    retries=2,
    retry_delay_seconds=0,
)
def my_task():
    # Your task logic here
    result = "some result"
    return result
```
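To confirm persistence is actually happening, you can inspect the local result storage directory - a sketch, assuming default settings (`PREFECT_LOCAL_STORAGE_PATH` defaults to `~/.prefect/storage`):

```bash
ls ~/.prefect/storage  # persisted task results are written here by default
```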
3. You can also configure this globally in your Prefect settings:
```bash
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```
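The same setting can also be supplied as an environment variable (useful for workers or CI) - an equivalent sketch:

```bash
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
```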
If you're still seeing the task rerun after setting both the state and result, please check:
1. That you're using the correct task run ID when setting the state
2. That the result data is being properly serialized
3. That you've configured a result storage block if you're persisting large results (see the sketch below)
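For reference, a result storage block can be attached per task - a sketch, assuming local filesystem storage:

```python
from prefect import task
from prefect.filesystems import LocalFileSystem

# Hypothetical storage block pointing at a local directory
storage = LocalFileSystem(basepath="/tmp/prefect-results")

@task(persist_result=True, result_storage=storage)
def my_task():
    return "some result"
```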
Would you like me to provide more specific guidance based on your particular setup?

Constantino Schillebeeckx
06/30/2025, 2:19 PM
Here's a minimal example - when I mark task `a` as successful and rerun the flow, the flow runs `a` again.
```python
from prefect import task, flow

def foo():
    # some logic here
    ...

@task(cache_key_fn=foo)
def a():
    raise ValueError

@task(cache_key_fn=foo)
def b():
    return True

@flow(name="foo")
def foo():
    A = a()
    b(wait_for=[A])
```
Marvin
06/30/2025, 2:19 PM
There are a few problems in this example:
1. The `cache_key_fn` should return a string or other hashable value, but you're passing a function reference itself as the `cache_key_fn`
2. The function `foo()` is defined twice (once as a standalone function and once as a flow)
3. The caching mechanism needs a proper key to identify when to skip execution
Here's a corrected version that will properly cache and skip task `a` when it's marked as successful:
```python
from prefect import task, flow

def get_cache_key(context, parameters):
    # Return a stable cache key based on the task's name,
    # so the same task hits the cache across flow runs
    return context.task.name

@task(cache_key_fn=get_cache_key)
def a():
    raise ValueError

@task(cache_key_fn=get_cache_key)
def b():
    return True

@flow(name="my_flow")
def my_flow():
    A = a()
    b(wait_for=[A])
```
A few important notes:
1. The `cache_key_fn` receives a context object with information about the current run, plus the task's call parameters (see the sketch below)
2. Using the task name in the cache key ensures that the same task will be cached across flow runs
3. Make sure your flow name is unique and not conflicting with other function names
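For example, if you also want the key to change whenever the task's inputs change, you can fold the `parameters` argument into the key - a hypothetical sketch (Prefect's built-in `task_input_hash`, shown later, does something similar):

```python
import hashlib
import json

def input_aware_cache_key(context, parameters):
    # Combine the task name with a stable serialization of the call
    # parameters, so the cache is shared across flow runs but
    # invalidated when the inputs change
    payload = json.dumps(
        {"task": context.task.name, "parameters": repr(parameters)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()
```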
If you want to force the task to be skipped when marked as successful regardless of the cache key, you can also use the `persist_result=True` parameter:
```python
@task(persist_result=True)
def a():
    raise ValueError
```
This will ensure that once the task is marked as successful, its result is persisted and used in subsequent runs instead of re-executing the task.
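If you ever need the opposite - forcing a task to recompute even when a cached value exists - recent Prefect versions also accept a `refresh_cache` task option; a sketch, reusing `get_cache_key` from above:

```python
@task(cache_key_fn=get_cache_key, refresh_cache=True)  # ignore any cached value and recompute
def a():
    raise ValueError
```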
Would you like me to explain more about how the caching mechanism works or provide additional examples?

Constantino Schillebeeckx
06/30/2025, 2:22 PM

Marvin
06/30/2025, 2:23 PM
The `wait_for` creates a dependency between tasks `a` and `b`, but it doesn't preserve the caching behavior you're looking for. Here's how you can modify your flow to achieve the desired behavior:
```python
from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def a():
    raise ValueError

@task
def b(upstream_result=None):
    return True

@flow(name="my_flow")
def my_flow():
    A = a()
    return b(upstream_result=A)
```
Key changes:
1. Use `task_input_hash` instead of a custom cache key function - this is Prefect's built-in function that handles caching based on inputs
2. Pass the dependency explicitly as a parameter instead of using `wait_for`
3. Add `cache_expiration` to ensure the cache persists between runs
If you still need to use `wait_for` for some reason, you can try this alternative approach:
```python
from prefect import task, flow
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def a():
    raise ValueError

@task
def b():
    return True

@flow(name="my_flow")
def my_flow():
    # return_state=True gives us the task's final State so we can inspect it
    A = a(return_state=True)
    # Create an explicit dependency while preserving caching
    if A.is_completed():
        return b()
    else:
        return b(wait_for=[A])
```
If you're still seeing the task being rerun after marking it as successful, could you share:
1. How exactly are you marking the task as successful?
2. Are you running the flow through a deployment or directly?
3. Can you confirm if you see any cache hits in the logs when you rerun the flow?

Constantino Schillebeeckx
06/30/2025, 2:37 PM