Olivér Atanaszov
03/10/2022, 4:45 PM
n_workers agents as Kubernetes jobs, similarly to this:
# Agent flow
with Flow("run-agent") as flow_agent:
    ShellTask(agent_command)  # launch agent computing tasks

@task
def run_agent(worker_id):
    run_agent_flow_id = create_flow_run.run("run-agent",
                                            name=f"run-agent_#{worker_id}")
    return run_agent_flow_id

# Main flow
with Flow("main") as flow:
    n_workers = Parameter("n_workers", default=2)
    worker_ids = get_worker_ids(n_workers)  # [0, 1]
    run_agent.map(worker_id=worker_ids)
When the agents finish their tasks, for some reason the Kubernetes jobs do not terminate. Inside the job's container the agent's process apparently terminates, but I see prefect execute flow-run and prefect heartbeat flow-run -i ... being stuck.
Kevin Kho
03/10/2022, 4:54 PM
Anna Geller
03/10/2022, 4:55 PM
with Flow("run-agent") as flow_agent:
    ShellTask(agent_command)  # launch agent computing tasks
should be imo:
with Flow("run-agent") as flow_agent:
    ShellTask()(agent_command)  # launch agent computing tasks
because the first brackets initialize the task while the second ones call it.
create_flow_run.run("run-agent",
                    name=f"run-agent_#{worker_id}")
You should call create_flow_run from the Flow, not from a task. Even if you trigger the .run() method, I don't expect it to run reliably when called from a task.
Olivér Atanaszov
03/10/2022, 5:07 PM
"You should call create_flow_run from the Flow, not from a task. Even if you trigger the .run() method, I don't expect it to run reliably when calling this from a task"
Actually it seems to be working fine. Well, except for the flow run hanging.. 🙃
create_flow_run and make sure that each of these agents runs in a different job and schedules its own GPU.
Anna Geller
03/10/2022, 5:11 PM
Kevin Kho
03/10/2022, 5:12 PM
create_flow_run is hanging. I assume this happens even outside the flow if you call it manually?
Olivér Atanaszov
03/10/2022, 5:21 PM
agent_command I launch via ShellTask inside the `run-agent` flow terminates in the flow run(s), but I can see prefect execute flow-run running and apparently the pod keeps hanging around
the ShellTask gets stuck in a Running state
Anna Geller
03/10/2022, 5:33 PM
from prefect.run_configs import KubernetesRun
flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
ShellTask if it runs for longer than you expect it to run to ensure your parent flow run proceeds properly and the pod is not left hanging around
Olivér Atanaszov
03/11/2022, 11:15 AM
from prefect.run_configs import KubernetesRun
flow.run_config = KubernetesRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})
did not solve the issue
Anna Geller
03/11/2022, 12:20 PM
ShellTask and if something happens in this ShellTask's subprocess, there is even one more layer on top to find out about the issue.
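One way to surface a stuck subprocess is to bound how long it may run, in line with the earlier suggestion in this thread about a ShellTask that runs longer than expected. The following is a minimal sketch in plain Python, not Prefect's ShellTask API; the helper name `run_with_timeout` and the timeout values are assumptions for illustration only.

```python
import subprocess
import sys


def run_with_timeout(args, timeout_s):
    """Run a command and return its stdout; raise TimeoutError if it runs too long.

    Hypothetical helper for illustration, not part of Prefect.
    """
    try:
        # subprocess.run kills the child process when the timeout expires.
        result = subprocess.run(
            args, capture_output=True, text=True, timeout=timeout_s, check=True
        )
    except subprocess.TimeoutExpired as exc:
        raise TimeoutError(f"command exceeded {timeout_s}s: {args}") from exc
    return result.stdout


if __name__ == "__main__":
    # A fast command completes normally and its output is returned.
    print(run_with_timeout([sys.executable, "-c", "print('done')"], timeout_s=10))
```

With a bound like this, a hanging command turns into an explicit failure that the caller can handle, rather than a process that silently stays in a running state.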
We could, e.g., mark task runs or flow runs as failed and cancel the flow run as soon as we don't get flow heartbeats for a couple of seconds, but this would risk cancelling your computation even though it works properly and only had some transient issue with the heartbeat itself or with sending a state update to the API.
It could be that this description is technically not 100% accurate, but I only wanted to make the challenge a bit clearer and show it's not a straightforward thing to do.
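The trade-off described above can be illustrated with a toy heartbeat check. This is a sketch in plain Python and not how Prefect's backend detects lost heartbeats; the function name `is_presumed_dead` and the threshold values are assumptions.

```python
def is_presumed_dead(last_heartbeat_s, now_s, grace_period_s):
    """Return True if no heartbeat arrived within the grace period.

    Hypothetical helper; all times are seconds on a shared clock.
    """
    return (now_s - last_heartbeat_s) > grace_period_s


# A healthy run whose heartbeat was delayed by a transient network issue:
last_heartbeat_s = 100.0
now_s = 112.0  # 12 seconds of silence

# Aggressive threshold: the healthy run would be cancelled (false positive).
print(is_presumed_dead(last_heartbeat_s, now_s, grace_period_s=5.0))    # True
# Lenient threshold: the run survives, but a real crash takes longer to detect.
print(is_presumed_dead(last_heartbeat_s, now_s, grace_period_s=120.0))  # False
```

No single threshold removes both risks: shortening the grace period detects dead processes sooner but cancels more healthy runs.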
What you could do to help with the process is run your subprocess with a local agent rather than within a Kubernetes job. You could then have a subflow for that, and this subflow with the ShellTask can run on some local machine while your dependency-heavy containerized child flows run on Kubernetes, and you could orchestrate all that using a parent flow (aka flow-of-flows).
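A flow-of-flows along these lines might look like the sketch below. It assumes Prefect 1.x and a "run-agent" flow already registered in a project; the project name "my-project", the placeholder `agent_command`, and the helper tasks `get_worker_ids` and `make_run_name` are hypothetical. It also applies the two earlier points from this thread: the ShellTask is initialized and then called, and create_flow_run is mapped directly in the Flow block rather than invoked from inside a task.

```python
# Sketch for Prefect 1.x; project name and agent command are hypothetical.
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.tasks.shell import ShellTask

agent_command = "echo 'agent running'"  # placeholder for the real agent command

# Child flow: the ShellTask is initialized first, then called inside the Flow block.
with Flow("run-agent") as flow_agent:
    ShellTask()(command=agent_command)


@task
def get_worker_ids(n_workers):
    return list(range(n_workers))


@task
def make_run_name(worker_id):
    return f"run-agent_#{worker_id}"


# Parent flow: create_flow_run is mapped in the Flow block, not wrapped in a task.
with Flow("main") as flow:
    n_workers = Parameter("n_workers", default=2)
    worker_ids = get_worker_ids(n_workers)
    run_ids = create_flow_run.map(
        flow_name=unmapped("run-agent"),
        project_name=unmapped("my-project"),  # hypothetical project name
        run_name=make_run_name.map(worker_ids),
    )
    # Wait for each child flow run and propagate its final state to the parent.
    wait_for_flow_run.map(run_ids, raise_final_state=unmapped(True))
```

Where each child flow runs (local agent or Kubernetes) would be decided by the run config and labels it is registered with, which this sketch leaves out.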