Sergey Goncharov
07/14/2022, 8:21 PMCreateAndRunJob
marked as succeeded but prefect does not wait for the end of the execution and then function get_our_pod_name
fails because it cannot find Job which is not yet created.
I though to use threading or queue from python here, but I think it's not the right way with Prefect.
Could you please advice here? I believe I do something wrong here.Anna Geller
07/14/2022, 10:23 PMSergey Goncharov
07/15/2022, 5:30 AMfrom unicodedata import name
from prefect import Flow, task, Parameter, Task
from prefect.storage import S3
from prefect.run_configs import KubernetesRun
from prefect import storage
from prefect.engine.signals import VALIDATIONFAIL
from prefect.triggers import all_finished
from prefect.tasks.kubernetes import (
RunNamespacedJob,
ReadNamespacedPodLogs,
ListNamespacedPod,
DeleteNamespacedPod,
DeleteNamespacedJob,
)
from prefect.executors import DaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
@task(log_stdout=True)
def DeleteIfExists(job_name: str):
delete_if_exists = DeleteNamespacedJob(
job_name=job_name,
kubernetes_api_key_secret=None,
namespace="prefect-rnd"
)
@task(log_stdout=True)
def CreateAndRunJob(job_name: str) -> RunNamespacedJob:
create_and_run_job = RunNamespacedJob(
delete_job_after_completion=False,
kubernetes_api_key_secret=None,
trigger=all_finished,
namespace="prefect-rnd",
body={
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": job_name,
"namespace": "prefect-rnd"
},
"spec": {
"template": {
"spec": {
"restartPolicy": "Never",
"imagePullSecrets": [
{
"name": "docker-private"
}
],
"containers": [
{
"name": job_name,
"image": "private-image",
"command": ["/bin/bash", "run_job.sh"]
}
]
}
}
}
},
)
list_pods = ListNamespacedPod(
kubernetes_api_key_secret=None,
namespace="prefect-rnd"
)
logs = ReadNamespacedPodLogs(
kubernetes_api_key_secret=None,
namespace="prefect-rnd"
)
delete_pod = DeleteNamespacedPod(
kubernetes_api_key_secret=None,
namespace="prefect-rnd"
)
@task(log_stdout=True)
def get_pod_ids(api_response):
list_of_pods = []
for i in api_response.items:
list_of_pods.append(i.metadata.name)
print(list_of_pods)
return list_of_pods
@task(log_stdout=True)
def get_our_pod_name(pods: Task, job_name: str):
dummy_pods = [i for i in pods if i.startswith(JOB_NAME)]
if len(dummy_pods) > 1:
raise VALIDATIONFAIL("More than one dummy pod")
result = dummy_pods[0]
print(f"Our pod name: {result}")
return result
@task(trigger=all_finished, log_stdout=True)
def print_log_output(output):
print(output)
@task(log_stdout=True)
def extract_controller_uid(v1job):
return v1job.metadata.labels['controller-uid']
with Flow(
FLOW_NAME,
storage=STORAGE,
run_config=KubernetesRun(
labels=["K8S"],
),
executor=DaskExecutor()
) as flow:
job_name = Parameter("job_name", default="Prefect")
del_job = DeleteIfExists(job_name)
k8s_job = CreateAndRunJob(job_name)
del_job.set_downstream(k8s_job)
pods = list_pods()
k8s_job.set_downstream(pods)
list_od_pods = get_pod_ids(pods)
pod_name = get_our_pod_name(list_od_pods, job_name)
logs = logs(pod_name)
print_logs = print_log_output(logs)
delete_task = delete_pod(pod_name)
print_logs.set_downstream(delete_task)
from prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect import storage
from prefect.tasks.kubernetes import (
DeleteNamespacedJob,
)
import prefect
FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
JOB_NAME = "test-name"
class CustomDeleteNamespacedJob(DeleteNamespacedJob):
def run(self):
delete_if_exists = DeleteNamespacedJob(
>>> job_name=PARAMETER_VALUE,
kubernetes_api_key_secret=None,
namespace="prefect-rnd"
)
return delete_if_exists
delete_custom_job = CustomDeleteNamespacedJob()
with Flow(
FLOW_NAME,
storage=STORAGE,
run_config=KubernetesRun(
labels=["K8S"],
),
) as flow:
parameter_job_name = Parameter("parameter_job_name", default=JOB_NAME)
del_job = delete_custom_job.run()
Of course, I'm not asking for a solution, but for a way how to try to solve that 🙂
From my point of view it's not possible to use here prefect.context to retrieve the value of the parameter.Anna Geller
07/15/2022, 4:29 PMSergey Goncharov
07/15/2022, 4:37 PMfrom prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect import storage
from prefect.tasks.kubernetes import (
DeleteNamespacedJob,
)
import prefect
FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
JOB_NAME = "test-name"
class CustomDeleteNamespacedJob(DeleteNamespacedJob):
def run(self, job_name: str):
delete_if_exists = DeleteNamespacedJob(
job_name=job_name,
kubernetes_api_key_secret=None,
namespace="prefect-rnd"
)
return delete_if_exists
delete_custom_job = CustomDeleteNamespacedJob()
with Flow(
FLOW_NAME,
storage=STORAGE,
run_config=KubernetesRun(
labels=["K8S"],
),
) as flow:
parameter_job_name = Parameter("parameter_job_name", default=JOB_NAME)
del_job = delete_custom_job.run(job_name=parameter_job_name.run())
Anna Geller
07/15/2022, 5:10 PM