batuan
10/16/2025, 10:18 AM
task1 -> task2 -> [task3, task4].
Sometimes, my task4 fails because of missing block credentials. I want to retry the flow run from task4 only (after fixing the credentials). How can I make sure that the input from task2 is still available?
Marvin
10/16/2025, 10:18 AM
Marvin
10/16/2025, 10:24 AM
task2 so it can be re-used when you only re-run task4.
Here are two patterns depending on how you want to retry:
1) Retry only task4 within the same flow run
- Set retries on task4 and persist `task2`’s result. Retries fire automatically after the configured delay, so the credentials fix has to land before the retries are exhausted; Prefect then re-runs only task4. Because task2 finished successfully and its result is persisted, the input to task4 is reused without re-running task2.
2) Start a new flow run but skip recomputing task2
- If you prefer to trigger a fresh flow run (e.g., via the UI or CLI), add a cache policy to task2 and persist its result. On the new run, task2 will load the cached result instead of recomputing when its inputs are unchanged, letting task4 run with the prior output. Note that task2’s input comes from task1, so the cache only hits if task1 returns the same value on the new run.
Example
from prefect import flow, task
from prefect.cache_policies import INPUTS


@task
def task1():
    ...


# Persist so the value can be rehydrated by retries or a new run
# Cache by inputs so a new flow run can skip recomputing when inputs are unchanged
@task(persist_result=True, cache_policy=INPUTS)
def task2(x):
    ...


@task
def task3(y):
    ...


# Configure retries so only task4 is retried after you fix credentials
@task(retries=3, retry_delay_seconds=30)
def task4(y):
    # load and use your credentials block here
    ...


@flow
def my_flow():
    x = task1()
    y = task2(x)
    task3(y)
    task4(y)
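To make the caching idea behind task2 concrete, here is a minimal standard-library sketch of input-keyed result persistence. It is only an illustration of the concept, not Prefect's implementation: `cache_key`, `run_cached`, `expensive_task2`, and the JSON-files-in-a-temp-directory storage are all hypothetical names and choices made for this example.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def cache_key(task_name, inputs):
    """Derive a stable key from the task name and its JSON-serializable inputs."""
    payload = json.dumps({"task": task_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(task_name, fn, inputs, storage_dir):
    """Return (result, was_cached); compute only if no stored result matches the key."""
    path = Path(storage_dir) / f"{cache_key(task_name, inputs)}.json"
    if path.exists():
        # A previous "run" already persisted this result: load it, skip the work.
        return json.loads(path.read_text()), True
    result = fn(**inputs)
    path.write_text(json.dumps(result))
    return result, False


calls = []  # records every real execution of the hypothetical task


def expensive_task2(x):
    calls.append(x)
    return {"doubled": x * 2}


storage = tempfile.mkdtemp()  # stand-in for durable result storage

# First "flow run": nothing stored yet, so the task executes.
first, first_cached = run_cached("task2", expensive_task2, {"x": 21}, storage)
# Second "flow run" with the same input: the stored result is reused.
second, second_cached = run_cached("task2", expensive_task2, {"x": 21}, storage)
# A different input produces a different key, so the task executes again.
third, third_cached = run_cached("task2", expensive_task2, {"x": 5}, storage)
```

The point to take away is that the cache only helps when the storage outlives the run and the inputs hash to the same key, which is why both `persist_result=True` and a cache policy appear on task2 above.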
Notes and tips
- Make sure you have durable result storage configured so persisted results are accessible to retries that might be picked up by a different worker (e.g., S3/GCS/Azure/local volume). See Prefect’s results docs below.
- Prefect pickles results by default; if task2’s output can’t be pickled (or you want a portable format such as JSON), set an appropriate `result_serializer` on the task.
- Updating the credentials block will apply to subsequent retries of `task4`; you don’t need to redeploy.
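Since retries fire on their own schedule, it can help to work out how much time the retry settings actually leave for fixing the block. The sketch below is plain arithmetic under two assumptions: a fixed delay between retries, and attempts that take negligible time. The helper names `retry_budget_seconds` and `attempt_offsets` are made up for this example and are not Prefect APIs.

```python
def retry_budget_seconds(retries, retry_delay_seconds):
    """Total waiting time across all retries, ignoring how long each attempt runs."""
    if retries < 0 or retry_delay_seconds < 0:
        raise ValueError("retries and retry_delay_seconds must be non-negative")
    return retries * retry_delay_seconds


def attempt_offsets(retries, retry_delay_seconds):
    """Earliest start offset (in seconds) of each attempt, initial attempt included."""
    return [i * retry_delay_seconds for i in range(retries + 1)]


# Values taken from the task4 decorator: retries=3, retry_delay_seconds=30
budget = retry_budget_seconds(3, 30)
offsets = attempt_offsets(3, 30)
print(budget)   # 90
print(offsets)  # [0, 30, 60, 90]
```

With `retries=3` and `retry_delay_seconds=30`, the last attempt starts roughly 90 seconds after the first failure, so a fix that takes longer than that would need more retries or a longer delay.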
Docs
- Results and result storage
- Caching policies
Question for you: do you plan to retry task4 within the same flow run, or do you want to start a fresh run and have task2 be skipped? I can tailor the steps (and storage setup) accordingly.