https://prefect.io logo
Title
s

Sergey Goncharov

07/14/2022, 8:21 PM
Hi I'm trying to prepare a parametrised flow for Prefect v1. It has almost the same code as https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py , but I only changed some functions to make them ready to work with parameters. I cannot get why does not it work now: function
CreateAndRunJob
marked as succeeded but prefect does not wait for the end of the execution and then function
get_our_pod_name
fails because it cannot find Job which is not yet created. I though to use threading or queue from python here, but I think it's not the right way with Prefect. Could you please advice here? I believe I do something wrong here.
1
a

Anna Geller

07/14/2022, 10:23 PM
Could you please move your code blocks to the thread? If you look at my example, I was declaring the Kubernetes tasks at the module scope and then calling those within the flow. When you call the already initialized task's class, you're invoking the task's run method
:gratitude-thank-you: 1
This is currently missing in your example
s

Sergey Goncharov

07/15/2022, 5:30 AM
code
from unicodedata import name
from prefect import Flow, task, Parameter, Task
from prefect.storage import S3
from prefect.run_configs import KubernetesRun
from prefect import storage

from prefect.engine.signals import VALIDATIONFAIL
from prefect.triggers import all_finished
from prefect.tasks.kubernetes import (
    RunNamespacedJob,
    ReadNamespacedPodLogs,
    ListNamespacedPod,
    DeleteNamespacedPod,
    DeleteNamespacedJob,
)

from prefect.executors import DaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)

@task(log_stdout=True)
def DeleteIfExists(job_name: str):
  delete_if_exists = DeleteNamespacedJob(
      job_name=job_name, 
      kubernetes_api_key_secret=None,
      namespace="prefect-rnd"
  )

@task(log_stdout=True)
def CreateAndRunJob(job_name: str) -> RunNamespacedJob:
  create_and_run_job = RunNamespacedJob(
      delete_job_after_completion=False,
      kubernetes_api_key_secret=None,
      trigger=all_finished,
      namespace="prefect-rnd",
      body={
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
              "name": job_name,
              "namespace": "prefect-rnd"
            },
            "spec": {
              "template": {
                "spec": {
                  "restartPolicy": "Never",
                  "imagePullSecrets": [
                    {
                      "name": "docker-private"
                    }
                  ],
                  "containers":  [
                    {
                      "name": job_name,
                      "image": "private-image",
                      "command": ["/bin/bash", "run_job.sh"]
                    }
                  ]
                }
              }
            }
          },
    )

list_pods = ListNamespacedPod(
  kubernetes_api_key_secret=None,
  namespace="prefect-rnd"
)
logs = ReadNamespacedPodLogs(
  kubernetes_api_key_secret=None,
  namespace="prefect-rnd"
)
delete_pod = DeleteNamespacedPod(
  kubernetes_api_key_secret=None,
  namespace="prefect-rnd"
)


@task(log_stdout=True)
def get_pod_ids(api_response):
    list_of_pods = []
    for i in api_response.items:
        list_of_pods.append(i.metadata.name)
    print(list_of_pods)
    return list_of_pods


@task(log_stdout=True)
def get_our_pod_name(pods: Task, job_name: str):
    dummy_pods = [i for i in pods if i.startswith(JOB_NAME)]
    if len(dummy_pods) > 1:
        raise VALIDATIONFAIL("More than one dummy pod")
    result = dummy_pods[0]
    print(f"Our pod name: {result}")
    return result


@task(trigger=all_finished, log_stdout=True)
def print_log_output(output):
    print(output)


@task(log_stdout=True)
def extract_controller_uid(v1job):
    return v1job.metadata.labels['controller-uid']

with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["K8S"],
        ),
        executor=DaskExecutor()
) as flow:
    job_name = Parameter("job_name", default="Prefect")
    del_job = DeleteIfExists(job_name)
    k8s_job = CreateAndRunJob(job_name)
    del_job.set_downstream(k8s_job)
    
    pods = list_pods()
    k8s_job.set_downstream(pods)

    list_od_pods = get_pod_ids(pods)
    pod_name = get_our_pod_name(list_od_pods, job_name)
    logs = logs(pod_name)
    print_logs = print_log_output(logs)

    delete_task = delete_pod(pod_name)
    print_logs.set_downstream(delete_task)
seems like I fixed that. Thank you! However I cannot get how to pass a parameter of a flow to not a task, but to a class. Like on the example below.
from prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect import storage

from prefect.tasks.kubernetes import (
    DeleteNamespacedJob,
)

import prefect

FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
JOB_NAME = "test-name"

class CustomDeleteNamespacedJob(DeleteNamespacedJob):

  def run(self):
    delete_if_exists = DeleteNamespacedJob(
>>>     job_name=PARAMETER_VALUE, 
        kubernetes_api_key_secret=None,
        namespace="prefect-rnd"
    )
    return delete_if_exists


delete_custom_job = CustomDeleteNamespacedJob()


with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["K8S"],
        ),
) as flow:
    parameter_job_name = Parameter("parameter_job_name", default=JOB_NAME)
    
    del_job = delete_custom_job.run()
Of course, I'm not asking for a solution, but for a way how to try to solve that 🙂 From my point of view it's not possible to use here prefect.context to retrieve the value of the parameter.
a

Anna Geller

07/15/2022, 4:29 PM
the easiest is when your custom task can accept that parameter value within the .run() method - this way, you can pass it to a downstream task as data dependency
🙏 1
s

Sergey Goncharov

07/15/2022, 4:37 PM
@Anna Geller, appreciate your help!
🙌 1
just for save here to possible solution. I've solved it by that way
from prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect import storage

from prefect.tasks.kubernetes import (
    DeleteNamespacedJob,
)

import prefect

FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
JOB_NAME = "test-name"

class CustomDeleteNamespacedJob(DeleteNamespacedJob):

  def run(self, job_name:  str):
    delete_if_exists = DeleteNamespacedJob(
        job_name=job_name, 
        kubernetes_api_key_secret=None,
        namespace="prefect-rnd"
    )
    return delete_if_exists


delete_custom_job = CustomDeleteNamespacedJob()


with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["K8S"],
        ),
) as flow:
    parameter_job_name = Parameter("parameter_job_name", default=JOB_NAME)
    
    del_job = delete_custom_job.run(job_name=parameter_job_name.run())
🙌 1
a

Anna Geller

07/15/2022, 5:10 PM
thanks for sharing 🙌