Shubham
10/11/2025, 1:53 AM

Marvin
10/11/2025, 1:53 AM

Marvin
10/11/2025, 2:00 AM
```
from prefect import flow, get_run_logger
from prefect.schedules import Interval

@flow(name="two-minute-job", retries=5, retry_delay_seconds=30, log_prints=True)
def two_minute_job():
    logger = get_run_logger()
    logger.info("Starting iteration")
    # do one iteration of your work here
    # ...
    logger.info("Done")

if __name__ == "__main__":
    # Deploy with an interval schedule of 120 seconds
    # Assumes this file is flows/two_minute_job.py
    two_minute_job.from_source(
        source=".",  # your project root (can be a repo URL too)
        entrypoint="flows/two_minute_job.py:two_minute_job",
    ).deploy(
        name="every-2-min",
        work_pool_name="my-docker-pool",
        schedules=[Interval(interval=120)],
    )
```
Notes:
- retries and retry_delay_seconds ensure Prefect re-runs the flow if it fails. If the worker dies (the flow run is marked Crashed), Prefect can re-run it when the worker is back, as long as retries are set.
- Keep the work done inside each run idempotent so retries don’t cause double-processing; a sketch follows below.
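A minimal sketch of one way to make each iteration idempotent, assuming the work can be keyed by its 2-minute schedule bucket. The local marker directory is purely illustrative; swap in your database or object store:
```
from datetime import datetime, timezone
from pathlib import Path

MARKER_DIR = Path("processed_markers")  # hypothetical marker store

def iteration_key(now: datetime) -> str:
    # Key the work by its 2-minute bucket so a retried run redoes
    # the same bucket instead of processing it twice
    return f"{now:%Y%m%dT%H}{(now.minute // 2) * 2:02d}"

def do_iteration() -> None:
    key = iteration_key(datetime.now(timezone.utc))
    marker = MARKER_DIR / key
    if marker.exists():
        return  # this interval was already processed; a retry is a no-op
    # ... real work here ...
    MARKER_DIR.mkdir(exist_ok=True)
    marker.touch()  # record completion only after the work succeeds
```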
2) Use a Docker work pool and a worker
- Create a Docker work pool (once):
prefect work-pool create "my-docker-pool" --type docker
- Start a worker that points at that pool:
prefect worker start --pool "my-docker-pool"
Tip: when starting a Docker worker, make sure the docker extra is installed, e.g. pip install "prefect[docker]", or run with uv run --with 'prefect[docker]' prefect worker start --pool "my-docker-pool".
3) Deploy with the schedule
- The example above deploys via Python with schedules=[Interval(120)]. You can also use prefect deploy from a project (see the prefect.yaml sketch below), but the Python example is the most direct way to attach an interval schedule in 3.x.
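If you'd rather use prefect deploy, a minimal prefect.yaml sketch for the same deployment might look like this (names carried over from the example above; adjust to your project):
```
# prefect.yaml (sketch)
deployments:
  - name: every-2-min
    entrypoint: flows/two_minute_job.py:two_minute_job
    work_pool:
      name: my-docker-pool
    schedules:
      - interval: 120  # seconds
```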
Behavior after a worker restart
- Any run that was in-flight when the worker was killed won’t resume mid-step. With flow-level retries configured, Prefect will re-run that flow attempt when the worker comes back (up to the retry limit).
- Future scheduled runs (every 2 min) continue automatically. If your worker is down for a bit, you may see “Late” runs; once the worker is back, it will start picking up runs.
Optional: Automate re-triggers on Crashed
- If you want belt-and-suspenders guarantees, you can add an Automation that triggers a “Run Deployment” when a flow run enters Crashed or Failed. This is rarely needed if you set flow retries, but it’s available.
- Docs: Automations API
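For completeness, a rough sketch of creating such an automation from Python. This assumes the Automation/EventTrigger classes available in recent 3.x releases; treat the field names and the match_related filter as assumptions and verify against the Automations docs for your version:
```
# Sketch only: re-run the deployment whenever a flow run crashes.
# Assumes recent Prefect 3.x; verify field names against the Automations docs.
from datetime import timedelta

from prefect.automations import Automation
from prefect.events.actions import RunDeployment
from prefect.events.schemas.automations import EventTrigger, Posture

automation = Automation(
    name="rerun-on-crash",
    trigger=EventTrigger(
        expect={"prefect.flow-run.Crashed"},  # state-change event name
        match_related={"prefect.resource.name": "two-minute-job"},  # hypothetical filter
        posture=Posture.Reactive,
        threshold=1,
        within=timedelta(0),
    ),
    actions=[RunDeployment(source="selected", deployment_id="<your-deployment-id>")],
)
automation.create()
```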
Docs and references
- Interval schedules API: prefect.schedules.Interval
- Flow decorator (retries, etc.): prefect.flow
- Work pools and workers: Work pools and workers
- Blog background on workers: Work pools and workers overview
A few questions to make sure this fits your setup:
- Are you on Prefect 3.x and using a Docker work pool?
- Is your current code a single long-lived flow with a while True loop?
- Do you need to “catch up” missed intervals if the worker is down, or is it okay to just resume on the next tick?

Marvin
10/11/2025, 2:00 AM

Shubham
10/11/2025, 2:23 AM

Marvin
10/11/2025, 2:23 AM

Marvin
10/11/2025, 2:28 AM
1) Cap how many runs execute at once
Set a concurrency limit of 5 on your work queue:
prefect work-queue set-concurrency-limit "<your-queue-name>" 5
Docs: Global concurrency limits
2) Discard older scheduled runs and focus on the most recent ones
If the worker is down, runs can pile up and become “Late”. To keep only the most recent N and drop older ones, use one of these patterns:
A) Self-skip guard inside your flow (keep last 5)
- At the start of each run, check how many newer runs (for the same deployment) are queued; if there are 5 or more, skip/cancel this run immediately. This ensures older runs do not do work when they finally start.
```
from prefect import flow, get_run_logger
from prefect.context import get_run_context
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.sorting import FlowRunSort
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

KEEP = 5

@flow
async def two_minute_job():
    logger = get_run_logger()
    ctx = get_run_context()
    me = ctx.flow_run
    if not me.deployment_id or not me.expected_start_time:
        logger.warning("No deployment/expected_start_time; skipping backlog check")
    else:
        async with get_client() as client:
            runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    deployment_id=FlowRunFilterDeploymentId(any_=[me.deployment_id]),
                    # "Late" and "AwaitingRetry" are SCHEDULED states under the hood
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateType(
                            any_=[StateType.SCHEDULED, StateType.PENDING]
                        )
                    ),
                ),
                sort=FlowRunSort.EXPECTED_START_TIME_ASC,
            )
            newer = [
                r for r in runs
                if r.expected_start_time and r.expected_start_time > me.expected_start_time
            ]
            if len(newer) >= KEEP:
                logger.info(f"Skipping this run to keep only the last {KEEP} scheduled runs.")
                await client.set_flow_run_state(
                    me.id, state=Cancelled(name=f"Skipped; keep only last {KEEP} runs")
                )
                return

    # ... do your real work here ...
    logger.info("Processing the iteration")
```
B) Maintenance flow to proactively trim backlog
- Run this small flow (e.g., every minute) to cancel older Pending/Scheduled/Late runs, keeping only the most recent 5 by expected start time.
```
from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.sorting import FlowRunSort
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

@flow
async def trim_backlog(deployment_name: str, keep: int = 5):
    logger = get_run_logger()
    async with get_client() as client:
        deps = await client.read_deployments(
            deployment_filter=DeploymentFilter(name=DeploymentFilterName(any_=[deployment_name]))
        )
        if not deps:
            logger.warning(f"No deployment found named {deployment_name}")
            return
        dep_id = deps[0].id
        # Newest first, so runs[keep:] are the older ones to drop
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[dep_id]),
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=[StateType.SCHEDULED, StateType.PENDING]
                    )
                ),
            ),
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
        )
        to_cancel = runs[keep:]
        logger.info(f"Cancelling {len(to_cancel)} older runs; keeping {keep}")
        for r in to_cancel:
            await client.set_flow_run_state(
                r.id, state=Cancelled(name=f"Backlog trim: keeping only latest {keep} runs")
            )
```
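To run the trimmer every minute, one minimal way to deploy it (assuming the same Docker pool and the every-2-min deployment from earlier; the entrypoint path is hypothetical):
```
from prefect.schedules import Interval

if __name__ == "__main__":
    trim_backlog.from_source(
        source=".",
        entrypoint="flows/trim_backlog.py:trim_backlog",  # hypothetical path
    ).deploy(
        name="trim-backlog",
        work_pool_name="my-docker-pool",
        parameters={"deployment_name": "every-2-min"},
        schedules=[Interval(60)],
    )
```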
What this gets you
- With the work-queue concurrency limit, you’ll never run more than 5 at once.
- With either the self-skip guard or the maintenance flow, older late runs are dropped so you focus on the most recent schedules.
Quick check
- Do you want to limit to 5 concurrent runs, 5 total “recent” runs, or both?
- What’s your deployment/work-queue name? I can tailor the exact commands and wiring for your setup.
References
- Work pools/queues overview: Work pools and workers
- Automations (if you want to build an automation to cancel old runs): Automations API