jpuris
02/20/2023, 10:26 AM
Nikhil Joseph
02/21/2023, 8:54 AM
import asyncio
from prefect import flow
from prefect.client.orion import get_client, OrionClient
from prefect.client.schemas import FlowRun
from prefect.context import get_run_context
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowFilter
from prefect.orion.schemas.states import State, StateType
from tqdm import tqdm
async def update_flow_run_state(client: OrionClient, flow_run: FlowRun, state_type: StateType):
    """Force a flow run into a freshly built state of the given type.

    A ``State`` of ``state_type`` with the message "Manual State Update" is
    created and submitted through ``client.set_flow_run_state`` with
    ``force=True``.

    Args:
        client: Client used to submit the state change.
        flow_run: Flow run whose ``id`` selects the run to update.
        state_type: Type of the state to apply.

    Returns:
        The value returned by ``client.set_flow_run_state``.
    """
    manual_state = State(type=state_type, message="Manual State Update")
    return await client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=manual_state,
        force=True,
    )
@flow
async def main():
    """Move every flow run whose state name is "Pending" to SCHEDULED.

    Reads the flow runs matching the "Pending" state-name filter, then calls
    ``update_flow_run_state`` for each one with ``StateType.SCHEDULED``. The
    flow run that is executing this flow (taken from the run context) is
    skipped so the script does not reschedule itself. Progress and each
    response are printed to stdout.
    """
    context = get_run_context()
    # Enter the client as an async context manager so it is properly started
    # and its underlying resources are released once the work is finished.
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter.parse_obj({"state": {"name": {"any_": ["Pending"]}}}),
        )
        print(f"List all pending flows, total {len(flow_runs)}")
        for flow_run in tqdm(flow_runs):
            # Never touch the run that is executing this very flow.
            if flow_run.id == context.flow_run.id:
                print("Found current flow context run id, ignoring...")
                continue
            print(f"{flow_run.id}: {flow_run.name}")
            response = await update_flow_run_state(client, flow_run, state_type=StateType.SCHEDULED)
            print(response)
    # NOTE: this count comes from the query made before the updates; the list
    # is not re-fetched, so it repeats the initial total.
    print(f"List all pending flows, total {len(flow_runs)}")
# Script entry point: run the async flow to completion on a new event loop.
if __name__ == "__main__":
    flow_coroutine = main()
    asyncio.run(flow_coroutine)
The above snippet moves everything from Pending to Scheduled; you can change it to meet your needs.
jpuris
02/21/2023, 8:57 AM
Nikhil Joseph
02/21/2023, 9:01 AM
jpuris
02/22/2023, 6:36 AM