Andrew Rosen
07/02/2023, 5:18 AM
David Steiner Sand
07/03/2023, 5:57 AM
import asyncio
from datetime import timedelta
from prefect import flow, get_run_logger
from prefect.server.schemas.states import StateType
from prefect.server.utilities.schemas import DateTimeTZ
from prefect.states import Crashed
from flows.change_flow_runs_state.utils import change_state
@flow
def crash_old_flow_runs(
    limit: int = 200,  # 200 is the max
    crash_older_than_hours: float = 24,
):
    """Crash running flow runs that started more than ``crash_older_than_hours`` hours ago.

    Computes a cutoff timestamp (current UTC time minus
    ``crash_older_than_hours``) and delegates to ``change_state`` to move
    flow runs from ``RUNNING`` to ``Crashed``.

    Args:
        limit: Maximum number of flow runs to process in this invocation.
        crash_older_than_hours: Age threshold in hours; only flow runs whose
            start time is before ``now - crash_older_than_hours`` are selected.
    """
    past_timestamp = DateTimeTZ.utcnow() - timedelta(hours=crash_older_than_hours)
    # change_state is a coroutine; this flow is synchronous, so drive it to
    # completion with its own event loop.
    asyncio.run(
        change_state(
            logger=get_run_logger(),
            from_state=StateType.RUNNING,
            to_state=Crashed(),
            limit=limit,
            started_before=past_timestamp,
        )
    )
import asyncio
from typing import List, Optional
from prefect import get_client
from prefect.engine import propose_state
from prefect.exceptions import Abort
from prefect.server.schemas.filters import (
FlowRunFilter,
FlowRunFilterDeploymentId,
FlowRunFilterExpectedStartTime,
FlowRunFilterStartTime,
FlowRunFilterState,
FlowRunFilterStateType,
FlowRunFilterTags,
)
from prefect.server.schemas.states import StateType
from prefect.server.utilities.schemas import DateTimeTZ
from prefect.states import State
async def change_state(
    logger,
    from_state: StateType,
    to_state: State,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = None,
    started_before: Optional[DateTimeTZ] = None,
    expected_started_after: Optional[DateTimeTZ] = None,
):
    """Propose ``to_state`` for flow runs currently in ``from_state``.

    Only flow runs that belong to a deployment are considered. The selection
    can be narrowed further by ``tags``, ``started_before`` and
    ``expected_started_after``, and is capped at ``limit`` flow runs.

    The operation is best-effort: a failure to change one flow run's state is
    logged and does not prevent the remaining flow runs from being processed.

    Args:
        logger: Logger used for progress and error reporting.
        from_state: State type the flow runs must currently be in.
        to_state: State proposed for every matching flow run.
        tags: If given, only flow runs carrying all of these tags are selected.
        limit: Maximum number of flow runs to fetch; defaults to 10 when None.
        started_before: If given, only flow runs whose start time is before
            this timestamp are selected.
        expected_started_after: If given, only flow runs whose expected start
            time is after this timestamp are selected.
    """
    if limit is None:
        limit = 10

    tags_message = ""
    started_before_message = ""
    expected_started_after_message = ""
    if tags is not None:
        tags_message = f" tagged {', '.join(tags)}"
    if started_before is not None:
        started_before_message = f" that started before {started_before}"
    if expected_started_after is not None:
        expected_started_after_message = (
            f" that were expected to start after {expected_started_after}"
        )
    logger.info(
        f"Looking for {limit} flow runs{tags_message} from {from_state} "
        f"to {to_state}{started_before_message}{expected_started_after_message}..."
    )

    # Use the client as a context manager so it is closed once we are done.
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=None,
                    type=FlowRunFilterStateType(any_=[from_state]),
                ),
                # Restrict to flow runs created from a deployment.
                deployment_id=FlowRunFilterDeploymentId(is_null_=False),
                tags=FlowRunFilterTags(all_=tags) if tags is not None else None,
                expected_start_time=(
                    FlowRunFilterExpectedStartTime(after_=expected_started_after)
                    if expected_started_after is not None
                    else None
                ),
                start_time=(
                    FlowRunFilterStartTime(before_=started_before)
                    if started_before is not None
                    else None
                ),
            ),
            limit=limit,
        )
        logger.info(f"Found {len(flow_runs)} flow runs.")
        if not flow_runs:
            # Nothing matched; skip the (empty) summary and state changes.
            return

        flow_runs_names = ", ".join([f.name for f in flow_runs])
        # A set de-duplicates deployments shared by several flow runs.
        flow_runs_deployments = ", ".join({str(f.deployment_id) for f in flow_runs})
        logger.info(f"Flow runs affected: {flow_runs_names}")
        logger.info(f"Deployments affected: {flow_runs_deployments}")
        logger.info("Changing state...")

        # return_exceptions=True keeps one failing flow run from cancelling
        # the others; exceptions come back as entries in ``results`` instead
        # of being raised, so they must be inspected explicitly below.
        results = await asyncio.gather(
            *[
                propose_state(client, to_state, flow_run_id=flow_run.id)
                for flow_run in flow_runs
            ],
            return_exceptions=True,
        )

    for flow_run, result in zip(flow_runs, results):
        if isinstance(result, BaseException):
            logger.error(
                f"Failed to change state of flow run {flow_run.name}: {result!r}",
                exc_info=result,
            )
Andrew Rosen
07/03/2023, 5:59 AM
David Steiner Sand
07/03/2023, 2:41 PM