jpuris

02/20/2023, 10:26 AM
Heya! I have 48 scheduled flow runs that are late because of the http2 bug in Prefect, and I'd like to cancel them manually. How would I go about doing that in the Prefect UI? As far as I know, I can only select each of the 48 flow runs individually and then delete them. However, I'm looking to select all of them at once and cancel them instead 😕 Edit: I did not find a way to do this via the UI, so I used the Prefect client library to do it via the API. Example in thread.
I've found this Discourse post: https://discourse.prefect.io/t/how-can-i-cancel-several-submitted-flow-runs-at-once/751 though it is for Prefect 1. Will try it out 🤔
If anyone's interested, here is the Python script to cancel any flow runs that are "late".
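A minimal sketch of such a script, not the one originally posted. It assumes Prefect 2.x with the `prefect.client.orion` module layout used elsewhere in this thread, and that late runs carry the state name "Late". The names `late_runs_filter` and `cancel_late_flow_runs` are made up for illustration.

```python
def late_runs_filter() -> dict:
    """Filter payload matching flow runs whose state name is "Late" (assumed name)."""
    return {"state": {"name": {"any_": ["Late"]}}}


async def cancel_late_flow_runs() -> int:
    """Force every late flow run into a Cancelled state and return how many were changed."""
    # Prefect imports are local so the filter helper can be used without Prefect installed.
    from prefect.client.orion import get_client
    from prefect.orion.schemas.filters import FlowRunFilter
    from prefect.orion.schemas.states import State, StateType

    cancelled = 0
    async with get_client() as client:
        late_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter.parse_obj(late_runs_filter()),
        )
        for flow_run in late_runs:
            # force=True asks the API to accept the state without orchestration checks.
            await client.set_flow_run_state(
                flow_run_id=flow_run.id,
                state=State(type=StateType.CANCELLED, message="Cancelled manually: run was late"),
                force=True,
            )
            cancelled += 1
    return cancelled
```

From a script it can be run with `asyncio.run(cancel_late_flow_runs())`. If there are many late runs, the API may return them one page at a time, so the call might need repeating until it returns 0.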
Nikhil Joseph

02/21/2023, 8:54 AM
Hopefully they don't change this feature (I depend on it), but this is how I force flow runs into any state I want (note force=True):
import asyncio

from prefect import flow
from prefect.client.orion import get_client, OrionClient
from prefect.client.schemas import FlowRun
from prefect.context import get_run_context
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowFilter
from prefect.orion.schemas.states import State, StateType
from tqdm import tqdm


async def update_flow_run_state(client: OrionClient, flow_run: FlowRun, state_type: StateType):
    required_state = State(type=state_type, message="Manual State Update")
    result = await client.set_flow_run_state(flow_run_id=flow_run.id, state=required_state, force=True)
    return result


@flow
async def main():
    context = get_run_context()
    # Use the client as an async context manager so it is opened and closed cleanly.
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter.parse_obj({"state": {"name": {"any_": ["Pending"]}}}),
        )

        print(f"Found {len(flow_runs)} pending flow runs")
        for flow_run in tqdm(flow_runs):
            # Skip the flow run executing this script so it does not change its own state.
            if flow_run.id == context.flow_run.id:
                print("Found current flow context run id, ignoring...")
                continue

            print(f"{flow_run.id}: {flow_run.name}")
            # update_flow_run_state passes force=True, so the API accepts the new state as-is.
            response = await update_flow_run_state(client, flow_run, state_type=StateType.SCHEDULED)
            print(response)

    print(f"Processed {len(flow_runs)} pending flow runs")


if __name__ == "__main__":
    asyncio.run(main())
The snippet above moves everything from Pending to Scheduled; you can change it to meet your needs.
jpuris

02/21/2023, 8:57 AM
Thanks! This is basically talking to the Orion API via their Python client, so I doubt it would be removed 😕 Anyhow, I've solved my issue with something very similar to what you've shown. Regardless, I was hoping this could be done via the UI for ad-hoc problem fixing.
Nikhil Joseph

02/21/2023, 9:01 AM
Can't say much about the UI, but I just made a deployment that takes the required states as arguments 😅 That's the closest I could get until someone implements it in the UI.
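One way such a deployment could look, as a rough sketch rather than the actual deployment described above. It assumes Prefect 2.x with the same `prefect.client.orion` layout as the earlier snippet. The names `normalize_state_type`, `force_flow_run_states`, and `build_flow`, and the flow name, are hypothetical.

```python
def normalize_state_type(value: str) -> str:
    """Normalise input such as " cancelled " to the enum-style value "CANCELLED"."""
    return value.strip().upper()


async def force_flow_run_states(from_state_name: str = "Pending", to_state_type: str = "SCHEDULED") -> int:
    """Force all flow runs named `from_state_name` into `to_state_type`; return the count."""
    # Prefect imports are local so the helper above works without Prefect installed.
    from prefect.client.orion import get_client
    from prefect.context import FlowRunContext
    from prefect.orion.schemas.filters import FlowRunFilter
    from prefect.orion.schemas.states import State, StateType

    target = StateType(normalize_state_type(to_state_type))  # raises ValueError if unknown
    ctx = FlowRunContext.get()  # None when called outside a flow run
    current_id = ctx.flow_run.id if ctx and ctx.flow_run else None

    updated = 0
    async with get_client() as client:
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter.parse_obj({"state": {"name": {"any_": [from_state_name]}}}),
        )
        for run in runs:
            if run.id == current_id:
                continue  # never change the state of the run doing the updating
            await client.set_flow_run_state(
                flow_run_id=run.id,
                state=State(type=target, message="Manual State Update"),
                force=True,
            )
            updated += 1
    return updated


def build_flow():
    """Wrap the coroutine as a Prefect flow so its arguments become deployment parameters."""
    from prefect import flow

    return flow(name="force-flow-run-states")(force_flow_run_states)
```

Once the flow returned by `build_flow()` is deployed, the two string parameters can be set per run, which gives an ad-hoc "move runs from X to Y" action without touching code.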
jpuris

02/22/2023, 6:36 AM
Interesting, @Nikhil Joseph! Do you happen to have an example? 🙂👍