<@ULVA73B9P> When using wait_for_input and if i ha...
# ask-marvin
a
@Marvin When using wait_for_input with caching set up in my flows, if a task fails and we retry it, I get an error that the input is already defined.
m
Thanks for flagging this. I’ve seen it when combining wait_for_input with retries/caching.

What’s happening
- In Prefect 3.x, wait_for_input/receive_input relies on a “run input” stored in the backend under a key (or key prefix). That key must be unique within a flow run.
- When a task that calls wait_for_input is retried, its code runs again and tries to register the same input key. Because that key already exists, the backend raises “input is already defined.”

Ways to fix it (pick the one that matches your intent)

1) Don’t retry the human step
- Move the wait_for_input call out of any retried/cached task.
- Pattern: ask for input once at the flow level, then pass the value into a retried task.
```python
from prefect import flow, pause_flow_run, task

@task(retries=3, retry_delay_seconds=10)
def risky_step(approval: str):
    # do the thing that may fail/retry
    ...

@flow
def my_flow():
    # Pause once at the flow level; the run resumes when a value is submitted
    approval = pause_flow_run(wait_for_input=str)
    risky_step(approval)
```
2) Make input keys unique per attempt (if you really need wait_for_input inside a retried task)
- Generate a unique key_prefix per attempt using the task run attempt number.
```python
from prefect import task
from prefect.input import receive_input
from prefect.runtime import task_run

@task(retries=3)
def task_with_input():
    attempt = task_run.run_count  # runtime attribute (not a call); 1-based attempt counter
    # receive_input returns a handler; .next() blocks until a matching input arrives
    answer = receive_input(str, key_prefix=f"approval-{attempt}").next()
    ...
```
3) Reuse the first answer across retries (don’t re-prompt)
- Split the prompt out so it runs once (no retries, no cache needed), then let downstream work retry.
- Alternatively, if you must keep it together, read the existing input if present and only prompt when it’s missing (advanced: use the read/filter APIs to check for an existing run input by key_prefix before calling receive_input).

4) If you truly want to re-prompt on each retry for the same key
- Delete the prior input before waiting/sending again using the input actions API.
- Example (async) cleanup:
```python
from prefect import get_client
from prefect.input.actions import delete_flow_run_input, filter_flow_run_input
from prefect.runtime import flow_run

async def clear_previous_inputs(prefix: str):
    flow_run_id = flow_run.id  # runtime attribute, not a function call
    async with get_client() as client:
        # key_prefix is the first parameter; the client is passed by keyword
        existing = await filter_flow_run_input(
            key_prefix=prefix, flow_run_id=flow_run_id, client=client
        )
        for item in existing:
            await delete_flow_run_input(
                key=item.key, flow_run_id=flow_run_id, client=client
            )
```
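To make the failure mode and options 2 and 3 concrete, here is a toy, stdlib-only model of the collision. It is a sketch under assumptions, not Prefect’s implementation: `ToyInputStore`, `DuplicateKeyError`, `run_with_retries`, and the three `prompt_*` helpers are hypothetical names invented for illustration, and the “answers” are placeholder strings rather than real user input.

```python
class DuplicateKeyError(Exception):
    """Stand-in for the backend's 'input is already defined' error."""


class ToyInputStore:
    """In-memory stand-in for a per-flow-run input store (NOT Prefect's API)."""

    def __init__(self):
        self._values = {}

    def create(self, key, value):
        # Keys must be unique within the store, like run input keys in a flow run.
        if key in self._values:
            raise DuplicateKeyError(f"input {key!r} is already defined")
        self._values[key] = value

    def read(self, key):
        return self._values.get(key)


def prompt_fixed_key(store, attempt):
    # Same key on every attempt: the second attempt collides.
    store.create("approval", f"answer-{attempt}")
    return store.read("approval")


def prompt_key_per_attempt(store, attempt):
    # Option 2: a fresh key per attempt never collides.
    key = f"approval-{attempt}"
    store.create(key, f"answer-{attempt}")
    return store.read(key)


def prompt_once(store, attempt):
    # Option 3: reuse the stored answer if present; only "prompt" when missing.
    existing = store.read("approval")
    if existing is not None:
        return existing
    store.create("approval", f"answer-{attempt}")
    return store.read("approval")


def run_with_retries(step, attempts=3):
    """Call step(store, attempt) once per attempt, simulating a retried task."""
    store = ToyInputStore()
    return [step(store, attempt) for attempt in range(1, attempts + 1)]
```

In this model, `run_with_retries(prompt_fixed_key)` raises `DuplicateKeyError` on the second attempt, while the other two strategies complete all three attempts. The per-attempt variant collects a new answer each time; the read-before-prompt variant keeps returning the first one.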
Docs and references
- Interactive (wait_for_input / run inputs): Interactive flows and inputs
- Caching overview: Caching in Prefect 3

A couple of clarifying questions so I can tailor this to your setup:
- Which Prefect version are you on (assuming 3.x)?
- Is wait_for_input inside a task that has retries or a cache policy?
- Do you want to re-prompt on every retry, or should retries reuse the first answer?

If you can share a small code snippet showing where wait_for_input is placed and how retries/caching are configured, I’ll suggest the minimal change to fix the “input is already defined” error for your case.