Gabi Pi
10/24/2021, 12:17 PM
I am trying to run a RunNamespacedJob task over Kubernetes. I am using the following code:
storage = S3(bucket="gavriel-test", stored_as_script=True)

kubernetes_run_conf = KubernetesRun(
    env={
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY
    },
    labels=["prefect-poc-k8s-agent"]
)

body = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {'name': 'echo'},
    'spec': {
        'template': {
            'spec': {
                'containers': [
                    {
                        'name': 'echo',
                        'image': 'alpine:3.7',
                        'command': ['sh -c "echo Hello!!!"; sleep 10']
                    }
                ]
            }
        },
        'backoffLimit': 4
    }
}

with Flow("kubernetes-CreateNamespacedJob", run_config=kubernetes_run_conf, storage=storage) as flow:
    job = RunNamespacedJob(body=body, namespace="prefect", delete_job_after_completion=False)
But I keep getting the following error:
Error during execution of task: MaxRetryError("HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8c8adeb650>: Failed to establish a new connection: [Errno 111] Connection refused'))")
Do you have any idea how to resolve this?
Anna Geller
Try setting kubernetes_api_key_secret=None in your RunNamespacedJob.
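A note on why this setting matters: the error shows the client calling localhost on port 80, which is what the Kubernetes Python client falls back to when no cluster configuration has been loaded. With kubernetes_api_key_secret=None the task presumably falls back to in-cluster or kubeconfig credentials instead. The sketch below is only a rough diagnostic to run inside the flow's pod or on your machine; the function name detect_k8s_config_source is made up for illustration and is not part of Prefect or the Kubernetes client.

```python
import os
from pathlib import Path

# Standard location where Kubernetes mounts a pod's service-account token.
SERVICE_ACCOUNT_TOKEN = Path("/var/run/secrets/kubernetes.io/serviceaccount/token")


def detect_k8s_config_source(env=None, token_path=SERVICE_ACCOUNT_TOKEN, kubeconfig_path=None):
    """Guess which cluster configuration a Kubernetes client could load here.

    Returns "in-cluster", "kubeconfig", or "none". Hypothetical helper,
    written for illustration only.
    """
    env = os.environ if env is None else env

    # In-cluster configuration needs the API server address from these two
    # variables plus the mounted service-account token file.
    has_address = bool(env.get("KUBERNETES_SERVICE_HOST")) and bool(env.get("KUBERNETES_SERVICE_PORT"))
    if has_address and Path(token_path).is_file():
        return "in-cluster"

    # Outside a cluster, clients usually read a kubeconfig file. KUBECONFIG
    # may hold several paths separated by the OS path separator.
    if kubeconfig_path is None:
        kubeconfig_path = env.get("KUBECONFIG") or Path.home() / ".kube" / "config"
    candidates = str(kubeconfig_path).split(os.pathsep)
    if any(Path(p).expanduser().is_file() for p in candidates if p):
        return "kubeconfig"

    # Nothing found: a client created here would have no API server address.
    return "none"


if __name__ == "__main__":
    print(detect_k8s_config_source())
```

If this prints "none" in the environment where the flow runs, the task has no way to find the API server, which would be consistent with the connection-refused error above.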
This example worked for me with a local Kubernetes cluster:
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import KubernetesRun
from prefect.client.secrets import Secret
from prefect.tasks.kubernetes import RunNamespacedJob

FLOW_NAME = "run_namedspaced_job"
AWS_ACCESS_KEY_ID = Secret("AWS_ACCESS_KEY_ID").get()
AWS_SECRET_ACCESS_KEY = Secret("AWS_SECRET_ACCESS_KEY").get()
STORAGE = S3(
    bucket="prefect-datasets",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    local_script_path=f"flows/{FLOW_NAME}.py",
)

body = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {'name': 'dummy'},
    'spec': {
        'template': {
            'spec': {
                'containers': [
                    {
                        'name': 'echo',
                        'image': 'alpine:3.7',
                        'command': ["echo", "Hello"]
                    }
                ],
                'restartPolicy': 'Never',  # this was missing
            }
        },
        'backoffLimit': 4
    }
}

k8s_job = RunNamespacedJob(body=body, delete_job_after_completion=True, kubernetes_api_key_secret=None)  # this was also missing

with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        env={
            "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
            "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
        },
    ),
) as flow:
    k8s_job()
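The two manifest differences from the original body (the added restartPolicy and the command written as separate arguments) can be checked before submitting a Job. Below is a minimal sketch, assuming the body is a plain dict shaped like the ones in this thread; check_job_body is a hypothetical helper, not a Prefect or Kubernetes API, and the whitespace test on the command is only a heuristic.

```python
def check_job_body(body):
    """Return a list of likely problems in a Kubernetes Job manifest dict."""
    problems = []
    pod_spec = body.get("spec", {}).get("template", {}).get("spec", {})

    # A Job's pod template must use restartPolicy "Never" or "OnFailure";
    # leaving it unset gives the pod default "Always", which Jobs reject.
    if pod_spec.get("restartPolicy") not in ("Never", "OnFailure"):
        problems.append("spec.template.spec.restartPolicy must be 'Never' or 'OnFailure'")

    for container in pod_spec.get("containers", []):
        command = container.get("command") or []
        # 'command' is an argv list and is not passed through a shell, so a
        # first element containing whitespace is almost certainly a whole
        # shell line that should be split into separate arguments.
        if command and any(ch.isspace() for ch in command[0]):
            problems.append(
                f"container {container.get('name')!r}: command[0] {command[0]!r} "
                "contains whitespace; split it into separate arguments"
            )
    return problems


# The body from the original question, reduced to the relevant fields.
broken_body = {
    "spec": {"template": {"spec": {"containers": [
        {"name": "echo", "image": "alpine:3.7",
         "command": ['sh -c "echo Hello!!!"; sleep 10']},
    ]}}}
}

# One possible rewrite: run the shell line through `sh -c` explicitly.
fixed_body = {
    "spec": {"template": {"spec": {
        "containers": [
            {"name": "echo", "image": "alpine:3.7",
             "command": ["sh", "-c", "echo Hello!!!; sleep 10"]},
        ],
        "restartPolicy": "Never",
    }}}
}

if __name__ == "__main__":
    for problem in check_job_body(broken_body):
        print(problem)
```

Passing the shell line as a single argument to sh -c keeps the echo and the sleep from the original question while still giving Kubernetes a proper argument list.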
Gabi Pi
10/24/2021, 4:08 PM
kubectl logs <pod-name>. Do you have an idea why?
And thanks again for your help, I highly appreciate it!
Kevin Kho
RunNamespacedJob, I think there is this issue
Kevin Kho
Gabi Pi
10/24/2021, 6:02 PM
Anna Geller
Gabi Pi
10/25/2021, 7:53 AM