Erik
10/05/2022, 10:36 AMprefect agent
are:
╭─ Options ──────────────────────────────────────────────────────────────────────
│ --work-queue -q TEXT One or more work queue names for the agent to pull from. [default: None] │
│ --hide-welcome │
│ --api TEXT [default: (from PREFECT_API_URL)] │
│ --tag -t TEXT DEPRECATED: One or more optional tags that will be used to create a work queue [default: None] │
│ --help Show this message and exit. │
╰─────────────────────────────────────────────────────────────────────────────────
What I would like to do is launch an agent once, grab the flow_runs that are sitting in the queue (a single call), once those are completed, the agent would close itself. Is there any kind of option for that?Khuyen Tran
10/05/2022, 1:32 PMErik
10/06/2022, 1:27 AMKhuyen Tran
10/07/2022, 2:17 PMPeyton Runyan
10/07/2022, 3:02 PMErik
10/07/2022, 4:04 PMfrom fastapi import FastAPI
from datetime import datetime as Dt
from time import sleep
import requests
import subprocess
API_BASE = API_BASE
API_KEY = API_KEY
AUTH_HEADER = {'Authorization':f'Bearer {API_KEY}'}
WQ_ID_DEFAULT = WQ_ID #default
app = FastAPI()
@app.get("/")
def read_root():
get_flow_runs_url = API_BASE+ f'/work_queues/{WQ_ID_DEFAULT}/get_runs'
get_flow_runs_res = <http://requests.post|requests.post>(get_flow_runs_url, headers=AUTH_HEADER, json={})
flow_runs = [fr['id'] for fr in get_flow_runs_res.json()]
n_runs = len(flow_runs)
if n_runs:
process = subprocess.Popen('prefect agent start -q default',
shell=True,
start_new_session=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
finished_runs = []
while n_runs != len(finished_runs):
for fr in flow_runs:
res = requests.get(API_BASE+f'/flow_run_states/?flow_run_id={fr}', headers=AUTH_HEADER)
for state in res.json():
if state['type'] in ['COMPLETED','FAILED','CANCELLED','CRASHED']:
finished_runs.append(fr)
print(Dt.now(),'::',fr,'::',state['type'])
sleep(1)
if n_runs:
process.terminate()
print('All Runs Complete')
return {"message": "All Runs Complete"}
And then the output looks like this:
$ uvicorn simple_server:app
INFO: Started server process [43615]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on <http://127.0.0.1:8000> (Press CTRL+C to quit)
2022-10-07 10:56:47.567919 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: SCHEDULED
2022-10-07 10:56:48.709624 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: SCHEDULED
2022-10-07 10:56:49.847617 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: SCHEDULED
2022-10-07 10:56:51.495856 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:52.668744 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:53.898882 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:55.057235 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:56.243720 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: COMPLETED
All Runs Complete
INFO: 127.0.0.1:54437 - "GET / HTTP/1.1" 200 OK
Peyton Runyan
10/07/2022, 4:28 PM