Syméon del Marmol
10/13/2025, 2:10 PM
Marvin
10/13/2025, 2:10 PM
Marvin
10/13/2025, 2:16 PM
from prefect.client.orchestration import get_client

async def update_params(flow_run_id, new_params):
    # update_flow_run takes the new parameters directly as a keyword argument
    async with get_client() as client:
        await client.update_flow_run(
            flow_run_id=flow_run_id,
            parameters=new_params,
        )
Note: this only affects runs that haven’t started yet. Once a worker has picked up the run and begun execution, updating the parameters no longer changes its behavior.
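If it helps, here is a minimal sketch of a guard you could apply before attempting the update. It assumes you have already fetched the run's state type as a string, and it treats only `SCHEDULED` as "not started yet", which is a conservative assumption on my part. The helper name `can_update_parameters` is hypothetical and not part of Prefect.

```python
# Hypothetical helper: decide whether a parameter update can still take effect.
# Assumption: only runs whose state type is SCHEDULED count as "not started yet".
UPDATABLE_STATE_TYPES = {"SCHEDULED"}


def can_update_parameters(state_type: str) -> bool:
    """Return True if the run has not been picked up for execution yet."""
    return state_type.strip().upper() in UPDATABLE_STATE_TYPES


if __name__ == "__main__":
    # Show the decision for a few common state types.
    for st in ("SCHEDULED", "PENDING", "RUNNING", "PAUSED"):
        print(st, can_update_parameters(st))
```

If the guard returns False, one of the other options is likely the better fit.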
Option B: Pause and receive input mid-run (human or service-in-the-loop)
In your flow:
from pydantic import BaseModel
from prefect import flow
from prefect.flow_runs import pause_flow_run

class Adjustments(BaseModel):
    threshold: float | None = None

@flow
def my_flow(x: int):
    # Do some work...
    adj = pause_flow_run(wait_for_input=Adjustments)  # pauses until input is sent
    # Use adj to influence subsequent logic
    if adj and adj.threshold is not None:
        ...
    # Continue work...
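From your backend (or another flow/service), resume the paused run and supply the input:
from prefect.flow_runs import resume_flow_run
# flow_run_id is the ID of the paused flow run
# run_input must match the fields of the Adjustments model; await this call from async code
resume_flow_run(flow_run_id, run_input={"threshold": 0.9})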
Option C: Start a new run with the new parameters
from prefect.client.orchestration import get_client

async def start_new_run(deployment_id, params):
    # Parameters are passed directly as a keyword argument
    async with get_client() as client:
        fr = await client.create_flow_run_from_deployment(
            deployment_id=deployment_id,
            parameters=params,
        )
        return fr.id
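When starting a new run, you often want to keep most of the previous run's parameters and change only a few. A minimal sketch of that merge step, in plain Python, might look like the following. The helper name `merge_params` and the example parameter names are hypothetical; the result is what you would pass as the new run's parameters.

```python
from copy import deepcopy
from typing import Any


def merge_params(previous: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict: the previous parameters with top-level overrides applied."""
    merged = deepcopy(previous)  # copy so the original parameters are left untouched
    merged.update(overrides)     # overrides win on key conflicts (shallow, top-level only)
    return merged


if __name__ == "__main__":
    old = {"x": 1, "threshold": 0.5}
    print(merge_params(old, {"threshold": 0.9}))
```

Note that the merge is shallow: a nested dict in the overrides replaces the whole nested value rather than being merged key by key.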
Docs and references:
- Interactive workflows (pause, send/receive input): Interactive workflows
- API client usage: API client docs
If you can share your exact use case (e.g., human approval, dynamic tuning by another service, etc.), I can suggest the cleanest pattern for your setup.