Hi, I'm running `n_workers` agents as kubernetes j...
# prefect-community
o
Hi, I'm running
n_workers
agents as Kubernetes jobs similar to this:
Copy code
# Agent flow
with Flow("run-agent") as flow_agent:
    ShellTask(agent_command) # launch agent computing tasks

@task
def run_agent(worker_id): 
    run_agent_flow_id = create_flow_run.run("run-agent",
                                            name=f"run-agent_#{worker_id}")
    return run_agent_flow_id

# Main flow
with Flow("main") as flow:
    n_workers = Parameter("n_workers", default=2)
    worker_ids = get_worker_ids(n_workers)  # [0, 1]
    run_agent.map(worker_id=worker_ids)
When the agents finish their tasks, for some reason the Kubernetes jobs are not terminating. Inside the job's container the agent's process apparently terminates, but I see
prefect execute flow-run
and
prefect heartbeat flow-run -i ...
being stuck.
k
Hi @Olivér Atanaszov, I'm a bit confused: are you launching several agents here? What does the agent flow do?
a
First issue is this:
Copy code
with Flow("run-agent") as flow_agent:
    ShellTask(agent_command) # launch agent computing tasks
should be imo:
Copy code
with Flow("run-agent") as flow_agent:
    ShellTask()(agent_command) # launch agent computing tasks
because the first parentheses initialize the task, while the second pair actually calls it
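To illustrate (a minimal sketch, not your actual code; agent_command here is a placeholder for whatever shell command launches your compute agent):
Copy code
from prefect import Flow
from prefect.tasks.shell import ShellTask

# First parentheses: configure/initialize the task
shell_task = ShellTask()

agent_command = "python run_agent.py"  # placeholder for your real command

with Flow("run-agent") as flow_agent:
    # Second parentheses: call the task inside the Flow block so it becomes a task run
    shell_task(command=agent_command)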
the second issue is this:
Copy code
create_flow_run.run("run-agent",
                    name=f"run-agent_#{worker_id}")
You should call create_flow_run from the Flow, not from a task. Even if you trigger the .run() method, I don't expect it to run reliably when calling this from a task
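For reference, a hedged sketch of that pattern applied to the pseudocode above: create_flow_run is already a task, so it can be mapped directly inside the Flow block over the worker IDs (the project name here is an assumption):
Copy code
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run

@task
def get_worker_ids(n_workers):
    return list(range(n_workers))  # e.g. n_workers=2 -> [0, 1]

@task
def format_run_name(worker_id):
    return f"run-agent_#{worker_id}"

with Flow("main") as flow:
    n_workers = Parameter("n_workers", default=2)
    worker_ids = get_worker_ids(n_workers)
    run_names = format_run_name.map(worker_ids)
    # create_flow_run is a task, so map it in the Flow block instead of
    # wrapping it in another task and invoking .run()
    create_flow_run.map(
        flow_name=unmapped("run-agent"),
        project_name=unmapped("my-project"),  # assumption: your project name
        run_name=run_names,
    )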
o
@Anna Geller this was meant to be just pseudocode, sorry for not emphasizing that
You should call create_flow_run from the Flow, not from a task. Even if you trigger the .run() method, I don't expect it to run reliably when calling this from a task
Actually it seems to be working fine. Well, except for the flow run hanging.. 🙃
@Kevin Kho yes, but not Prefect agents; these are some other kind of agents responsible for computing some stuff. I wrap launching the agents in a flow so I can use
create_flow_run
and make sure that each of these agents runs in a different job and schedules its own GPU.
does that make sense?
a
Does it mean that your flow run completed successfully and the state in the UI is Successful, but the Kubernetes flow run pod keeps hanging around? Or is your flow run stuck in a Running state even though all task runs finished successfully?
k
That makes sense, so the issue is that
create_flow_run
is hanging. I assume this happens even outside the flow if you call it manually?
o
@Anna Geller the process
agent_command
I launch via
ShellTask
inside the `run-agent` flow terminates in the flow run(s), but I can see
prefect execute flow-run
running, and apparently the pod keeps hanging around while the ShellTask gets stuck in a Running state
a
This seems to be related to this heartbeat issue: https://discourse.prefect.io/t/flow-is-failing-with-an-error-message-no-heartbeat-detected-from-the-remote-task/79 You can try setting this environment variable:
Copy code
from prefect.run_configs import KubernetesRun
flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
Alternatively, as suggested in the same ☝️ Discourse topic, you can offload this shell task into its own subflow and turn off the heartbeat for it. You can then optionally also create an automation to cancel this child flow run doing the
ShellTask
if it runs for longer than you expect, to ensure your parent flow run proceeds properly and the pod is not left hanging around.
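A rough sketch of that subflow idea, assuming placeholder flow names and commands; the heartbeat for the child flow would then be switched off in that flow's settings, as described in the Discourse topic:
Copy code
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.tasks.shell import ShellTask

shell_task = ShellTask()

# Child flow wrapping only the long-running shell command;
# the heartbeat can then be disabled for this flow alone
with Flow("run-agent-shell") as shell_flow:
    shell_task(command="python run_agent.py")  # placeholder command

# Parent flow triggers the child flow run and waits for it to finish
with Flow("parent") as parent_flow:
    child_run_id = create_flow_run(flow_name="run-agent-shell")
    wait_for_flow_run(child_run_id, raise_final_state=True)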
o
amazing, thanks Anna!
fyi doing
Copy code
from prefect.run_configs import KubernetesRun
flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
did not solve the issue
a
Yeah, to be fully transparent, flow heartbeats on remote Kubernetes jobs are challenging in a hybrid execution model for several reasons:
• The Kubernetes job for the flow run is a fully independent containerized application that communicates with the Prefect Cloud API purely via outbound API calls, i.e. your Kubernetes job must send a request to our API to perform any state change etc.
• Heartbeats allow us to track whether the flow/task runs are still in progress, but again, we don't have inbound access to your pod to check its state, and if something goes wrong in the pod (say someone deletes the pod, or the pod crashes or faces some network issues), heartbeats are the only way for Prefect to detect such an infrastructure issue without having access to your infrastructure.
And right now your flow doubles the challenge: not only is your flow run executed in a completely separate environment (the container in your Kubernetes job pod), but within that container you are also spinning up a subprocess for the ShellTask, and if something happens in that ShellTask's subprocess, there is one more layer on top to find out about the issue.
We could e.g. mark task runs or flow runs immediately as failed if we don't get flow heartbeats for a couple of seconds and cancel the flow run, but this would risk cancelling your computation even though it works properly and only had some transient issues with the heartbeat itself or with sending a state update to the API.
It could be that this description is not 100% technically accurate, but I only wanted to make the challenge a bit clearer and show that it's not a straightforward thing to do.
What could help is running your subprocess with a local agent rather than within a Kubernetes job. You could then have a subflow for that, and this subflow with the ShellTask can run on some local machine while your dependency-heavy containerized child flows run on Kubernetes; you could orchestrate all of that using a parent flow (aka flow-of-flows).
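For illustration, a hedged sketch of that flow-of-flows layout; the labels, flow names, and run configs are assumptions showing how the shell subflow could be routed to a local agent while the containerized child flow stays on Kubernetes:
Copy code
from prefect import Flow
from prefect.run_configs import KubernetesRun, LocalRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent-orchestrator") as parent:
    # Child flow wrapping the ShellTask, picked up by a local agent
    shell_run = create_flow_run(
        flow_name="run-agent-shell",
        run_config=LocalRun(labels=["local"]),  # assumption: your local agent label
    )
    # Dependency-heavy containerized child flow runs on Kubernetes
    k8s_run = create_flow_run(
        flow_name="run-agent",
        run_config=KubernetesRun(labels=["k8s"]),  # assumption: your k8s agent label
    )
    wait_for_flow_run(shell_run, raise_final_state=True)
    wait_for_flow_run(k8s_run, raise_final_state=True)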