Marcos San Miguel
11/02/2021, 9:41 AM
Anna Geller
Marcos San Miguel
11/02/2021, 10:51 AM
[2021-11-02 10:42:32+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'prefect'
Calling my custom state handler on <Flow: name="prefect">:
<Pending> to <Running: "Running flow.">
[2021-11-02 10:42:32+0000] INFO - prefect.TaskRunner | Task 'hello_task': Starting task run...
Calling my custom state handler on <Task: hello_task>:
<Pending> to <Running: "Starting task run.">
[2021-11-02 10:42:32+0000] INFO - prefect.hello_task | Hello world!
Calling my custom state handler on <Task: hello_task>:
<Running: "Starting task run."> to <Success: "Task run succeeded.">
[2021-11-02 10:42:32+0000] INFO - prefect.TaskRunner | Task 'hello_task': Finished task run for task with final state: 'Success'
[2021-11-02 10:42:32+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Calling my custom state handler on <Flow: name="prefect">:
<Running: "Running flow."> to <Success: "All reference tasks succeeded.">
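The "Calling my custom state handler" lines in the log above are the kind of output a handler like the following prints. This is a minimal sketch, assuming the Prefect 1.x state handler signature (the object, the old state, the new state, returning the new state). Plain strings stand in for the Flow, Task and State objects so that the snippet runs without Prefect installed.

```python
def my_state_handler(obj, old_state, new_state):
    """Print the state transition and pass the new state through unchanged."""
    print(f"Calling my custom state handler on {obj}:\n{old_state} to {new_state}")
    # A state handler must return the state the run should continue with.
    return new_state


# Stand-in values (assumption): real handlers receive Prefect objects, not strings.
result = my_state_handler(
    "<Task: hello_task>",
    "<Pending>",
    '<Running: "Starting task run.">',
)
```

In Prefect 1.x such a function would typically be attached through the `state_handlers` argument of a task or flow.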
However, the flow seems to be stuck in the Submitted state.
Anna Geller
`--show-flow-logs` flag when you start your agent? This way, we should see the flow logs there, if everything went well.
I agree, it looks like the Prefect Server backend API doesn’t get the flow progress from the execution layer.
1. Can you confirm that networking is fine, i.e. that the execution layer can reach your Apollo endpoint?
2. Can you check if flow heartbeats are enabled? This is crucial so that the backend can check whether your flow is still running on the Nomad cluster.
Marcos San Miguel
11/02/2021, 11:22 AM
Anna Geller
from prefect import Client

client = Client()
# Minimal query to check that the backend API responds
query = """
query {
  hello
}
"""
response = client.graphql(query)
print(response)
#2. What you see in the logs are agent heartbeats; flow heartbeats are different. You can use the following mutation to enable heartbeats on a specific flow and then test this flow with your Nomad agent. You could also enable flow heartbeats from the UI.
from prefect import Client

client = Client()
# Replace the placeholder with the ID of the flow to update
mutation = """
mutation {
  enable_flow_heartbeat(input: { flow_id: "your-flow-id-here" }) {
    success
  }
}
"""
response = client.graphql(mutation)
print(response)
#3. What I meant was to include the `--show-flow-logs` flag on the agent. You enabled debug logs, which is helpful, but you can also enable flow logs, which can confirm whether your agent receives the flow heartbeats and flow progress/logs.
Marcos San Miguel
11/03/2021, 12:29 PM
Anna Geller
Anna Geller
Marcos San Miguel
11/03/2021, 12:51 PM
Zanie
`FlowRunner` class to the one that sends states to the backend: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/local/agent.py#L228-L229
Zanie
flow.run()
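The runner-class switch discussed here can be sketched without Prefect installed. The snippet below is only an illustration: `with_cloud_runners` is a hypothetical helper, not part of any Prefect agent, and the two variable names and class paths are the ones Marcos reports adding in his next message.

```python
# Environment variable names and class paths as quoted in this thread.
RUNNER_OVERRIDES = {
    "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner",
    "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner",
}


def with_cloud_runners(base_env):
    """Return a copy of base_env with the runner overrides applied.

    Hypothetical helper: a custom agent would pass the resulting mapping
    to whatever job or process it launches for the flow run.
    """
    env = dict(base_env)  # copy so the caller's mapping is not mutated
    env.update(RUNNER_OVERRIDES)
    return env


job_env = with_cloud_runners({"PREFECT__LOGGING__LEVEL": "DEBUG"})
```

Copying the base mapping keeps the helper free of side effects, which makes it easy to check what a job would receive before submitting it.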
Marcos San Miguel
11/04/2021, 7:34 AM
env["PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS"] = "prefect.engine.cloud.CloudTaskRunner"
env["PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS"] = "prefect.engine.cloud.CloudFlowRunner"
to the `NomadAgent` class and it worked. I have one question though. I'm still new to Prefect, and the first thing I tried was adding `prefect.engine.flow_runner.FlowRunner` and `prefect.engine.task_runner.TaskRunner`. As I'm running Prefect with the Server backend, I don't really understand the difference between these two and the ones in `prefect.engine.cloud`.
Anna Geller
`CloudFlowRunner` is normally instantiated when the agent submits a flow run for execution to the backend. It does more than the basic `FlowRunner`: for example, it updates flow run heartbeats, and if the start time is more than 10 minutes in the future, it fails the run so that Lazarus can reschedule it.
Marcos San Miguel
11/04/2021, 12:02 PM
`CloudFlowRunner` can be used with the Server backend, right? The Cloud terminology was confusing for me.
Anna Geller
Anna Geller
Marcos San Miguel
11/04/2021, 12:30 PM
`Failed` or `Completed` jobs, just like the `manage_jobs` function defined inside `KubernetesAgent`, and also in deploying Prefect with the Nomad agent as a Nomad job.
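One of the `CloudFlowRunner` behaviors Anna mentions earlier in the thread, failing a run whose start time is more than 10 minutes in the future so that Lazarus can reschedule it, can be illustrated with a small standalone sketch. This is not Prefect's implementation: the function name `should_fail_for_reschedule` and the `MAX_WAIT` constant are hypothetical, and only the 10-minute threshold comes from the thread.

```python
from datetime import datetime, timedelta, timezone

# Threshold taken from the description in this thread, not read from Prefect.
MAX_WAIT = timedelta(minutes=10)


def should_fail_for_reschedule(start_time, now=None):
    """Return True when start_time lies more than MAX_WAIT after now.

    Hypothetical helper: a runner with this behavior would fail the run
    in that case instead of waiting for the start time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return start_time - now > MAX_WAIT


reference = datetime(2021, 11, 4, 12, 0, tzinfo=timezone.utc)
too_far = should_fail_for_reschedule(reference + timedelta(minutes=11), now=reference)
soon_enough = should_fail_for_reschedule(reference + timedelta(minutes=5), now=reference)
```

Passing `now` explicitly keeps the check deterministic, which is convenient when comparing against timestamps copied from agent logs.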