Andrew Rosen

07/02/2023, 5:18 AM
In the UI, I'd like to query for all runs with "State = Running" and a runtime of more than N hours, and then either delete them from my list of flow runs or switch them to "Failed". I'm not sure how to accomplish this. There are three parts:
1. How do I delete multiple flow runs from the UI without clicking each one manually?
2. Is there a way to change a "Running" state to a "Failed" state in the UI or from the command line?
3. I know how to query by state in the UI, but I can't see how to combine that with a query by runtime.
While it might seem a bit odd to want to change the state of a flow run, this is an inherent issue when using Prefect with a job scheduler on an HPC machine. If the job is cancelled or hits its walltime, it has no way to report back to the Prefect server that it is no longer running, so it stays stuck in the Running state forever.
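For part 3, the runtime condition can be rewritten as a start-time condition: a run has been running for more than N hours exactly when it started before now minus N hours, which is how the reply below filters with `started_before`. A minimal pure-Python sketch of that selection, assuming a hypothetical `RunRecord` type in place of Prefect's flow-run objects:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class RunRecord:
    """Hypothetical stand-in for a flow run: name, state type, start time."""
    name: str
    state_type: str
    start_time: Optional[datetime]  # None if the run never started


def stale_running_runs(
    runs: List[RunRecord], hours: float, now: Optional[datetime] = None
) -> List[RunRecord]:
    """Return runs in state RUNNING whose runtime exceeds `hours`."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Running for more than N hours <=> started before (now - N hours)
    cutoff = now - timedelta(hours=hours)
    return [
        run
        for run in runs
        if run.state_type == "RUNNING"
        and run.start_time is not None
        and run.start_time < cutoff
    ]


now = datetime(2023, 7, 2, 12, 0, tzinfo=timezone.utc)
runs = [
    RunRecord("walltime-hit", "RUNNING", now - timedelta(hours=30)),
    RunRecord("still-fresh", "RUNNING", now - timedelta(hours=2)),
    RunRecord("finished", "COMPLETED", now - timedelta(hours=48)),
]
print([r.name for r in stale_running_runs(runs, hours=24, now=now)])
# ['walltime-hit']
```

Comparing start times against a single cutoff keeps the query to one timestamp bound, which is the kind of filter a server-side API can apply directly.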

David Steiner Sand

07/03/2023, 5:57 AM
My workaround for that problem is a flow that calls a helper function, change_state:
from datetime import timedelta

from prefect import flow, get_run_logger
from prefect.server.schemas.states import StateType
from prefect.server.utilities.schemas import DateTimeTZ
from prefect.states import Crashed

# Helper module from this project (shown below)
from flows.change_flow_runs_state.utils import change_state


@flow
async def crash_old_flow_runs(
    limit: int = 200,  # 200 is the max per request
    crash_older_than_hours: float = 24,
):
    """Mark as Crashed all Running flow runs that started more than
    crash_older_than_hours hours ago."""

    # Runs that started before this cutoff have been running longer than the threshold
    past_timestamp = DateTimeTZ.utcnow() - timedelta(hours=crash_older_than_hours)

    # The flow is async, so await the helper directly instead of starting a
    # second event loop with asyncio.run
    await change_state(
        logger=get_run_logger(),
        from_state=StateType.RUNNING,
        to_state=Crashed(),
        limit=limit,
        started_before=past_timestamp,
    )
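Because each request returns at most 200 runs (per the `limit` comment in the flow above), one pass changes at most that many runs; an hourly schedule clears a larger backlog over several passes. If you want every match in one pass, one option is to page with an offset and collect all matches before changing any state. A minimal sketch, assuming a hypothetical async `fetch_page(offset, limit)` callable; with Prefect it could wrap `client.read_flow_runs(..., limit=limit, offset=offset)`, but check the signature in your version:

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")

PAGE_SIZE = 200  # per-request cap mentioned in the flow above


async def read_all(
    fetch_page: Callable[[int, int], Awaitable[List[T]]],
    page_size: int = PAGE_SIZE,
) -> List[T]:
    """Collect every result by requesting pages until a short page comes back.

    `fetch_page(offset, limit)` is hypothetical; it should return up to
    `limit` items starting at `offset`.
    """
    results: List[T] = []
    offset = 0
    while True:
        page = await fetch_page(offset, page_size)
        results.extend(page)
        if len(page) < page_size:  # last (possibly empty) page
            return results
        offset += page_size


async def demo() -> List[int]:
    data = list(range(450))  # pretend these are 450 matching run IDs

    async def fake_fetch(offset: int, limit: int) -> List[int]:
        return data[offset : offset + limit]

    return await read_all(fake_fetch)


print(len(asyncio.run(demo())))  # 450
```

Collecting first matters: changing a run's state removes it from the Running filter, which shifts later offsets, so a loop that pages and changes states at the same time would skip runs.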

The helper, change_state, lives in flows/change_flow_runs_state/utils.py:

import asyncio
from typing import List, Optional

from prefect import get_client
from prefect.engine import propose_state
from prefect.exceptions import Abort
from prefect.server.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterDeploymentId,
    FlowRunFilterExpectedStartTime,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
    FlowRunFilterTags,
)
from prefect.server.schemas.states import StateType
from prefect.server.utilities.schemas import DateTimeTZ
from prefect.states import State


async def change_state(
    logger,
    from_state: StateType,
    to_state: State,
    tags: Optional[List[str]] = None,
    limit: Optional[int] = None,
    started_before: Optional[DateTimeTZ] = None,
    expected_started_after: Optional[DateTimeTZ] = None,
):
    """Move deployment flow runs from state type from_state to to_state.

    If tags are given, only runs carrying all of them are affected. The
    selection can be narrowed further with started_before and
    expected_started_after, and at most limit runs (default 10) are changed.
    """

    if limit is None:
        limit = 10

    # Optional fragments for the log message below
    tags_message = ""
    started_before_message = ""
    expected_started_after_message = ""
    if tags is not None:
        tags_message = f" tagged {', '.join(tags)}"
    if started_before is not None:
        started_before_message = f" that started before {started_before}"
    if expected_started_after is not None:
        expected_started_after_message = (
            f" that were expected to start after {expected_started_after}"
        )

    logger.info(
        f"Looking for up to {limit} flow runs{tags_message} in {from_state}"
        f"{started_before_message}{expected_started_after_message}, "
        f"to move to {to_state}..."
    )

    # Use the client as an async context manager so its connection is closed
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=None,
                    type=FlowRunFilterStateType(any_=[from_state]),
                ),
                # Only runs created from a deployment
                deployment_id=FlowRunFilterDeploymentId(is_null_=False),
                tags=FlowRunFilterTags(all_=tags) if tags is not None else None,
                expected_start_time=(
                    FlowRunFilterExpectedStartTime(after_=expected_started_after)
                    if expected_started_after is not None
                    else None
                ),
                start_time=(
                    FlowRunFilterStartTime(before_=started_before)
                    if started_before is not None
                    else None
                ),
            ),
            limit=limit,
        )

        logger.info(f"Found {len(flow_runs)} flow runs.")

        flow_runs_names = ", ".join(f.name for f in flow_runs)
        flow_runs_deployments = ", ".join({str(f.deployment_id) for f in flow_runs})

        logger.info(f"Flow runs affected: {flow_runs_names}")
        logger.info(f"Deployments affected: {flow_runs_deployments}")

        logger.info("Changing state...")

        # With return_exceptions=True, gather does not raise: each run's
        # exception (e.g. Abort when the server rejects the transition) is
        # returned in the result list, in the same order as flow_runs.
        results = await asyncio.gather(
            *[
                propose_state(client, to_state, flow_run_id=flow_run.id)
                for flow_run in flow_runs
            ],
            return_exceptions=True,
        )

    for flow_run, result in zip(flow_runs, results):
        if isinstance(result, Abort):
            logger.warning(f"Transition rejected for {flow_run.name}: {result}")
        elif isinstance(result, BaseException):
            logger.error(f"Could not change state of {flow_run.name}: {result!r}")
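One subtlety in change_state: with `return_exceptions=True`, `asyncio.gather` does not raise. Each failure, including an `Abort` from a rejected transition, comes back as an item in the result list, so an `except` wrapped around the `gather` call never fires. A standalone sketch of inspecting the results instead, with a hypothetical `propose` coroutine standing in for `propose_state` and a hypothetical `Rejected` exception standing in for `Abort`:

```python
import asyncio
from typing import List, Tuple


class Rejected(BaseException):
    """Hypothetical stand-in for prefect.exceptions.Abort."""


async def propose(run_id: str) -> str:
    """Hypothetical state proposal: rejects run "b", fails on run "c"."""
    await asyncio.sleep(0)
    if run_id == "b":
        raise Rejected(f"transition rejected for {run_id}")
    if run_id == "c":
        raise ConnectionError(f"network error for {run_id}")
    return f"{run_id}: CRASHED"


async def change_all(run_ids: List[str]) -> Tuple[List[str], List[str]]:
    # Exceptions are returned in the result list, in the same order as run_ids
    results = await asyncio.gather(
        *(propose(r) for r in run_ids), return_exceptions=True
    )
    ok, failed = [], []
    for run_id, result in zip(run_ids, results):
        # Check BaseException, not Exception, so signal-style errors are caught too
        if isinstance(result, BaseException):
            failed.append(f"{run_id}: {type(result).__name__}")
        else:
            ok.append(result)
    return ok, failed


print(asyncio.run(change_all(["a", "b", "c"])))
# (['a: CRASHED'], ['b: Rejected', 'c: ConnectionError'])
```

Zipping the inputs with the results works because `gather` preserves the order of the awaitables it was given, regardless of which finishes first.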

Andrew Rosen

07/03/2023, 5:59 AM
Oh, very good idea!! Thank you so much for sharing your code for this. I'll try implementing it too 🙂

David Steiner Sand

07/03/2023, 2:41 PM
Happy to help 🙂
I then scheduled that flow to run every hour.