Thread
#prefect-community
    Marcos San Miguel

    10 months ago
    Hello, I'm trying to use the Prefect Nomad agent instead of the k8s agent. To do that, I added the NomadAgent class inside Prefect/src/prefect/agent/nomad and changed a few things in the code to make it work. I copied the class from the commit I found here: https://github.com/PrefectHQ/prefect/pull/1341/commits/adcb3edad21d36a95d0d9c19041b5774e995b1ae. I got it working and I can now run Prefect flows as Nomad jobs. However, although I can see in the flow logs that the tasks are running and the flow state changes to Success, the UI shows the state stuck in Submitted (the CLI also shows Submitted), and the flow is continuously rescheduled by the Lazarus process (I already tried disabling Lazarus, but the flow state still doesn't change to Success). In the attached documents you can see the flow logs and the flow in the UI. Thank you so much for your help!
    Anna Geller

    10 months ago
    @Marcos San Miguel a couple of questions: 1. Do you run on Prefect Cloud or Server? 2. Do you run Prefect with your custom NomadAgent as an editable package, so that your changes are reflected in both the agent and the environment from which you register your flow? 3. Did you assign any labels to both your flow and your Nomad agent? Overall, I think you might need to make more changes to make this custom agent work with other components, e.g. Lazarus, but I would need to consult the team about that. When you are stuck in a Submitted state, it means that your Nomad agent did pick up the flow and submitted it for execution, but something happened in the execution layer, e.g. your Kubernetes pod died or was accidentally deleted. Typically, Lazarus would try to kill such a “zombie” process and resurrect it, i.e. restart the process, but you may need some custom logic to do that with your Nomad agent.
    Marcos San Miguel

    10 months ago
    Hello @Anna Geller, first of all, thank you for your quick response. 1. I run on Prefect Server. 2. I run Prefect with my custom NomadAgent as an editable package. 3. There are no labels assigned to the flow or the agent. The strange behaviour I don't understand is that the logs in the Nomad job, for example, are the following:
    [2021-11-02 10:42:32+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'prefect'
    
    Calling my custom state handler on <Flow: name="prefect">:
    <Pending> to <Running: "Running flow.">
    
    [2021-11-02 10:42:32+0000] INFO - prefect.TaskRunner | Task 'hello_task': Starting task run...
    
    Calling my custom state handler on <Task: hello_task>:
    <Pending> to <Running: "Starting task run.">
    
    [2021-11-02 10:42:32+0000] INFO - prefect.hello_task | Hello world!
    
    Calling my custom state handler on <Task: hello_task>:
    <Running: "Starting task run."> to <Success: "Task run succeeded.">
    
    [2021-11-02 10:42:32+0000] INFO - prefect.TaskRunner | Task 'hello_task': Finished task run for task with final state: 'Success'
    [2021-11-02 10:42:32+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    
    Calling my custom state handler on <Flow: name="prefect">:
    <Running: "Running flow."> to <Success: "All reference tasks succeeded.">
    However, the flow seems to be stuck in the Submitted state.
    Anna Geller

    10 months ago
    Can you add the --show-flow-logs flag when you start your agent? This way, we should see the flow logs there if everything went well. I agree, it looks like the Prefect Server backend API doesn’t get the flow progress from the execution layer. 1. Can you confirm that networking is fine, i.e. that the execution layer can reach your Apollo endpoint? 2. Can you check if flow heartbeats are enabled? This is crucial so that the backend can check whether your flow is still running on the Nomad cluster.
    Marcos San Miguel

    10 months ago
    This is what I see when I start the agent with: prefect agent nomad start --log-level DEBUG:
    [2021-11-02 11:08:06,063] DEBUG - agent | Environment variables: []
    [2021-11-02 11:08:06,063] DEBUG - agent | Max polls: None
    [2021-11-02 11:08:06,063] DEBUG - agent | Agent address:
    [2021-11-02 11:08:06,063] DEBUG - agent | Log to Cloud: True
    [2021-11-02 11:08:06,063] DEBUG - agent | Prefect backend: server
    [2021-11-02 11:08:06,063] INFO - agent | Registering agent...
    [2021-11-02 11:08:06,089] INFO - agent | Registration successful!
    [2021-11-02 11:08:06,089] DEBUG - agent | Assigned agent id: adf8b9dd-fa74-446e-942c-6b71e0633c22
    [2021-11-02 11:08:06,089] DEBUG - agent | Sending test query to API at 'http://192.168.32.27:4200'...
    [2021-11-02 11:08:06,097] DEBUG - agent | Test query successful!
    [Prefect Agent ASCII-art banner]
    [2021-11-02 11:08:06,097] INFO - agent | Starting NomadAgent with labels []
    [2021-11-02 11:08:06,097] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
    [2021-11-02 11:08:06,097] INFO - agent | Waiting for flow runs...
    [2021-11-02 11:08:06,097] INFO - agent | Host connected to http://192.168.32.27:4200
    [2021-11-02 11:08:06,097] INFO - agent | Server ui at http://192.168.32.27:8080
    [2021-11-02 11:08:06,097] DEBUG - agent | Sending agent heartbeat...
    [2021-11-02 11:08:06,100] DEBUG - agent | Heartbeat succesful! Sleeping for 60.0 seconds...
    [2021-11-02 11:08:06,101] DEBUG - agent | Running thread pool with 12 workers to handle flow deployment
    [2021-11-02 11:08:06,101] DEBUG - agent | Querying for ready flow runs...
    I'm not sure how to check points 1 and 2, I guess from the logs that flow heartbeats are enabled and that the Apollo endpoint is working fine, but I don't know if there is a proper way to check it.
    Anna Geller

    10 months ago
    @Marcos San Miguel #1. Perhaps you can submit a script to the Nomad cluster that sends a simple hello-world query to the GraphQL API in order to check whether the API is reachable from the cluster?
    from prefect import Client
    
    client = Client()
    
    query = """
    query {
      hello
    }
    """
    response = client.graphql(query)
    print(response)
    #2. What you see in the logs are agent heartbeats; flow heartbeats are different. You can use the following mutation to enable heartbeats on a specific flow, and then test this flow with your Nomad agent. You could also enable flow heartbeats from the UI.
    from prefect import Client
    
    client = Client()
    
    mutation = """
    mutation {
      enable_flow_heartbeat(input: { flow_id: "your-flow-id-here" }) {
        success
      }
    }
    """
    response = client.graphql(mutation)
    print(response)
    #3. What I meant was to include the --show-flow-logs flag on the agent. You enabled debug logs, which is helpful, but you can also enable sending flow logs, which can confirm whether your agent receives the flow heartbeats and flow progress/logs.
    Marcos San Miguel

    10 months ago
    Hello, I tried #1 and it did run from the cluster. #2 seemed to be already enabled. #3: It seems that the --show-flow-logs flag is only available in the local or docker agent (https://docs.prefect.io/api/latest/cli/agent.html). In the Nomad agent I'm using there's no such option.
    Anna Geller

    10 months ago
    @Marcos San Miguel thanks for your answers! #3 - I didn’t know that. Any new information since yesterday? I will ask the team about your issue; there may be some additional steps to make the backend aware of this new type of agent.
    Marcos San Miguel

    10 months ago
    @Anna Geller I'm still trying to find the issue. Maybe it has something to do with engine/state.py and the NomadAgent, as the flow seems to be running properly inside the Nomad job, but the state of the flow is not being updated properly. I'll let you know if I find something 😃
    Michael Adkins

    10 months ago
    Hey Marcos, it seems likely that you're missing a bunch of required environment variables. Quite notably, it looks like you're missing the toggles which switch the FlowRunner class to the one that sends states to the backend: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/local/agent.py#L228-L229
    So the flow is running "locally" in your container as though you'd called flow.run()
    Marcos San Miguel

    10 months ago
    Hi Michael, it worked! I added
    env["PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS"] = "prefect.engine.cloud.CloudTaskRunner"
    env["PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS"] = "prefect.engine.cloud.CloudFlowRunner"
    to the NomadAgent class and it worked. I have one question though. I'm still new to Prefect, and the first thing I tried was adding prefect.engine.flow_runner.FlowRunner and prefect.engine.task_runner.TaskRunner, as I'm running Prefect with the Server backend. I don't really understand the difference between these two and the ones in prefect.engine.cloud.
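A minimal sketch of how a custom agent might assemble the environment for the job that executes a flow run. Only the two runner-class variables come from this thread; the helper name `build_flow_run_env`, its parameters, the flow run id, and the other three variable names are assumptions for illustration and should be checked against the built-in agents in your Prefect version.

```python
from typing import Dict


def build_flow_run_env(api_url: str, flow_run_id: str, backend: str = "server") -> Dict[str, str]:
    """Return the environment variables to pass to the job that runs the flow.

    Hypothetical helper: not part of Prefect's API.
    """
    return {
        # Assumed names (not confirmed in this thread): where and what to run.
        "PREFECT__BACKEND": backend,
        "PREFECT__CLOUD__API": api_url,
        "PREFECT__CONTEXT__FLOW_RUN_ID": flow_run_id,
        # From the thread: switch to the runners that report states to the backend.
        "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner",
        "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner",
    }


# Example values: the API address is the one from the agent logs above,
# the flow run id is a placeholder.
env = build_flow_run_env("http://192.168.32.27:4200", "hypothetical-flow-run-id")
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

Keeping the variables in one function makes it easier to compare them side by side with what the local or Kubernetes agent sets.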
    Anna Geller

    10 months ago
    My basic understanding of this: CloudFlowRunner is normally instantiated when the agent submits a flow run for execution to the backend. It does more than the basic FlowRunner, e.g. it updates flow run heartbeats, or if the start time is more than 10 minutes in the future, it fails the run so that Lazarus can reschedule it
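The difference can be pictured with a toy model. This is not Prefect's implementation: `ToyFlowRunner` and `ToyBackendFlowRunner` are made-up classes, and the "backend" is just a list. The sketch only shows why a runner that keeps state in its own process leaves the backend at Submitted, while a runner that reports each transition does not.

```python
from typing import Callable, List


class ToyFlowRunner:
    """Tracks state only inside this process, like calling flow.run()."""

    def __init__(self) -> None:
        self.state = "Pending"

    def set_state(self, new_state: str) -> None:
        self.state = new_state

    def run(self, flow: Callable[[], None]) -> str:
        self.set_state("Running")
        try:
            flow()
            self.set_state("Success")
        except Exception:
            self.set_state("Failed")
        return self.state


class ToyBackendFlowRunner(ToyFlowRunner):
    """Same run logic, but every state change is also reported to a backend."""

    def __init__(self, report: Callable[[str], None]) -> None:
        super().__init__()
        self.report = report

    def set_state(self, new_state: str) -> None:
        super().set_state(new_state)
        self.report(new_state)


# The agent has already marked the run as Submitted in the (toy) backend.
backend_states: List[str] = ["Submitted"]

# Local runner: the flow succeeds, but the backend never hears about it.
local_result = ToyFlowRunner().run(lambda: None)
after_local = list(backend_states)

# Reporting runner: each transition reaches the backend.
ToyBackendFlowRunner(backend_states.append).run(lambda: None)

print(local_result, after_local, backend_states)
```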
    Marcos San Miguel

    10 months ago
    Ok, thank you so much for your help. So CloudFlowRunner can be used with the Server backend, right? The Cloud terminology was confusing for me.
    Anna Geller

    10 months ago
    yes I think so. And your confusion is understandable 🙂
    the Cloud portion here means “backend” or API
    Marcos San Miguel

    10 months ago
    Ok, thank you so much Anna and Michael. I think it works now. As soon as I have something readable I'll share it here (https://github.com/PrefectHQ/prefect/discussions/3575), so maybe in the future the Nomad agent could also be implemented in Prefect. I'm not an expert in coding, so my code can probably be improved. I will now work on deleting Failed or Completed jobs, just like the manage_jobs function defined inside KubernetesAgent, and also on deploying Prefect with the Nomad agent as a Nomad job.