# ask-community
g
Hey everyone, I am trying to run a simple flow with a RunNamespacedJob task over Kubernetes. I am using the following code:
storage = S3(bucket="gavriel-test", stored_as_script=True)
kubernetes_run_conf = KubernetesRun(
    env={
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY
    },
    labels=["prefect-poc-k8s-agent"]
)
body = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {'name': 'echo'},
    'spec':
        {
            'template':
                {
                    'spec': {
                        'containers': [
                            {
                                'name': 'echo',
                                'image': 'alpine:3.7',
                                'command': ['sh -c  "echo Hello!!!"; sleep 10']
                            }
                        ]
                    }
                },
            'backoffLimit': 4
        }
}
with Flow("kubernetes-CreateNamespacedJob", run_config=kubernetes_run_conf, storage=storage) as flow:
    job = RunNamespacedJob(body=body, namespace="prefect", delete_job_after_completion=False)
But I keep getting the following error:
Error during execution of task: MaxRetryError("HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f8c8adeb650>: Failed to establish a new connection: [Errno 111] Connection refused'))")
Do you have any idea how to resolve this?
a
@Gabi Pi there are a couple of things to make it work:
• I think your command needs to be refactored a bit so that each part of the command is separated into its own string, see example below
• your spec --> template --> spec is missing restartPolicy, which is required, see example below
• lastly, since you attempt an in-cluster connection, rather than triggering this pod creation via API key, you should set kubernetes_api_key_secret=None in your RunNamespacedJob
This example worked for me with a local Kubernetes cluster:
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import KubernetesRun
from prefect.client.secrets import Secret
from prefect.tasks.kubernetes import RunNamespacedJob


FLOW_NAME = "run_namedspaced_job"
AWS_ACCESS_KEY_ID = Secret("AWS_ACCESS_KEY_ID").get()
AWS_SECRET_ACCESS_KEY = Secret("AWS_SECRET_ACCESS_KEY").get()
STORAGE = S3(
    bucket="prefect-datasets",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    local_script_path=f"flows/{FLOW_NAME}.py",
)


body = {
    'apiVersion': 'batch/v1',
    'kind': 'Job',
    'metadata': {'name': 'dummy'},
    'spec':
        {
            'template':
                {
                    'spec': {
                        'containers': [
                            {
                                'name': 'echo',
                                'image': 'alpine:3.7',
                                'command': ["echo",  "Hello"]
                            }
                        ],
                        'restartPolicy': 'Never',  # this was missing
                    }
                },
            'backoffLimit': 4
        }
}
k8s_job = RunNamespacedJob(body=body, delete_job_after_completion=True, kubernetes_api_key_secret=None) # this was also missing


with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        env={
            "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
            "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
        },
    ),
) as flow:
    k8s_job()
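To keep the original shell one-liner (echo followed by sleep) instead of a bare echo, the command can be passed through sh -c. The following is only a minimal sketch of that idea, not something taken from the Prefect docs: make_job_body is a hypothetical helper name, and the job name, image and command are just the values from this thread.

```python
import shlex


def make_job_body(name, image, shell_command, backoff_limit=4):
    """Build a batch/v1 Job manifest that runs `shell_command` through `sh -c`.

    Hypothetical helper for illustration only; it is not part of Prefect.
    """
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": name,
                            "image": image,
                            # Kubernetes executes this list directly, without a
                            # shell, so every argument must be its own string.
                            "command": ["sh", "-c", shell_command],
                        }
                    ],
                    # Job pod templates accept only "Never" or "OnFailure".
                    "restartPolicy": "Never",
                }
            },
            "backoffLimit": backoff_limit,
        },
    }


# The shell snippet from the original question, now run by an explicit shell.
body = make_job_body("echo", "alpine:3.7", 'echo "Hello!!!"; sleep 10')

# For a simple command without shell operators, shlex.split gives the argv list.
simple_command = shlex.split('echo "Hello"')
```

The resulting body can be passed to RunNamespacedJob(body=body, ...) in the same way as the hand-written dictionary above.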
g
Thank you @Anna Geller, it worked, although the logs of the spawned pods are not redirected to the UI. The only way to inspect them is to access them directly using kubectl logs <pod-name>. Do you have an idea why? And thanks again for your help, I highly appreciate it!
k
For RunNamespacedJob, I think there is this issue
Looks like Tony has an open PR
g
Oh, that definitely explains it. Thanks @Kevin Kho!
a
@Gabi Pi when we combine it with other Kubernetes tasks, it then works. Took a bit of time to figure out how, but this example worked for me https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py
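A rough sketch of the underlying idea, not the linked example itself: the Job controller labels the pods it creates with job-name=<job name>, so the pods can be listed by that label and their logs read one by one. This version uses the official kubernetes Python client rather than Prefect's task library, and it assumes the code runs inside the cluster with a service account that is allowed to list pods and read their logs. job_pod_selector and read_job_logs are hypothetical helper names.

```python
def job_pod_selector(job_name):
    """Label selector matching the pods created by a Kubernetes Job."""
    return f"job-name={job_name}"


def read_job_logs(job_name, namespace="default"):
    """Return a {pod_name: log_text} mapping for all pods of the given Job.

    Assumption: in-cluster execution with the `kubernetes` package installed.
    """
    # Imported lazily so the selector helper works without the package.
    from kubernetes import client, config

    config.load_incluster_config()
    core = client.CoreV1Api()
    pods = core.list_namespaced_pod(
        namespace=namespace,
        label_selector=job_pod_selector(job_name),
    )
    return {
        pod.metadata.name: core.read_namespaced_pod_log(
            name=pod.metadata.name, namespace=namespace
        )
        for pod in pods.items
    }
```

Called from inside a Prefect task, the returned text could be passed to the task's logger so that it shows up in the UI. The job would need to be kept around (delete_job_after_completion=False) until the logs have been read.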
g
Really useful @Anna Geller, thank you!👏