Li Yann
09/12/2024, 1:26 AM
from prefect import flow, task

@task
def run_kubernetes_job(image, command):
    # KubernetesJob must be imported from whichever package provides it
    # (e.g. Prefect's Kubernetes integration); the exact API varies by version.
    job = KubernetesJob(
        job_name=f"job-{command}",
        image=image,
        namespace="default",
        command=command,
        delete_job_after_completion=True,
    )
    job.run()
@flow(name="Kubernetes Job Flow", log_prints=True)
def control_flow():
    print("test")
    image_name = "myimage"
    future1 = run_kubernetes_job.submit(image_name, ["python", "mock.py", str(3)])
    future2 = run_kubernetes_job.submit(image_name, ["python", "mock.py", str(4)])
    run_kubernetes_job.submit(
        image_name, ["python", "mock.py", str(5)], wait_for=[future1, future2]
    ).result()
2. Or maybe create an image for every flow?
3. Other suggestions?
In Airflow, we can use KubernetesPodOperator to create an Airflow DAG run, and all we need to do is put our DAG code in a volume mount path; Airflow will automatically load these DAGs. Not sure Prefect has functionality like this.
Marvin
09/12/2024, 1:27 AM
I have a project to transfer from Airflow DAGs to Prefect flows. Every flow will schedule several tasks, and every task is a Docker image which we want to run in a temporary k8s pod. My Prefect version is 3.0. I have deployed the Prefect server and worker in k8s, but I'm not sure what's next. Should I write the flow in this way and deploy it to the server?
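The `submit(..., wait_for=[...])` calls in the question express a fan-out/fan-in dependency: jobs 3 and 4 run concurrently, and job 5 starts only after both finish. A minimal plain-Python sketch of the same ordering semantics, using only `concurrent.futures` (illustrative only; `mock_job` is a hypothetical stand-in for the real Kubernetes job task, not Prefect's scheduling machinery):

```python
from concurrent.futures import ThreadPoolExecutor, wait

def mock_job(n):
    # hypothetical stand-in for the real Kubernetes job task
    return f"job-{n}"

with ThreadPoolExecutor() as pool:
    # fan-out: jobs 3 and 4 run concurrently
    future1 = pool.submit(mock_job, 3)
    future2 = pool.submit(mock_job, 4)
    # fan-in: block until both upstream jobs finish, then run job 5
    wait([future1, future2])
    final = pool.submit(mock_job, 5).result()

print(final)  # job-5
```

Prefect's `wait_for` does the same thing at the task-run level: it adds an upstream dependency without passing any data between tasks.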