Hello, I'm trying to use the prefect nomad agent ...
# ask-community
m
Hello, I'm trying to use the Prefect Nomad agent instead of the Kubernetes agent. To do that, I added the NomadAgent class inside Prefect/src/prefect/agent/nomad and changed a few things in the code to make it work. I copied the class from the commit I found here: https://github.com/PrefectHQ/prefect/pull/1341/commits/adcb3edad21d36a95d0d9c19041b5774e995b1ae. I got it working, and I can now run Prefect flows as Nomad jobs. However, the flow logs show that the tasks run and that the flow state changes to Success, yet the UI (and the CLI) show the flow run stuck in Submitted, and a Lazarus process keeps rescheduling it. (I already tried disabling Lazarus, but the flow state still doesn't change to Success.) In the attached documents you can see the flow logs and the flow in the UI. Thank you so much for your help!
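For readers unfamiliar with how such an agent hands a flow run to Nomad, here is a minimal sketch. It is not the code from the linked commit. It assumes the Nomad HTTP API at a hypothetical `NOMAD_ADDR`, the Docker task driver, a hypothetical `prefect-job-<flow run id>` naming scheme, and `prefect execute flow-run` as the container command, which is what recent Prefect 0.x agents run (check this for your version). The helper names `build_nomad_job` and `submit_job` are made up for illustration.

```python
import json
import urllib.request

# Assumption: address of the Nomad HTTP API; adjust for your cluster.
NOMAD_ADDR = "http://127.0.0.1:4646"


def build_nomad_job(flow_run_id: str, image: str, env: dict) -> dict:
    """Build a Nomad job payload that runs one Prefect flow run.

    The job ID prefix, datacenter and group/task names are hypothetical choices.
    """
    job_id = f"prefect-job-{flow_run_id}"
    return {
        "Job": {
            "ID": job_id,
            "Name": job_id,
            # "batch" jobs run to completion instead of being restarted like services
            "Type": "batch",
            "Datacenters": ["dc1"],
            "TaskGroups": [
                {
                    "Name": "flow",
                    "Count": 1,
                    "Tasks": [
                        {
                            "Name": "flow-run",
                            "Driver": "docker",
                            "Config": {
                                "image": image,
                                # assumption: command run by recent Prefect 0.x agents
                                "command": "prefect",
                                "args": ["execute", "flow-run"],
                            },
                            # must carry the PREFECT__* variables the flow run needs
                            "Env": dict(env),
                        }
                    ],
                }
            ],
        }
    }


def submit_job(payload: dict, nomad_addr: str = NOMAD_ADDR) -> dict:
    """Register the job with Nomad (POST /v1/jobs) and return the JSON reply."""
    request = urllib.request.Request(
        f"{nomad_addr}/v1/jobs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Usage would look like `submit_job(build_nomad_job(flow_run_id, "prefecthq/prefect:latest", env))`, where `env` holds the `PREFECT__*` variables for the flow run. A `batch` job fits here because a flow run should finish, not be restarted like a long-running service.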
a
@Marcos San Miguel a couple of questions:
1. Do you run on Prefect Cloud or Server?
2. Do you run Prefect with your custom NomadAgent as an editable package, so that your changes are reflected both in the agent and in the environment from which you register your flow?
3. Did you assign any labels to both your flow and your Nomad agent?
Overall, I think you might need to make more changes to make this custom agent work with other components, e.g. Lazarus, but I would need to consult the team about that. Being stuck in a Submitted state means that your Nomad agent did pick up the flow run and submitted it for execution, but something happened in the execution layer, e.g. your Kubernetes pod died or was accidentally deleted. Typically, Lazarus would try to kill such a “zombie” process and resurrect it, i.e. restart the process, but you may need some custom logic to do that with your Nomad agent.
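To see which runs are stuck, one option is to ask the Server API directly. The sketch below assumes that Prefect Server's Hasura-style GraphQL schema exposes `flow_run` with a text `state` column and a `where` filter. Verify this against your server's schema. The names `STUCK_RUNS_QUERY`, `stuck_run_ids` and `print_stuck_runs` are hypothetical.

```python
# Assumption: Prefect Server exposes flow_run with a "state" column filterable via _eq.
STUCK_RUNS_QUERY = """
query {
  flow_run(where: {state: {_eq: "Submitted"}}) {
    id
    name
    state
  }
}
"""


def stuck_run_ids(result) -> list:
    """Return the flow run IDs contained in a response to STUCK_RUNS_QUERY."""
    return [run["id"] for run in result["data"]["flow_run"]]


def print_stuck_runs() -> None:
    """Query the API and print every flow run currently in the Submitted state."""
    # Imported here so the helpers above work without Prefect installed
    from prefect import Client

    client = Client()
    for run_id in stuck_run_ids(client.graphql(STUCK_RUNS_QUERY)):
        print(run_id)
```

Run `print_stuck_runs()` from a machine that can reach the API. `Client()` picks up the endpoint from your Prefect config.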
m
Hello @Anna Geller, first of all, thank you for your quick response.
1. I run on Prefect Server.
2. I run Prefect with my custom NomadAgent as an editable package.
3. There are no labels assigned to the flow or the agent.
What I don't understand is that the logs in the Nomad job, for example, are the following:
```
[2021-11-02 10:42:32+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'prefect'

Calling my custom state handler on <Flow: name="prefect">:
<Pending> to <Running: "Running flow.">

[2021-11-02 10:42:32+0000] INFO - prefect.TaskRunner | Task 'hello_task': Starting task run...

Calling my custom state handler on <Task: hello_task>:
<Pending> to <Running: "Starting task run.">

[2021-11-02 10:42:32+0000] INFO - prefect.hello_task | Hello world!

Calling my custom state handler on <Task: hello_task>:
<Running: "Starting task run."> to <Success: "Task run succeeded.">

[2021-11-02 10:42:32+0000] INFO - prefect.TaskRunner | Task 'hello_task': Finished task run for task with final state: 'Success'
[2021-11-02 10:42:32+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Calling my custom state handler on <Flow: name="prefect">:
<Running: "Running flow."> to <Success: "All reference tasks succeeded.">
```
However, the flow seems to be stuck in the Submitted state.
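The original state handler wasn't posted. A hypothetical handler that produces lines shaped like the ones above could look like the following, using the `(obj, old_state, new_state)` signature that Prefect 0.x/1.x passes to state handlers. The handler runs inside the Nomad job. Its output therefore only shows that the transitions happened in that process, not that the backend recorded them.

```python
def my_state_handler(obj, old_state, new_state):
    """Print each state transition of a flow or task (hypothetical handler).

    Prefect 0.x/1.x calls state handlers as handler(obj, old_state, new_state).
    """
    print(f"Calling my custom state handler on {obj!r}:")
    print(f"{old_state!r} to {new_state!r}")
    # Returning new_state unchanged leaves the transition as it is
    return new_state
```

Attach it with `Flow("prefect", state_handlers=[my_state_handler])` or `@task(state_handlers=[my_state_handler])`.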
a
Can you add the `--show-flow-logs` flag when you start your agent? If everything went well, the flow logs should then appear there. I agree, it looks like the Prefect Server backend API doesn't get the flow progress from the execution layer.
1. Can you confirm that networking is fine, i.e. that the execution layer can reach your Apollo endpoint?
2. Can you check whether flow heartbeats are enabled? The backend relies on them to check whether your flow is still running on the Nomad cluster.
m
This is what I see when I start the agent with `prefect agent nomad start --log-level DEBUG`:
```
[2021-11-02 11:08:06,063] DEBUG - agent | Environment variables: []
[2021-11-02 11:08:06,063] DEBUG - agent | Max polls: None
[2021-11-02 11:08:06,063] DEBUG - agent | Agent address:
[2021-11-02 11:08:06,063] DEBUG - agent | Log to Cloud: True
[2021-11-02 11:08:06,063] DEBUG - agent | Prefect backend: server
[2021-11-02 11:08:06,063] INFO - agent | Registering agent...
[2021-11-02 11:08:06,089] INFO - agent | Registration successful!
[2021-11-02 11:08:06,089] DEBUG - agent | Assigned agent id: adf8b9dd-fa74-446e-942c-6b71e0633c22
[2021-11-02 11:08:06,089] DEBUG - agent | Sending test query to API at 'http://192.168.32.27:4200'...
[2021-11-02 11:08:06,097] DEBUG - agent | Test query successful!
[2021-11-02 11:08:06,097] INFO - agent | Starting NomadAgent with labels []
[2021-11-02 11:08:06,097] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2021-11-02 11:08:06,097] INFO - agent | Waiting for flow runs...
[2021-11-02 11:08:06,097] INFO - agent | Host connected to http://192.168.32.27:4200
[2021-11-02 11:08:06,097] INFO - agent | Server ui at http://192.168.32.27:8080
[2021-11-02 11:08:06,097] DEBUG - agent | Sending agent heartbeat...
[2021-11-02 11:08:06,100] DEBUG - agent | Heartbeat succesful! Sleeping for 60.0 seconds...
[2021-11-02 11:08:06,101] DEBUG - agent | Running thread pool with 12 workers to handle flow deployment
[2021-11-02 11:08:06,101] DEBUG - agent | Querying for ready flow runs...
```
I'm not sure how to check points 1 and 2. From the logs, I guess that flow heartbeats are enabled and that the Apollo endpoint is working fine, but I don't know if there is a proper way to check.
a
@Marcos San Miguel #1. Perhaps you can submit a script to the Nomad cluster that sends a simple hello-world query to the GraphQL API in order to check whether the API is reachable from the cluster?
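```python
from prefect import Client

# Client() reads the API address from your Prefect config (here: the Server backend)
client = Client()

# "hello" is a trivial query: any successful response means the API is reachable
query = """
query {
  hello
}
"""
response = client.graphql(query)
print(response)
```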
#2. What you see in the logs are agent heartbeats; flow heartbeats are different. You can use the following mutation to enable heartbeats on a specific flow, and then test this flow with your Nomad agent. You could also enable flow heartbeats from the UI.
```python
from prefect import Client

client = Client()

# Replace "your-flow-id-here" with the ID of your registered flow
mutation = """
mutation {
  enable_flow_heartbeat(input: { flow_id: "your-flow-id-here" }) {
    success
  }
}
"""
response = client.graphql(mutation)
print(response)
```
#3. What I meant was to start the agent with the `--show-flow-logs` flag. You enabled debug logs, which is helpful, but you can also enable flow logs, which can confirm whether your agent receives the flow heartbeats and flow progress/logs.
m
Hello, I tried #1 and it did run from the cluster. #2 seems to be enabled already. #3: it seems that the `--show-flow-logs` flag is only available for the local and Docker agents (https://docs.prefect.io/api/latest/cli/agent.html). The Nomad agent I'm using has no such option.
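Because the Nomad agent has no `--show-flow-logs` option, one workaround is to read the flow run's output directly from Nomad. You can do this with `nomad alloc logs <alloc-id> <task>` or through the HTTP API, as sketched below. The API address, the job ID and the task name `flow-run` are assumptions, so substitute whatever your agent actually uses. The helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Assumption: address of the Nomad HTTP API; adjust for your cluster.
NOMAD_ADDR = "http://127.0.0.1:4646"


def job_allocations(job_id: str, nomad_addr: str = NOMAD_ADDR) -> list:
    """List the allocations of a Nomad job (GET /v1/job/:job_id/allocations)."""
    url = f"{nomad_addr}/v1/job/{urllib.parse.quote(job_id)}/allocations"
    with urllib.request.urlopen(url) as response:
        return json.load(response)


def alloc_logs_url(alloc_id: str, task: str, stream: str = "stdout",
                   nomad_addr: str = NOMAD_ADDR) -> str:
    """Build the logs URL for one task; plain=true returns raw text instead of JSON frames."""
    query = urllib.parse.urlencode({"task": task, "type": stream, "plain": "true"})
    return f"{nomad_addr}/v1/client/fs/logs/{alloc_id}?{query}"


def fetch_alloc_logs(alloc_id: str, task: str, stream: str = "stdout",
                     nomad_addr: str = NOMAD_ADDR) -> str:
    """Return the current contents of a task's stdout or stderr log."""
    with urllib.request.urlopen(alloc_logs_url(alloc_id, task, stream, nomad_addr)) as response:
        return response.read().decode("utf-8")
```

For example: `for alloc in job_allocations(job_id): print(fetch_alloc_logs(alloc["ID"], "flow-run"))`.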
a
@Marcos San Miguel thanks for your answers! #3: I didn't know that. Any new information since yesterday? I will ask the team about your issue; there may be some additional steps needed to make the backend aware of this new type of agent.
m
@Anna Geller I'm still trying to find the issue. Maybe it has something to do with engine/state.py and the NomadAgent, since the flow seems to run properly inside the Nomad job, but its state is not being updated. I'll let you know if I find something :)
z
Hey Marcos, it seems likely that you're missing a bunch of required environment variables. Most notably, you seem to be missing the toggles that switch the `FlowRunner` class to the one that sends states to the backend: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/local/agent.py#L228-L229
So the flow is running "locally" in your container, as though you'd called `flow.run()`.
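Here is a minimal sketch of an environment builder for the flow run container, assuming a Server backend. The two runner toggles are the ones used in this thread. The remaining `PREFECT__*` variables are an assumption modeled on what Prefect's own agents set, so compare them with the linked local agent code for your Prefect version. `flow_run_env` is a hypothetical helper name.

```python
# The two runner toggles discussed in this thread.
CLOUD_RUNNER_TOGGLES = {
    "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner",
    "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner",
}


def flow_run_env(flow_run_id: str, flow_id: str, api_url: str) -> dict:
    """Return environment variables for a flow run container (hypothetical helper).

    Assumption: apart from the runner toggles, this list is modeled on the
    variables Prefect's agents typically set; check it against your version.
    """
    env = {
        "PREFECT__BACKEND": "server",
        "PREFECT__CLOUD__API": api_url,
        "PREFECT__CONTEXT__FLOW_RUN_ID": flow_run_id,
        "PREFECT__CONTEXT__FLOW_ID": flow_id,
        "PREFECT__LOGGING__LOG_TO_CLOUD": "true",
    }
    # Without these, the container uses the plain FlowRunner/TaskRunner and
    # never reports state changes to the backend.
    env.update(CLOUD_RUNNER_TOGGLES)
    return env
```

For the setup in this thread, `api_url` would be the Server address the agent logged, e.g. `flow_run_env(flow_run_id, flow_id, "http://192.168.32.27:4200")`.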
m
Hi Michael, it worked! I added the following to the NomadAgent class:
```python
env["PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS"] = "prefect.engine.cloud.CloudTaskRunner"
env["PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS"] = "prefect.engine.cloud.CloudFlowRunner"
```
I have one question, though. I'm still new to Prefect, and since I'm running Prefect with the Server backend, the first thing I tried was setting `prefect.engine.flow_runner.FlowRunner` and `prefect.engine.task_runner.TaskRunner`. I don't really understand the difference between these two and the ones in `prefect.engine.cloud`.
a
My basic understanding of this: `CloudFlowRunner` is normally instantiated when an agent submits a flow run from the backend for execution. It does more than the basic `FlowRunner`. For example, it updates flow run heartbeats, and if the start time is more than 10 minutes in the future, it fails the run so that Lazarus can reschedule it.
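To make the 10-minute rule concrete, here is the check as described above, written as a standalone function. It only illustrates the described behaviour and is not CloudFlowRunner's actual implementation. The constant and function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Threshold as described in the thread; CloudFlowRunner's real logic may differ.
MAX_EARLY_START = timedelta(minutes=10)


def starts_too_far_in_future(scheduled_start: datetime,
                             now: Optional[datetime] = None) -> bool:
    """Return True if a run was picked up more than MAX_EARLY_START before its start time."""
    now = now if now is not None else datetime.now(timezone.utc)
    return scheduled_start - now > MAX_EARLY_START
```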
m
Ok, thank you so much for your help. So `CloudFlowRunner` can be used with the Server backend, right? The Cloud terminology was confusing for me.
a
Yes, I think so. And your confusion is understandable 🙂 The “Cloud” portion here means “backend” or API.
m
Ok, thank you so much, Anna and Michael. I think it works now. As soon as I have something readable, I'll share it here (https://github.com/PrefectHQ/prefect/discussions/3575), so maybe the Nomad agent can also be implemented in Prefect in the future. I'm not an expert coder, so my code can probably be improved. I will now work on deleting `Failed` or `Completed` jobs, like the `manage_jobs` function defined in `KubernetesAgent` does. I will also work on deploying Prefect with the Nomad agent as a Nomad job.
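Here is a minimal sketch of what a Nomad counterpart to `manage_jobs` could do, assuming the agent names its jobs with a hypothetical `prefect-job-` prefix. It uses Nomad's HTTP API in two steps. First, `GET /v1/jobs?prefix=...` lists the jobs. Then, `DELETE /v1/job/<id>?purge=true` removes them. Nomad reports finished jobs as `dead`. That status covers completed and failed batch jobs (and stopped ones), so this sketch purges both `Failed` and `Completed` runs. Treating them differently would need extra checks. The function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Assumptions: Nomad API address and the job ID prefix used when submitting flow runs.
NOMAD_ADDR = "http://127.0.0.1:4646"
JOB_PREFIX = "prefect-job-"


def jobs_to_purge(jobs: list, prefix: str = JOB_PREFIX) -> list:
    """Pick the IDs of finished ("dead") jobs that this agent created."""
    return [
        job["ID"]
        for job in jobs
        if job["ID"].startswith(prefix) and job["Status"] == "dead"
    ]


def list_jobs(prefix: str = JOB_PREFIX, nomad_addr: str = NOMAD_ADDR) -> list:
    """GET /v1/jobs, filtered server-side by job ID prefix."""
    query = urllib.parse.urlencode({"prefix": prefix})
    with urllib.request.urlopen(f"{nomad_addr}/v1/jobs?{query}") as response:
        return json.load(response)


def purge_job(job_id: str, nomad_addr: str = NOMAD_ADDR) -> dict:
    """DELETE /v1/job/:job_id?purge=true removes the job from Nomad entirely."""
    url = f"{nomad_addr}/v1/job/{urllib.parse.quote(job_id)}?purge=true"
    request = urllib.request.Request(url, method="DELETE")
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def manage_jobs(nomad_addr: str = NOMAD_ADDR) -> None:
    """Hypothetical Nomad counterpart of KubernetesAgent.manage_jobs."""
    for job_id in jobs_to_purge(list_jobs(nomad_addr=nomad_addr)):
        purge_job(job_id, nomad_addr)
```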