# prefect-community
e
The options for the CLI command `prefect agent` are:
╭─ Options ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ --work-queue    -q      TEXT  One or more work queue names for the agent to pull from. [default: None]                       │
│ --hide-welcome                                                                                                               │
│ --api                   TEXT  [default: (from PREFECT_API_URL)]                                                              │
│ --tag           -t      TEXT  DEPRECATED: One or more optional tags that will be used to create a work queue [default: None] │
│ --help                        Show this message and exit.                                                                    │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
What I would like to do is launch an agent once, grab the flow runs that are sitting in the queue (a single call), and have the agent close itself once those are completed. Is there any kind of option for that?
k
Is there a reason why you’d like to close the agent after running it?
e
Yes, I was thinking that when a flow run is created (initial status PENDING), I would send a webhook to a cloud function or Cloud Run container. This way I don’t need to have infra running 24/7. I can just spin up infra when there’s a flow run waiting to run, and it will spin down automatically when those pending flow runs are complete.
@Khuyen Tran right now my pipelines are super lightweight, running for 10s/day. As far as I can tell, the simplest way to run an agent is to spin up a 24/7 VPS and ping the server constantly. It seems like wasted overhead that could be handled much more leanly with webhooks. Thoughts? Has no one asked about something like this before?
I figured out a workaround using the REST API; maybe I’ll write a post about it.
k
Sorry for answering late; I was busy yesterday. That makes sense. Do you have any thoughts on this, @Taylor Curran, @Peyton Runyan?
p
Hey Erik - I’m very curious about your workaround if you've got time for a TLDR. The agent is sufficiently lightweight that to my knowledge we haven't really had big concerns with people running it. A free-tier server is generally plenty.
I haven't done this myself, but I imagine you could accomplish what you want by using something like Cloud Run to spin up an agent. This would have to be external to Prefect though.
e
@Peyton Runyan yes, exactly right. The agent is definitely lightweight enough for a free tier, but we use our free tier for other stuff, so it’s not 100% free. I’m also just trying to get into the habit of doing everything serverless and modular. My idea for a workaround is this: I can use a Slack webhook that fires when a flow run goes pending and, instead of pointing it at Slack, point it at a public endpoint in either Cloud Run or Cloud Functions (GCP). From there, I have this little script that, inside a FastAPI request, uses the Prefect REST API to monitor flow runs and spins the agent up and down accordingly. I’m not done testing, but I have the code working locally.
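import os
import subprocess
from datetime import datetime as Dt
from time import sleep

import requests
from fastapi import FastAPI

# These were redacted placeholders in the original post; reading them
# from environment variables is an assumption (WQ_ID is a hypothetical name).
API_BASE = os.environ['PREFECT_API_URL']
API_KEY = os.environ['PREFECT_API_KEY']
AUTH_HEADER = {'Authorization': f'Bearer {API_KEY}'}

WQ_ID_DEFAULT = os.environ['WQ_ID']  # ID of the default work queue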

app = FastAPI()

@app.get("/")
def read_root():
    get_flow_runs_url = API_BASE + f'/work_queues/{WQ_ID_DEFAULT}/get_runs'
    get_flow_runs_res = requests.post(get_flow_runs_url, headers=AUTH_HEADER, json={})
    flow_runs = [fr['id'] for fr in get_flow_runs_res.json()]
    n_runs = len(flow_runs)
    if n_runs:
        # Launch the agent without a shell so terminate() signals the agent process itself.
        process = subprocess.Popen(
            ['prefect', 'agent', 'start', '-q', 'default'],
            start_new_session=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    
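    # Track finished runs in a set so each run is counted once across polls;
    # a list would keep growing and the loop condition could be skipped over.
    finished_runs = set()
    while n_runs != len(finished_runs):
        for fr in flow_runs:
            res = requests.get(API_BASE + f'/flow_run_states/?flow_run_id={fr}', headers=AUTH_HEADER)
            states = res.json()
            for state in states:
                if state['type'] in ['COMPLETED', 'FAILED', 'CANCELLED', 'CRASHED']:
                    finished_runs.add(fr)
            if states:
                # Log the last state returned for this run.
                print(Dt.now(), '::', fr, '::', states[-1]['type'])
        sleep(1)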
    
    if n_runs:
        process.terminate()
    print('All Runs Complete')
    return {"message": "All Runs Complete"}
And then the output looks like this:
$ uvicorn simple_server:app
INFO:     Started server process [43615]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
2022-10-07 10:56:47.567919 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: SCHEDULED
2022-10-07 10:56:48.709624 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: SCHEDULED
2022-10-07 10:56:49.847617 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: SCHEDULED
2022-10-07 10:56:51.495856 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:52.668744 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:53.898882 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:55.057235 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: PENDING
2022-10-07 10:56:56.243720 :: 254a2d7c-5a02-415a-969f-1d86a37f6526 :: COMPLETED
All Runs Complete
INFO:     127.0.0.1:54437 - "GET / HTTP/1.1" 200 OK
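One possible refinement of the polling loop in the script above, as a rough sketch rather than anything from the original post: since the request handler blocks until every run finishes, a timeout may be worth having on platforms that cap request duration. The names `wait_for_runs`, `fetch_state_types`, `timeout_s`, and `poll_interval_s` are hypothetical, and the state lookup is passed in as a function so the loop can be tried without a Prefect API.

```python
from time import monotonic, sleep
from typing import Callable, Iterable, List, Set

# Terminal state types, taken from the list used in the script above.
TERMINAL_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def wait_for_runs(
    run_ids: Iterable[str],
    fetch_state_types: Callable[[str], List[str]],
    timeout_s: float = 300.0,
    poll_interval_s: float = 1.0,
) -> Set[str]:
    """Poll until every run has a terminal state or the timeout expires.

    Returns the set of run IDs that reached a terminal state in time.
    """
    pending = set(run_ids)
    finished: Set[str] = set()
    deadline = monotonic() + timeout_s
    while pending:
        for run_id in list(pending):
            # A run counts as finished if any of its states is terminal.
            if TERMINAL_TYPES.intersection(fetch_state_types(run_id)):
                pending.discard(run_id)
                finished.add(run_id)
        if not pending or monotonic() >= deadline:
            break
        sleep(poll_interval_s)
    return finished
```

In the endpoint, `fetch_state_types` could wrap the `requests.get(...)` call to `/flow_run_states/` from the script and return `[s['type'] for s in res.json()]`; the agent would then be terminated once `wait_for_runs` returns, whether or not every run finished.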
p
Much appreciated!