# prefect-community
s
Hi, I'm trying to prepare a parametrised flow for Prefect v1. It has almost the same code as https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py, but I changed some functions to make them work with parameters. I cannot understand why it no longer works: the `CreateAndRunJob` function is marked as succeeded, but Prefect does not wait for the end of the execution, and then `get_our_pod_name` fails because it cannot find the Job, which has not been created yet. I thought about using threading or a queue from Python here, but I don't think that's the right way with Prefect. Could you please advise? I believe I'm doing something wrong here.
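As background for the answer below: in Prefect 1, this kind of ordering is normally expressed through task dependencies rather than threads or queues. A minimal sketch, with hypothetical task names:

```python
from prefect import Flow, task

# Hypothetical tasks, for illustration only.
@task
def create_job():
    print("creating job")

@task
def read_logs():
    print("reading logs")

with Flow("ordering-example") as flow:
    job = create_job()
    # upstream_tasks makes read_logs wait until create_job has finished.
    logs = read_logs(upstream_tasks=[job])
```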
a
Could you please move your code blocks to the thread? If you look at my example, I was declaring the Kubernetes tasks at the module scope and then calling those within the flow. When you call the already initialized task instance, you're invoking the task's run method
This is currently missing in your example
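For reference, a minimal sketch of that pattern (task name, job name, and namespace are illustrative, reused from this thread): the Kubernetes task is instantiated once at module scope and then called inside the flow block, which adds it to the flow so that its run method executes at flow runtime.

```python
from prefect import Flow
from prefect.tasks.kubernetes import DeleteNamespacedJob

# Instantiated once at module scope...
delete_job = DeleteNamespacedJob(
    job_name="test-name",
    namespace="prefect-rnd",
    kubernetes_api_key_secret=None,
)

with Flow("module-scope-task-example") as flow:
    # ...and called inside the flow block, so it becomes part of the flow
    # and its run method is executed when the flow runs.
    delete_job()
```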
s
```python
from prefect import Flow, task, Parameter
from prefect.storage import S3
from prefect.run_configs import KubernetesRun
from prefect import storage

from prefect.engine.signals import VALIDATIONFAIL
from prefect.triggers import all_finished
from prefect.tasks.kubernetes import (
    RunNamespacedJob,
    ReadNamespacedPodLogs,
    ListNamespacedPod,
    DeleteNamespacedPod,
    DeleteNamespacedJob,
)

from prefect.executors import DaskExecutor

FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)

@task(log_stdout=True)
def DeleteIfExists(job_name: str):
  delete_if_exists = DeleteNamespacedJob(
      job_name=job_name, 
      kubernetes_api_key_secret=None,
      namespace="prefect-rnd"
  )

@task(log_stdout=True)
def CreateAndRunJob(job_name: str) -> RunNamespacedJob:
  create_and_run_job = RunNamespacedJob(
      delete_job_after_completion=False,
      kubernetes_api_key_secret=None,
      trigger=all_finished,
      namespace="prefect-rnd",
      body={
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
              "name": job_name,
              "namespace": "prefect-rnd"
            },
            "spec": {
              "template": {
                "spec": {
                  "restartPolicy": "Never",
                  "imagePullSecrets": [
                    {
                      "name": "docker-private"
                    }
                  ],
                  "containers":  [
                    {
                      "name": job_name,
                      "image": "private-image",
                      "command": ["/bin/bash", "run_job.sh"]
                    }
                  ]
                }
              }
            }
          },
    )

list_pods = ListNamespacedPod(
  kubernetes_api_key_secret=None,
  namespace="prefect-rnd"
)
logs = ReadNamespacedPodLogs(
  kubernetes_api_key_secret=None,
  namespace="prefect-rnd"
)
delete_pod = DeleteNamespacedPod(
  kubernetes_api_key_secret=None,
  namespace="prefect-rnd"
)


@task(log_stdout=True)
def get_pod_ids(api_response):
    list_of_pods = []
    for i in api_response.items:
        list_of_pods.append(i.metadata.name)
    print(list_of_pods)
    return list_of_pods


@task(log_stdout=True)
def get_our_pod_name(pods: list, job_name: str):
    dummy_pods = [i for i in pods if i.startswith(job_name)]
    if len(dummy_pods) > 1:
        raise VALIDATIONFAIL("More than one dummy pod")
    result = dummy_pods[0]
    print(f"Our pod name: {result}")
    return result


@task(trigger=all_finished, log_stdout=True)
def print_log_output(output):
    print(output)


@task(log_stdout=True)
def extract_controller_uid(v1job):
    return v1job.metadata.labels['controller-uid']

with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["K8S"],
        ),
        executor=DaskExecutor()
) as flow:
    job_name = Parameter("job_name", default="Prefect")
    del_job = DeleteIfExists(job_name)
    k8s_job = CreateAndRunJob(job_name)
    del_job.set_downstream(k8s_job)
    
    pods = list_pods()
    k8s_job.set_downstream(pods)

    list_of_pods = get_pod_ids(pods)
    pod_name = get_our_pod_name(list_of_pods, job_name)
    pod_logs = logs(pod_name)
    print_logs = print_log_output(pod_logs)

    delete_task = delete_pod(pod_name)
    print_logs.set_downstream(delete_task)
```
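A hedged reading of why the snippet above misbehaves, in line with the pattern described earlier: `DeleteIfExists` and `CreateAndRunJob` only instantiate the Kubernetes tasks and never execute them, so those @task wrappers finish immediately without doing any Kubernetes work. One possible shape of the fix (a sketch, not necessarily the exact code that was eventually used):

```python
from prefect import task
from prefect.tasks.kubernetes import DeleteNamespacedJob

# Sketch only: execute the instantiated Kubernetes task by calling its
# .run() method inside the @task wrapper (namespace copied from above).
@task(log_stdout=True)
def delete_if_exists(job_name: str):
    DeleteNamespacedJob(
        namespace="prefect-rnd",
        kubernetes_api_key_secret=None,
    ).run(job_name=job_name)
```

The same applies to `CreateAndRunJob` with `RunNamespacedJob`; alternatively, the tasks can be declared at module scope and called directly inside the flow, as in the sketch further up.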
Seems like I fixed that, thank you! However, I can't figure out how to pass a flow parameter not to a task but to a class, like in the example below.
```python
from prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect import storage

from prefect.tasks.kubernetes import (
    DeleteNamespacedJob,
)

import prefect

FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
JOB_NAME = "test-name"

class CustomDeleteNamespacedJob(DeleteNamespacedJob):

  def run(self):
    delete_if_exists = DeleteNamespacedJob(
        job_name=PARAMETER_VALUE,  # <-- how do I get the parameter value here?
        kubernetes_api_key_secret=None,
        namespace="prefect-rnd"
    )
    return delete_if_exists


delete_custom_job = CustomDeleteNamespacedJob()


with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["K8S"],
        ),
) as flow:
    parameter_job_name = Parameter("parameter_job_name", default=JOB_NAME)
    
    del_job = delete_custom_job.run()
```
Of course, I'm not asking for a ready-made solution, just for a hint on how to approach it 🙂 From my point of view, it's not possible to use prefect.context here to retrieve the value of the parameter.
a
the easiest approach is to have your custom task accept that parameter value within the .run() method - this way, you can pass it to the downstream task as a data dependency
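A minimal sketch of that approach (illustrative names, reusing the thread's namespace): the custom task accepts job_name in its run() method, and the Parameter is passed when the task is called inside the flow, so Prefect treats it as a data dependency and resolves its value at runtime.

```python
from prefect import Flow, Parameter
from prefect.tasks.kubernetes import DeleteNamespacedJob

class CustomDeleteNamespacedJob(DeleteNamespacedJob):
    def run(self, job_name: str):
        # Delegate to the parent task's run with the resolved parameter value.
        return super().run(
            job_name=job_name,
            namespace="prefect-rnd",
            kubernetes_api_key_secret=None,
        )

delete_custom_job = CustomDeleteNamespacedJob()

with Flow("parametrised-delete") as flow:
    parameter_job_name = Parameter("parameter_job_name", default="test-name")
    # Passing the Parameter in the call wires it in as a data dependency.
    delete_custom_job(job_name=parameter_job_name)
```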
s
@Anna Geller, appreciate your help!
Just to save a possible solution here: I've solved it this way
```python
from prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect import storage

from prefect.tasks.kubernetes import (
    DeleteNamespacedJob,
)

import prefect

FLOW_NAME = "flow_name_tmp"
STORAGE = storage.GitLab(...)
JOB_NAME = "test-name"

class CustomDeleteNamespacedJob(DeleteNamespacedJob):

  def run(self, job_name: str):
    delete_if_exists = DeleteNamespacedJob(
        job_name=job_name, 
        kubernetes_api_key_secret=None,
        namespace="prefect-rnd"
    )
    return delete_if_exists


delete_custom_job = CustomDeleteNamespacedJob()


with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["K8S"],
        ),
) as flow:
    parameter_job_name = Parameter("parameter_job_name", default=JOB_NAME)
    
    del_job = delete_custom_job.run(job_name=parameter_job_name.run())
```
a
thanks for sharing 🙌