Li Yann
09/12/2024, 1:23 AM

from prefect import flow, task
# Assumption: KubernetesJob is meant to be Prefect's Kubernetes job block
# (e.g. prefect.infrastructure.KubernetesJob in Prefect 2.x); the exact
# field names below may differ depending on the installed Prefect version.
from prefect.infrastructure import KubernetesJob

@task
def run_kubernetes_job(image, command):
    # Build a one-off Kubernetes job for the given image and command
    job = KubernetesJob(
        job_name=f"job-{command}",
        image=image,
        namespace="default",
        command=command,
        delete_job_after_completion=True
    )
    job.run()

@flow(name="Kubernetes Job Flow", log_prints=True)
def control_flow():
    print("test")
    image_name = "myimage"
    # Submit two jobs concurrently, then run a third once both have finished
    future1 = run_kubernetes_job.submit(image_name, ["python", "mock.py", str(3)])
    future2 = run_kubernetes_job.submit(image_name, ["python", "mock.py", str(4)])
    run_kubernetes_job.submit(image_name, ["python", "mock.py", str(5)], wait_for=[future1, future2]).result()
2. Or maybe create an image for every flow?
3. Other suggestions?
In Airflow, we can use KubernetesPodOperator to create a temporary Kubernetes pod that runs an image. And to deploy DAGs, all we need to do is put our DAG code in a volume mount path, and Airflow will automatically load those DAGs. I'm not sure whether Prefect has functionality like this (a rough sketch of the Airflow pattern I mean is below).
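For reference, this is roughly the Airflow pattern I'm describing. It's only a minimal sketch: the DAG id, image name, and command are placeholders, and the exact import path and operator arguments depend on the apache-airflow-providers-cncf-kubernetes version installed.

# Minimal sketch of the Airflow KubernetesPodOperator pattern (placeholder
# names; exact import path/arguments depend on the provider version).
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with DAG(dag_id="k8s_pod_example", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    run_in_pod = KubernetesPodOperator(
        task_id="run_in_pod",
        name="run-in-pod",
        namespace="default",
        image="myimage",                    # placeholder image
        cmds=["python", "mock.py", "3"],    # placeholder command
        get_logs=True,                      # stream the pod's logs into the task log
    )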
Any suggestions would help me a lot! Thanks