Berend Wentges
09/23/2025, 7:00 AM
Marvin
09/23/2025, 7:00 AM
Marvin
09/23/2025, 7:08 AM
deployment_id and checks “running flow runs < 1” within a time window
- An action of type RunDeployment targeting that deployment_id
Example (shape only; use the UI to avoid schema details)
{
  "name": "Ensure one run for deployment X",
  "description": "Start a run if none are running",
  "trigger": {
    "type": "metric",
    "posture": "proactive",
    "query": {
      "metric": "flow_runs.running",
      "operator": "<",
      "threshold": 1,
      "where": {
        "deployment_id": "YOUR-DEPLOYMENT-ID"
      },
      "within": "60s",
      "firing_for": "15s"
    }
  },
  "actions": [
    {
      "type": "run-deployment",
      "deployment_id": "YOUR-DEPLOYMENT-ID",
      "parameters": {}
    }
  ],
  "cooldown_seconds": 60
}
Questions for you
- Are you on Prefect Cloud or open-source Server?
- What deployment (name/ID) should this monitor?
- Do you want to ensure exactly one run is active at all times, or at least one (allowing more in parallel)? This will guide whether we add a concurrency limit as well.
Happy to draft the exact automation with your deployment details.
Berend Wentges
09/23/2025, 7:26 AM
Marvin
09/23/2025, 7:26 AM
Marvin
09/23/2025, 7:34 AM
prefect work-pool set-concurrency-limit "YOUR_POOL" 1
You can clear it later with:
prefect work-pool clear-concurrency-limit "YOUR_POOL"
- CLI reference: Work pool CLI
This gives you:
- The automation will start a run when none are running.
- The work pool concurrency limit ensures you never have more than one run at a time.
Option B — In-flow concurrency guard (if you can modify the flow)
- Define a global concurrency limit named for this deployment (UI: Concurrency Limits → create, limit=1).
- Acquire the slot at the start of your flow:
from prefect import flow
from prefect.concurrency.sync import concurrency  # sync context manager

@flow
def my_flow():
    # Waits until the single slot of the global limit is free.
    with concurrency("my-deployment-one-at-a-time", occupy=1):
        # your work here
        ...
- Docs: Global concurrency limits
Notes
- There is no “create automation” CLI; use the UI or API. You can list/inspect/pause/resume via CLI: prefect automation
- If you just need a cadence, a schedule is simpler — but it won’t “start only when none are running” (it may queue runs). The automation approach matches your requirement precisely.
If you share your deployment name/ID and work pool name, I can outline the exact UI clicks or an API payload you can post to your OSS server to create the automation.
Berend Wentges
09/23/2025, 7:38 AM
Marvin
09/23/2025, 7:39 AM
Marvin
09/23/2025, 7:43 AM
prefect work-pool set-concurrency-limit "YOUR_POOL" 1
- Work pool CLI docs: prefect work-pool CLI
- Result:
- If nothing is running, the next scheduled time will start a run.
- If one is running, additional runs will wait (not run concurrently). This may create a small backlog if your runtime exceeds the schedule interval.
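To make the backlog remark concrete, here is a minimal sketch in plain Python (no Prefect calls) of a schedule that creates one run per interval feeding a pool that executes one run at a time. The function name `simulate_backlog` and the 60 s / 90 s figures are illustrative assumptions, not values from this thread, and the model ignores scheduler and worker latency.

```python
def simulate_backlog(interval_s: int, runtime_s: int, ticks: int) -> list[int]:
    """Return how many created runs are still waiting at each scheduled tick.

    Simplified model: one run is created every interval_s seconds, the pool
    runs at most one at a time, and every run takes exactly runtime_s seconds.
    """
    created = [k * interval_s for k in range(ticks)]
    starts = []
    free_at = 0  # time at which the single slot becomes free again
    for t in created:
        start = max(t, free_at)  # wait for the slot if it is still occupied
        starts.append(start)
        free_at = start + runtime_s
    # A run is "waiting" at time `now` if it has been created but not started.
    return [
        sum(1 for c, s in zip(created, starts) if c <= now < s)
        for now in created
    ]


if __name__ == "__main__":
    print(simulate_backlog(interval_s=60, runtime_s=90, ticks=5))
    print(simulate_backlog(interval_s=60, runtime_s=30, ticks=5))
```

In this model the waiting count stays at zero when the runtime is shorter than the interval, and keeps growing for as long as the runtime exceeds it, so the backlog is only "small" if long runs are occasional.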
Option 2: Lightweight “watcher” script + work pool concurrency limit
- Run a small script (via cron, systemd, or a tiny container) that checks if there are any RUNNING flow runs for your deployment, and if not, starts one. The work pool limit of 1 ensures you never exceed one in parallel even if two checks overlap.
Example watcher (sync, suitable for cron)
from time import sleep
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.deployments import run_deployment

DEPLOYMENT_ID = "YOUR-DEPLOYMENT-UUID"  # or use the deployment name if you prefer
CHECK_EVERY_SECONDS = 30


def ensure_one_running():
    # get_client(sync_client=True) returns a synchronous client configured
    # from the active Prefect profile / PREFECT_API_URL.
    with get_client(sync_client=True) as client:
        runs = client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[UUID(DEPLOYMENT_ID)]),
                # The state type filter is nested under `state`.
                # Only RUNNING is checked; SCHEDULED or PENDING runs are not counted.
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                ),
            ),
            limit=1,
        )
    if not runs:
        # Start a new run. You can also pass parameters or a flow_run_name here.
        # timeout=0 returns right after the run is created instead of waiting
        # for it to finish.
        run_deployment(UUID(DEPLOYMENT_ID), timeout=0)


if __name__ == "__main__":
    while True:
        try:
            ensure_one_running()
        except Exception as exc:
            # log and continue; avoid crashing the loop
            print(f"Watcher error: {exc}")
        sleep(CHECK_EVERY_SECONDS)
- Also cap your pool to 1 to enforce single concurrency:
prefect work-pool set-concurrency-limit "YOUR_POOL" 1
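One thing to watch with the watcher: it only counts RUNNING runs, so a run that was just created and is still waiting to be picked up would not stop the next check from creating another one. A minimal sketch of a stricter decision rule is below. `RunState`, `ACTIVE_STATES`, and `should_start_run` are hypothetical names used for illustration (a stand-in for Prefect's state types, not its API), and whether SCHEDULED runs should count as active depends on whether the deployment also has a schedule.

```python
from enum import Enum
from typing import Iterable


class RunState(str, Enum):
    """Hypothetical stand-in for the state types a flow run can be in."""
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Assumption: anything not yet finished counts as "already taken care of".
ACTIVE_STATES = {RunState.SCHEDULED, RunState.PENDING, RunState.RUNNING}


def should_start_run(states: Iterable[RunState]) -> bool:
    """Return True only if no existing run is queued, starting, or running."""
    return not any(state in ACTIVE_STATES for state in states)


if __name__ == "__main__":
    print(should_start_run([RunState.COMPLETED]))
    print(should_start_run([RunState.COMPLETED, RunState.PENDING]))
```

In the watcher, the same idea would translate to passing all three state types to the state filter instead of RUNNING alone.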
Docs and references
- Event triggers support (why proactive triggers don’t work on OSS): Support for event trigger types
- Work pool concurrency: prefect work-pool CLI
- Global concurrency limits (optional, if you also want a flow-level lock): Global concurrency limits
Would you like me to adapt the watcher for your specific deployment (ID/name) and check cadence? Or would you prefer the schedule + pool-limit approach?