Tita
08/15/2024, 5:15 PM
Marvin
08/15/2024, 5:15 PM
import asyncio
from datetime import datetime, timedelta, timezone

from prefect import get_client
from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
from prefect.client.schemas.sorting import FlowRunSort
from prefect.states import Scheduled


async def reschedule_late_flow_runs(
    deployment_name: str,
    delay: timedelta,
    most_recent_n: int,
    delete_remaining: bool = True,
) -> list:
    """Delete late runs of a deployment and recreate the most recent ones later."""
    states = ["Late"]
    async with get_client() as client:
        # Late runs whose expected start time has already passed, newest first.
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={"like_": deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            # Fetch only the runs to reschedule unless the rest should be deleted too.
            limit=most_recent_n if not delete_remaining else None,
        )
        if not flow_runs:
            print(f"No flow runs found in states: {states!r}")
            return []

        rescheduled_flow_runs = []
        for i, flow_run in enumerate(flow_runs):
            # Every fetched late run is deleted...
            await client.delete_flow_run(flow_run_id=flow_run.id)
            # ...and only the most recent `most_recent_n` are recreated, shifted by `delay`.
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(
                    deployment_id=flow_run.deployment_id,
                    state=Scheduled(scheduled_time=flow_run.expected_start_time + delay),
                )
                rescheduled_flow_runs.append(new_run)
        return rescheduled_flow_runs


# Example usage
if __name__ == "__main__":
    rescheduled_flow_runs = asyncio.run(
        reschedule_late_flow_runs(
            deployment_name="your-deployment-name",
            delay=timedelta(hours=6),
            most_recent_n=3,
        )
    )
    print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs")
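To check which runs the function above would delete and which it would recreate before pointing it at a real deployment, here is a dry-run sketch in plain Python that makes no API calls. `LateRun` and `plan_reschedule` are hypothetical helpers, not part of Prefect, and sorting by `expected_start_time` is an assumption standing in for `FlowRunSort.START_TIME_DESC` on runs that never started. It makes one behavior easy to see: with `delete_remaining=True`, every late run is deleted but only the `most_recent_n` newest come back.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class LateRun:
    """Hypothetical stand-in for a late flow run, holding only what the plan needs."""
    run_id: str
    expected_start_time: datetime


def plan_reschedule(runs, delay, most_recent_n, delete_remaining=True):
    """Return (ids_to_delete, new_scheduled_times) without touching any API."""
    # Newest first; assumed to match START_TIME_DESC for runs that never started.
    ordered = sorted(runs, key=lambda r: r.expected_start_time, reverse=True)
    # Mirrors `limit=most_recent_n if not delete_remaining else None`.
    fetched = ordered if delete_remaining else ordered[:most_recent_n]
    # Every fetched run is deleted; only the first `most_recent_n` are recreated.
    ids_to_delete = [r.run_id for r in fetched]
    new_times = [r.expected_start_time + delay for r in fetched[:most_recent_n]]
    return ids_to_delete, new_times


if __name__ == "__main__":
    base = datetime(2024, 8, 15, 12, 0, tzinfo=timezone.utc)
    runs = [LateRun(f"run-{h}", base + timedelta(hours=h)) for h in range(4)]
    to_delete, new_times = plan_reschedule(runs, timedelta(hours=6), most_recent_n=2)
    print(to_delete)  # ['run-3', 'run-2', 'run-1', 'run-0']
    print([t.isoformat() for t in new_times])  # ['2024-08-15T21:00:00+00:00', '2024-08-15T20:00:00+00:00']
```

Passing `delete_remaining=False` in the same example would delete only `run-3` and `run-2`, leaving the two older late runs in place.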
By following these steps, you should be able to identify and resolve the issue of your runs being stuck in a Late state. If the problem persists, refer to the GitHub issues and documentation links below for more context and potential fixes.
- Prevent running late runs: https://github.com/PrefectHQ/prefect/issues/9054
- Capacity limiter errors: https://github.com/PrefectHQ/prefect/issues/10632
- Agents canceling wrong work pools: https://github.com/PrefectHQ/prefect/issues/9569
- Reschedule late flow runs: Documentation provided above.
Nate
08/15/2024, 5:20 PM
Nate
08/15/2024, 5:21 PM
Tita
08/15/2024, 5:42 PM
Tita
08/15/2024, 5:43 PM
Nate
08/15/2024, 5:55 PM
> I'm using Prefect-managed resources
hmm, do you mean that you're using a Prefect-managed work pool?
Tita
08/15/2024, 5:56 PM
Nate
08/15/2024, 6:01 PM
Tita
08/15/2024, 6:04 PM
Nate
08/15/2024, 7:14 PM
… Runs page and filter for runs in Running state)
Tita
08/15/2024, 8:25 PM
Nate
08/15/2024, 8:39 PM
… Running state will count toward your limit
and whether or not they're consuming memory depends on whether the container is actually alive and running
unfortunately, inevitably, sometimes the Prefect API loses track of a container while it's running, leaving the flow run in a Running state even though the container has exited
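A toy model of the accounting described above may help. It assumes "your limit" behaves like a simple cap on the number of runs the API records as Running. `ObservedRun`, `slots_in_use`, `orphaned_runs` and `free_slots` are hypothetical names for illustration, not Prefect APIs; in practice the run states would come from the API (for example the Runs page filter mentioned earlier), not from a hand-built list.

```python
from dataclasses import dataclass


@dataclass
class ObservedRun:
    """Hypothetical record pairing the API's view of a run with the infrastructure's."""
    name: str
    api_state: str         # state name recorded by the API, e.g. "Running"
    container_alive: bool  # whether the backing container still exists (assumed knowable)


def slots_in_use(runs):
    # Every run the API records as Running counts toward the limit,
    # whether or not its container is still alive.
    return sum(1 for r in runs if r.api_state == "Running")


def orphaned_runs(runs):
    # Runs the API lost track of: still Running in the API, but the container has exited.
    # They hold a slot without consuming memory.
    return [r.name for r in runs if r.api_state == "Running" and not r.container_alive]


def free_slots(runs, limit):
    # Remaining capacity under a simple cap on concurrently Running runs (assumed model).
    return max(limit - slots_in_use(runs), 0)


if __name__ == "__main__":
    runs = [
        ObservedRun("a", "Running", True),
        ObservedRun("b", "Running", False),  # orphaned: container exited
        ObservedRun("c", "Completed", False),
    ]
    print(slots_in_use(runs), orphaned_runs(runs), free_slots(runs, limit=2))  # 2 ['b'] 0
```

Run `b` is the case described in the thread: it uses no memory, yet it still occupies one of the two slots until its state is corrected.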
does that answer your question?
Tita
08/15/2024, 8:50 PM