William Fitzmaurice
06/17/2024, 12:14 AMMarvin
06/17/2024, 12:14 AMreschedule_late_flow_runs
function to reschedule multiple flow runs. Below is an example function that automates the rescheduling of flow runs:
python
from datetime import datetime, timedelta, timezone
from typing import Optional
from prefect import get_client
from prefect.orion.schemas.filters import FlowRunFilter, DeploymentFilter
from prefect.orion.schemas.sorting import FlowRunSort
from prefect.orion.schemas.states import Scheduled
async def reschedule_late_flow_runs(
deployment_name: str,
delay: timedelta,
most_recent_n: int,
delete_remaining: bool = True,
states: Optional[list[str]] = None
) -> list[FlowRun]:
if not states:
states = ["Late"]
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=dict(name=dict(any_=states)),
expected_start_time=dict(
before_=datetime.now(timezone.utc)
),
),
deployment_filter=DeploymentFilter(
name={'like_': deployment_name}
),
sort=FlowRunSort.START_TIME_DESC,
limit=most_recent_n if not delete_remaining else None
)
if not flow_runs:
print(f"No flow runs found in states: {states!r}")
return []
rescheduled_flow_runs = []
for i, run in enumerate(flow_runs):
await client.delete_flow_run(flow_run_id=run.id)
if i < most_recent_n:
new_run = await client.create_flow_run_from_deployment(
deployment_id=run.deployment_id,
state=Scheduled(
scheduled_time=run.expected_start_time + delay
),
)
rescheduled_flow_runs.append(new_run)
return rescheduled_flow_runs
if __name__ == "__main__":
import asyncio
rescheduled_flow_runs = asyncio.run(
reschedule_late_flow_runs(
deployment_name="your-deployment-name",
delay=timedelta(hours=6),
most_recent_n=3,
)
)
print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs.")
- This function reads the latest flow runs with a specified state (e.g., "Late"), deletes the specified number of these flow runs, and reschedules them with a new delay.
2. Manual Retries in UI:
While there's no bulk retry feature, you can manually retry individual flow runs in the Prefect UI by:
- Navigating to the flow run details page.
- Clicking the retry button for each flow run.
Unfortunately, these are the current available methods, and for bulk actions, automation through the Prefect client is the recommended approach.
### Sources:
- Prefect Docs - Using the Client
- Related GitHub Issues