https://prefect.io logo
#prefect-kubernetes
Title
# prefect-kubernetes
p

phil

01/10/2023, 3:42 PM
I'm new to Prefect. Try to understand how prefect works in the context of k8s. From the documentation, looks like Prefect runs tasks as k8s jobs. Is it possible for Prefect to create any k8s CDRs. For example, if I would like to kick off a Spark run with Spark Operators's SparkApplications. How do I do that with Prefect? TIA for any help.
j

Jeff Hale

01/10/2023, 4:08 PM
Hi phil. I assume you mean CRDs - Custom Resource Definitions? Have you seen prefect-kubernetes? I’m not sure it has what you need, but you could file an issue for a enhancement there if needed. I’m not a Spark pro, but will just note that if using Spark with Databricks is an option we also have a prefect-databricks integration.
p

phil

01/10/2023, 4:43 PM
Thanks Jeff, yes, it's a typo and I meant CRDs. We don't use databricks. I'm using Spark Operator as an example. In reality it could be anything for example, it could be AWS EMR, or a Dash/Ray cluster etc. So, if we can have Prefect to create any k8s CRDs, it will be really helpful.
It looks like prefect-kubernetes supports only k8s job, pod, service 3 types of resources.
j

Jeff Hale

01/11/2023, 1:37 AM
Yes, I think you would need to create a custom block type. Using code similar to the library as a starting point might be helpful.
p

phil

03/22/2023, 2:46 AM
Back on running prefect flow on k8s. Is it possible possible for running k8s jobs with a custom build docker image that have the flow code included in the image rather than uploading flow to any remote storage block such as s3 or github/gitlab repository?
j

jpuris

03/22/2023, 6:25 AM
I am curious about the above as well ☝️ In my mind we would have a CI that creates container images and pushes to AWS ECR. I would then have the deployment applied to spin up k8s job with the new image. Is this at all supported? If so, would absolutely love an example 🙏 For context, we currently run our flows on EC2 with agent retrieving the flow from local filesystem. I would like to move this to serverless..
p

phil

03/22/2023, 7:00 AM
Question, why s3fs is still required if flow is backed in and we don’t need to pull it from s3 bucket?
👍 1
@Jeff Hale the above blog article requires we build a docker image use the prefect docker image as the base image. If I already have a base image, is there other way that could add the prefect function to the custom docker image without require using prefect docker image as the base image?
j

Jeff Hale

03/22/2023, 1:59 PM
Re: s3fs: If the storage code is baked into the image, then you don’t need s3fs to pull it. Re: custom Docker image: Sure. You can just do what the Prefect images do. For example, see a Prefect Dockerfile on DockerHub with the final command:
p

phil

03/22/2023, 2:21 PM
Great, thanks Jeff!
Whether I use prefect as base image or duplicate the Prefect Dockerfiles in my application Dockerfile, the logics are mixed together. It might also increase the chance of some Python module conflicts as well. Is there a better way to separate flow logic (orchestration) with application logic? For example, in argo workflows, the flow logic is defined with yaml (DSL), each task runs the application in a separate pod/container which is completely unaware of the orchestration logic (workflows) at all. Does Prefect support such cleaner separate of flow orchestration from application task? Ideally, I could defined flow as very simple/lightweight Python code which will be deployed inline (without need any remote storage block, not even any baked docker image). That flow logic can then be pulled by agent, and then the agent can perform the orchestration based on the flow coming from the queue, kicks off application tasks as k8s pod. Is this possible with Prefect 2.0 today or is this something Prefect could support in future?
Hi @Jeff Hale, try to follow this article to create a k8s infrastructure block with custom job spec. https://medium.com/the-prefect-blog/how-to-use-kubernetes-with-prefect-part-2-2e98cdb91c7e I generated the _k8s_flow_run_job_manifest.yaml using this command:_ prefect kubernetes manifest flow-run-job > _k8s_flow_run_job_manifest.yaml_ Then I added namespace in the metadata. _But I am getting an error on the k8s agent, stating create_namespaced_job failed with missing required parameter 'namespace'._ Any idea what I might be missing? _In addition, how do I specify Pod toleration? Can I just add it in the generated k8s_flow_run_job_manifest.yaml file as I did for the namespace?_
c

Christopher Boyd

03/23/2023, 1:48 PM
What do your manifest look like?
You can also specify the namespace in the block configuration
Copy code
apiVersion: batch/v1
kind: Job
metadata:
  # labels are required, even if empty
  labels: {}
  namespace: prefect2
spec:
  template:
    spec:
      completions: 1
      containers:  # the first container is required
      - env: []  # env is required, even if empty
        name: prefect-job
      tolerations:
      - key: "example-key"
        operator: "Exists"
        effect: "NoSchedule"
      parallelism: 1
      restartPolicy: Never
p

phil

03/23/2023, 5:57 PM
Thanks @Christopher Boyd for your response. My manifest is very similar to above. I know it made difference, because the agent used to complain about no permission to deploy job in the default namespace (maybe by default, the agent should try to deploy in the same namespace as the agent itself) and now it complain about "_create_namespaced_job failed with missing required parameter 'namespace'."_
c

