Kayvon Avishan
05/23/2024, 8:05 PM
Marvin
05/23/2024, 8:06 PM
```python
import subprocess

from prefect import flow


def delete_k8s_job(flow, flow_run, state):
    """Flow run state change hook that deletes a Kubernetes Job."""
    # Placeholder naming scheme: customize this to match your job names
    k8s_job_name = f"{flow.name}-job"
    # Pass arguments as a list so the job name is never parsed by a shell
    subprocess.run(["kubectl", "delete", "job", k8s_job_name], check=False)


@flow(on_completion=[delete_k8s_job])
def my_flow():
    # Your flow tasks here
    pass


if __name__ == "__main__":
    my_flow()
```
### Example Adaptation for Kubernetes Jobs
Here’s how you could adapt the example code to suit Kubernetes jobs:
```python
import subprocess

from prefect import flow, task


def delete_k8s_job(flow, flow_run, state):
    """Flow run state change hook that deletes a Kubernetes Job."""
    k8s_job_name = "<your-job-name>"  # Define the Kubernetes job name
    # Pass arguments as a list so the job name is never parsed by a shell
    subprocess.run(["kubectl", "delete", "job", k8s_job_name], check=False)


@task
def hello():
    print("Hello, world!")


@flow(on_completion=[delete_k8s_job])
def example_flow():
    hello()


if __name__ == "__main__":
    example_flow()
```
This example defines a simple flow that runs a single task. When the flow run enters a Completed state, the `delete_k8s_job` hook runs and deletes the Kubernetes job.
Be sure to modify the job name and command parameters to fit your environment and naming conventions. For more detailed guidance, refer to the official documentation linked below.
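As one way to parameterize the delete command, here is a minimal sketch that builds the `kubectl` arguments as a list rather than a shell string. The helper names `build_delete_job_command` and `delete_job`, and the `default` namespace, are assumptions for illustration and are not part of Prefect.

```python
import subprocess


def build_delete_job_command(job_name: str, namespace: str = "default") -> list[str]:
    """Return the kubectl argument list that deletes one Job (hypothetical helper)."""
    if not job_name:
        raise ValueError("job_name must not be empty")
    return [
        "kubectl", "delete", "job", job_name,
        "--namespace", namespace,
        "--ignore-not-found",  # do not fail if the Job is already gone
    ]


def delete_job(job_name: str, namespace: str = "default") -> int:
    """Run the delete command and return kubectl's exit code."""
    result = subprocess.run(build_delete_job_command(job_name, namespace), check=False)
    return result.returncode
```

A hook could then call `delete_job("<your-job-name>", namespace="<your-namespace>")` and log a non-zero exit code instead of silently ignoring it.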
### Links to References
- State Change Hooks Documentation
- Kubernetes Cluster AutoScaler and Prefect Issue
It’s always a good idea to adapt the hooks to your specific scenarios and handle any edge cases unique to your workflows.
Kayvon Avishan
05/23/2024, 8:07 PM
Marvin
05/23/2024, 8:07 PM
```json
{
  "apiVersion": "batch/v1",
  "kind": "Job",
  "metadata": {
    "name": "{{ name }}"
  },
  "spec": {
    "ttlSecondsAfterFinished": 0,
    "template": {
      "spec": {
        "containers": [
          {
            "name": "{{ name }}",
            "image": "{{ image }}",
            "resources": {
              "requests": {
                "cpu": "{{ cpu_request }}"
              }
            }
          }
        ],
        "restartPolicy": "Never"
      }
    }
  }
}
```
`ttlSecondsAfterFinished` is set to 0, which makes the job eligible for deletion as soon as it finishes. Adjust the value to suit your needs.
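To make that adjustment concrete, the sketch below sets `ttlSecondsAfterFinished` on a job manifest held as a Python dictionary. The helper name `set_job_ttl` and the trimmed sample manifest are assumptions for illustration; only the field name comes from the job template above.

```python
import copy
import json


def set_job_ttl(manifest: dict, ttl_seconds: int) -> dict:
    """Return a copy of a Job manifest with spec.ttlSecondsAfterFinished set."""
    if ttl_seconds < 0:
        raise ValueError("ttl_seconds must be zero or greater")
    updated = copy.deepcopy(manifest)  # leave the caller's manifest untouched
    updated.setdefault("spec", {})["ttlSecondsAfterFinished"] = ttl_seconds
    return updated


# Hypothetical manifest, trimmed to the fields relevant here.
manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "{{ name }}"},
    "spec": {"template": {"spec": {"restartPolicy": "Never"}}},
}

# Keep finished jobs for five minutes so their logs can still be inspected.
print(json.dumps(set_job_ttl(manifest, 300)["spec"], indent=2))
```

A value of 0 removes the job as soon as it finishes, while a larger value leaves time to inspect the pod and its logs before cleanup.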
3. Testing and Deployment
After editing the job template, create a new deployment and specify this work pool. Jobs created through this work pool will then carry the `ttlSecondsAfterFinished` setting, so they are deleted after completion.
### Reference Links
You can read more about configuring work pools and job templates in the Prefect documentation:
- Kubernetes Deployment Guide
- Developing a New Worker Type
These guides will provide additional context and details for setting up and managing your Kubernetes workers in Prefect.
Adapting this global configuration ensures that all jobs spawned by your Kubernetes worker are automatically cleaned up, maintaining a tidy and efficient cluster environment.
Taylor Curran
05/23/2024, 8:15 PM
Kayvon Avishan
05/23/2024, 8:21 PM
Kayvon Avishan
05/23/2024, 8:25 PM
Taylor Curran
05/23/2024, 8:25 PM