<@ULVA73B9P> I run long running python job in loop...
# ask-marvin
s
@Marvin I run a long-running Python job in a loop on a Docker worker. I restarted the Docker worker and the job did not resume. My script needs to run every 2 min (365 days a year), and if any failure occurs it should restart.
m
thought for 426.1 seconds
Thanks for the details. I think I see what happened.

Why your job didn’t “resume”
- Restarting a worker does not resume an in-flight Python process. If a worker/container is killed mid-run, the flow run is marked Crashed. It won’t pick up where it left off.
- To make this resilient, avoid an infinite loop inside a single flow run. Instead, schedule a short, idempotent flow to run every 2 minutes. If a run fails, configure retries so Prefect will re-run it.

Recommended approach

1) Wrap one “iteration” of your job as a flow with retries
- This makes each 2-minute unit independent and retryable.
Copy code
from prefect import flow, get_run_logger
from prefect.schedules import Interval

@flow(name="two-minute-job", retries=5, retry_delay_seconds=30, log_prints=True)
def two_minute_job():
    logger = get_run_logger()
    logger.info("Starting iteration")
    # do one iteration of your work here
    # ...
    logger.info("Done")

if __name__ == "__main__":
    # Deploy with an interval schedule of 120 seconds
    # Assumes this file is flows/two_minute_job.py
    two_minute_job.from_source(
        source=".",  # your project root (can be a repo URL too)
        entrypoint="flows/two_minute_job.py:two_minute_job",
    ).deploy(
        name="every-2-min",
        work_pool_name="my-docker-pool",
        schedules=[Interval(120)],  # interval in seconds, passed positionally
    )
Notes:
- `retries` and `retry_delay_seconds` make Prefect re-run the flow when it fails with an exception. A run marked Crashed because the worker or container died is a different case: flow-level retries are not guaranteed to cover it, so add the Automation described further down if crash recovery matters.
- Keep the work done inside each run idempotent so retries don’t cause double-processing.

2) Use a Docker work pool and a worker
- Create a Docker work pool (once):
Copy code
prefect work-pool create "my-docker-pool" --type docker
- Start a worker that points at that pool:
Copy code
prefect worker start --pool "my-docker-pool"
Tip: when starting a Docker worker, ensure the docker extra is installed, e.g. `pip install "prefect[docker]"`, or run with `uv run --with 'prefect[docker]' prefect worker start --pool "my-docker-pool"`.

3) Deploy with the schedule
- The example above deploys via Python with `schedules=[Interval(120)]`. You can also use `prefect deploy` from a project, but the Python example is a direct way to attach an interval schedule in 3.x.

Behavior after a worker restart
- Any run that was in-flight when the worker was killed won’t resume mid-step. It is marked Crashed, and flow-level retries are not guaranteed to re-run it, because retries are handled for failures raised inside the run.
- Future scheduled runs (every 2 min) continue automatically. If your worker is down for a bit, you may see “Late” runs; once the worker is back, it will start picking up runs.

Optional: Automate re-triggers on Crashed
- If you want belt-and-suspenders guarantees, you can add an Automation that triggers a “Run Deployment” when a flow run enters Crashed or Failed. Flow retries cover ordinary failures; the Automation is what covers crashes.
- Docs: Automations API

Docs and references
- Interval schedules API: prefect.schedules.Interval
- Flow decorator (retries, etc.): prefect.flow
- Work pools and workers: Work pools and workers
- Blog background on workers: Work pools and workers overview

A few questions to make sure this fits your setup:
- Are you on Prefect 3.x and using a Docker work pool?
- Is your current code a single long-lived flow with a `while True` loop?
- Do you need to “catch up” missed intervals if the worker is down, or is it okay to just resume on the next tick?
If you share how you’re running the script today (flow code and deployment steps), I can help convert it to this pattern.
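On the idempotency note above, here is a minimal sketch of one way to make each 2-minute iteration safe to re-run. It does not use any Prefect API. The names `window_key`, `run_once`, and the in-memory `processed` set are hypothetical; in practice the set would need to be a durable store (a database table, for example) so it survives a worker restart.

```python
from datetime import datetime, timezone

WINDOW_SECONDS = 120  # assumed to match the 2-minute schedule


def window_key(scheduled: datetime, window_seconds: int = WINDOW_SECONDS) -> str:
    """Map a timestamp to the start of its window, as a UTC ISO string."""
    epoch = int(scheduled.timestamp())
    start = epoch - (epoch % window_seconds)
    return datetime.fromtimestamp(start, tz=timezone.utc).isoformat()


def run_once(scheduled: datetime, processed: set[str]) -> bool:
    """Do the work for this window unless it was already done.

    Returns True if the work ran, False if the window was skipped.
    `processed` is a stand-in for a durable store (hypothetical).
    """
    key = window_key(scheduled)
    if key in processed:
        return False
    # ... one iteration of the real work would go here ...
    processed.add(key)
    return True
```

With this shape, a retry of the same scheduled window becomes a no-op, while the next window still does its work.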
s
@Marvin I don't want more than 5 runs scheduled on the worker. Discard the older ones and focus on the most recent schedule.
m
thought for 258.0 seconds
Got it. Two pieces to solve what you want:

1) Limit how many runs execute at once (max 5)
- Set a work queue concurrency limit to 5 so your worker never runs more than 5 flow runs concurrently.
- This won’t trim backlog by itself, but it guarantees your worker won’t be overwhelmed.
Copy code
prefect work-queue set-concurrency-limit "<your-queue-name>" 5
Docs: Global concurrency limits

2) Discard older scheduled runs and focus on the most recent ones
If the worker is down, runs can pile up and become “Late”. To keep only the most recent N and drop older ones, use one of these patterns:

A) Self-skip guard inside your flow (keep last 5)
- At the start of each run, check how many newer runs (for the same deployment) are queued; if there are 5 or more, skip/cancel this run immediately. This ensures older runs do not do work when they finally start.
Copy code
from datetime import datetime, timezone

from prefect import flow, get_run_logger
from prefect.context import get_run_context
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.sorting import FlowRunSort
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

KEEP = 5

@flow
async def two_minute_job():
    logger = get_run_logger()
    ctx = get_run_context()
    me = ctx.flow_run

    if not me.deployment_id or not me.expected_start_time:
        logger.warning("No deployment/expected_start_time; skipping backlog check")
    else:
        async with get_client() as client:
            # "Late" and "AwaitingRetry" are state names under the SCHEDULED type,
            # so SCHEDULED and PENDING cover the runs that are still waiting.
            runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    deployment_id=FlowRunFilterDeploymentId(any_=[me.deployment_id]),
                    state=FlowRunFilterState(
                        type=FlowRunFilterStateType(
                            any_=[StateType.SCHEDULED, StateType.PENDING]
                        )
                    ),
                ),
                sort=FlowRunSort.EXPECTED_START_TIME_ASC,
            )

            # Count only newer runs that are already due. The scheduler also
            # pre-creates future runs, and those must not trigger a skip.
            now = datetime.now(timezone.utc)
            newer = [
                r for r in runs
                if r.expected_start_time
                and me.expected_start_time < r.expected_start_time <= now
            ]
            if len(newer) >= KEEP:
                logger.info(f"Skipping this run to keep only the last {KEEP} scheduled runs.")
                # Reuse the client that is already open
                await client.set_flow_run_state(
                    me.id, state=Cancelled(name=f"Skipped; keep only last {KEEP} runs")
                )
                return

    # ... do your real work here ...
    logger.info("Processing the iteration")
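The skip decision in option A can be checked on its own, without a Prefect server. The sketch below is a plain-Python illustration of that rule, under the assumption that only runs already due should count as backlog; `should_skip` and its parameters are hypothetical names, not part of Prefect.

```python
from datetime import datetime


def should_skip(
    my_start: datetime,
    queued_starts: list[datetime],
    now: datetime,
    keep: int = 5,
) -> bool:
    """Return True when at least `keep` queued runs are newer than this one
    and already due (expected start time not later than `now`)."""
    newer_due = [t for t in queued_starts if my_start < t <= now]
    return len(newer_due) >= keep
```

Passing `now` in as an argument, rather than reading the clock inside the function, keeps the rule easy to test with fixed timestamps.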
B) Maintenance flow to proactively trim backlog
- Run this small flow (e.g., every minute) to cancel older Pending/Scheduled/Late runs, keeping only the most recent 5 by expected start time.
```
from datetime import datetime, timezone

from prefect import flow, get_run_logger
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterName,
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.sorting import FlowRunSort
from prefect.client.schemas.objects import StateType
from prefect.states import Cancelled

@flow
async def trim_backlog(deployment_name: str, keep: int = 5):
    logger = get_run_logger()
    async with get_client() as client:
        deps = await client.read_deployments(
            deployment_filter=DeploymentFilter(
                name=DeploymentFilterName(any_=[deployment_name])
            )
        )
        if not deps:
            logger.warning(f"No deployment found named {deployment_name}")
            return
        dep_id = deps[0].id

        # "Late" is a state name under the SCHEDULED type, so it is covered here.
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                deployment_id=FlowRunFilterDeploymentId(any_=[dep_id]),
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=[StateType.SCHEDULED, StateType.PENDING]
                    )
                ),
            ),
            sort=FlowRunSort.EXPECTED_START_TIME_ASC,
        )

        # Only trim runs that are already due; future scheduled runs stay untouched.
        now = datetime.now(timezone.utc)
        overdue = [r for r in runs if r.expected_start_time and r.expected_start_time <= now]
        cut = max(len(overdue) - keep, 0)
        to_cancel = overdue[:cut]  # everything except the most recent `keep`

        logger.info(f"Cancelling {len(to_cancel)} older runs; keeping {keep}")
        for r in to_cancel:
            await client.set_flow_run_state(
                r.id, state=Cancelled(name=f"Backlog trim: keeping only latest {keep} runs")
            )
```

What this gets you
- With the work-queue concurrency limit, you’ll never run more than 5 at once.
- With either the self-skip guard or the maintenance flow, older late runs are dropped so you focus on the most recent schedules.

Quick check
- Do you want to limit to 5 concurrent runs, 5 total “recent” runs, or both?
- What’s your deployment/work-queue name? I can tailor the exact commands and wiring for your setup.

References
- Work pools/queues overview: Work pools and workers
- Automations (if you want to build an automation to cancel old runs): Automations API
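The selection step in option B (which runs to cancel, which to keep) can also be sketched as a pure function and tested without Prefect. `split_backlog` is a hypothetical helper, and the sketch assumes that only runs already due count as backlog. It also avoids a slicing pitfall: a slice such as `items[:-keep]` returns an empty list when `keep` is 0, which would cancel nothing instead of everything.

```python
from datetime import datetime


def split_backlog(
    starts: list[datetime], now: datetime, keep: int = 5
) -> tuple[list[datetime], list[datetime]]:
    """Split overdue start times into (to_cancel, to_keep).

    Only times at or before `now` are considered; the most recent
    `keep` of those are kept and the older ones are returned for cancelling.
    """
    if keep < 0:
        raise ValueError("keep must be non-negative")
    overdue = sorted(t for t in starts if t <= now)
    cut = max(len(overdue) - keep, 0)
    return overdue[:cut], overdue[cut:]
```

Computing an explicit cut index keeps `keep=0` and `keep` larger than the backlog well-defined.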