Christopher Boyd

03/23/2023, 6:06 PM
typical default behavior where none is specified is to deploy to the default namespace which is just standard kubernetes practice. Where is your agent running, and what namespace? If you are trying to deploy jobs to the default namespace, the easiest place I find to update the job is in the UI where you set the agent, OR in the python deployment:
Copy code
k8s_job = KubernetesJob(
    image=image,
    namespace="prefect2",
    name=environ['PROJECT_NAME'],
    customizations=customizations,
    env=dict(
        GCP_PROJECT_ID=GCP_PROJECT_ID,
        GCP_RESULTS_BUCKET=GCP_RESULTS_BUCKET,
        PREFECT_VERSION=PREFECT_VERSION,
        PYTHON_VERSION=PYTHON_VERSION
    ),
    labels={"environment": f'{environment}'.lower()},
    finished_job_ttl=600,
    job_watch_timeout_seconds=600,
    service_account_name=prefect2-agent
)
p

phil

03/23/2023, 6:19 PM
In that case, would it make sense to include a clusterrole/clusterrolebinding in the helm chart template, so it will allow agent to deploy to different namespace (including default namespace)? I can try to put namespace in the KubernetsJob infrastructure like the above example, can we also specify toleration similarly? Do you have an example?
c

Christopher Boyd

03/23/2023, 6:27 PM
not really, as that’s a specific use case for you, not the general practice. If you need agents to deploy into different namespaces, the best approach would be to either deploy a new agent in that new namespace, or create a clusterrole binding and role yourself - considering the security implications that has to be able to push jobs into other namespaces, it’s not something we want to ship by default
tolerations go in the pod spec at the same level as containers - https://docs.prefect.io/concepts/infrastructure/?h=kubernetes%20job#kubernetesjob
So you can either do it via yaml as a base manifest, or add it as a customization
I think the best approach would be into your manifest which is just a job spec, doing it via customization can be a little syntax odd because of the list of values, although it’s doable
if your spec is yaml:
Copy code
spec:
  template:
    spec:
      completions: 1
      containers:  # the first container is required
      - env: []  # env is required, even if empty
        name: prefect-job
      tolerations:
      - key: "example-key"
        operator: "Exists"
        effect: "NoSchedule"
if it’s a customization, it would be like
Copy code
{
  "op": "add",
  "path": "/spec/template/spec",
  "value": {
    "tolerations": [
      {
        "key": "example-key",
        "operator": "Exists",
        "effect": "NoSchedule"
      }
    ]
  }
}
p

phil

03/23/2023, 6:48 PM
Thanks Chris, the namespace change made difference. Now the job is started, but in pending state. Next will try to fix the toleration as you suggested. Hope that is it. Regarding the security implication, you have a point there. But I guess it's a legit use case where people want to a single cluster wide agent that can deploy job to different namespace. Having your create clusterole/binding is okay, but may not be idea for user to maintain additional manifests outside of helm chart. It will be nice if those are included in the helm chart with a switch to turn it on/off. All those could be documented with some examples how to set those values for different use cases.
@Christopher Boyd I'm having trouble to add the customizations to the KubernetesJob. If I add the above code as Python object or json object it complains about the type not being JsonPath. If I pass the above as string representation of json object, it complains about "string indices must be integers". Do you have an working example of how to provide customizations in the KubernetesJob?
c

Christopher Boyd

03/23/2023, 7:28 PM
Yes, the documentation link above provides working examples
A working example using tolerations for me (copied from my manifest in the UI):
Copy code
{
  "kind": "Job",
  "spec": {
    "template": {
      "spec": {
        "containers": [
          {
            "env": [],
            "name": "prefect-job"
          }
        ],
        "completions": 1,
        "parallelism": 1,
        "tolerations": [
          {
            "key": "prefect",
            "value": "regular",
            "effect": "NoSchedule",
            "operator": "Equal"
          }
        ],
        "nodeSelector": {
          "prefect": "regular"
        },
        "restartPolicy": "Never"
      }
    }
  },
  "metadata": {
    "labels": {}
  },
  "apiVersion": "batch/v1"
}
Also, I’m not sure how you’re passing the customization, but customizations expects a list - I was just providing an example formatting, but if you’re just looking to copy and paste my example it would be:
Copy code
customizations = [ 
{
    "op": "add",
    "path": "/spec/template/spec",
    "value": {
        "tolerations": [
          {
            "key": "example-key",
            "operator": "Exists",
            "effect": "NoSchedule"
          },
        ]
    },
}
]

KubernetesJob(
...
    customizations=customizations
)
p

phil

03/23/2023, 7:51 PM
That helped, I had a couple of issues with the customization. Not I am moving on to slightly different issue. Basically I need to figure out how to get the other scheduling policies to work properly by further customizing the pod spec. Thank you so much @Christopher Boyd.
🙌 1
2 Views