Thread
#prefect-community
    Nicolas van de Walle

    2 years ago
    Hi, I am trying to run flows using a Kubernetes agent and I just set it up as detailed here: https://docs.prefect.io/orchestration/agents/kubernetes.html#installation
    prefect agent install kubernetes -t <MY_TOKEN> -n prefect --label prefect-namespace --rbac | kubectl apply -n prefect -f -
    I am using Prefect Cloud. The flow was scheduled successfully, but the agent never executes it: the agent appears in the Agents tab of the Cloud UI with its prefect-namespace label, but it does not pick up the flow runs that are waiting. To be honest, I am quite new to Kubernetes and do not really understand how to use RBAC. Do I need to do anything else?
    Laura Lorenz (she/her)

    2 years ago
    Hi @Nicolas van de Walle, welcome to the wild world of Kubernetes 🙂 Actually, your RBAC setup looks good; I think this has more to do with the --label flag (which is more of a Prefect thing than a Kubernetes thing). When you register your flows, do you also attach the prefect-namespace label to their environment? (The docs on labels are here for your reference in case you hadn’t seen them before: https://docs.prefect.io/orchestration/execution/overview.html#labels)
    Nicolas van de Walle

    2 years ago
    Thanks @Laura Lorenz (she/her) for your quick reply! In fact, I had not thought about it. But what kind of environment am I supposed to use then? I have set up a DaskKubernetesEnvironment as follows:
    environment = DaskKubernetesEnvironment(labels=["prefect-namespace"])
    
    with Flow("Example Flow Without Mail", environment=environment) as flow:
        ...
    But it still does not run the flow
    Laura Lorenz (she/her)

    2 years ago
    Ok gotcha, that labels=["prefect-namespace"] on the environment attached to the flow is exactly what I was looking for. Usually, when a flow is stuck in a Scheduled state even though agents are running, it is because the agents and the flow environment don’t have the same labels. If that’s not the case, it must be something else. Is there any output from your agent you can share? (It would be wherever logs are stored for the place you deployed your Kubernetes agent.)
    Nicolas van de Walle

    2 years ago
    Nope, unfortunately, no special logs at all, just the usual
    ____            __           _        _                    _
    |  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
    | |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
    |  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
    |_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                               |___/
    
    [2020-07-09 14:50:43,735] INFO - agent | Starting KubernetesAgent with labels ['prefect-namespace']
    [2020-07-09 14:50:43,736] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
    [2020-07-09 14:50:43,736] INFO - agent | Agent connecting to the Prefect API at https://api.prefect.io
    [2020-07-09 14:50:44,019] INFO - agent | Waiting for flow runs...
    Does the flow environment matter? I tried both a DaskKubernetesEnvironment(labels=["prefect-namespace"]) and a LocalEnvironment(executor=DaskExecutor(), labels=["prefect-namespace"]), but nothing triggers the run
    Laura Lorenz (she/her)

    2 years ago
    No, the environment doesn’t matter (though I appreciate you trying it, haha). The most important label restriction is that an Agent’s labels must be a superset of the labels specified on a Flow’s environment. Just one more double check before we abandon the labels troubleshooting path: if you look at your flow in the UI, the labels it was registered with should be displayed on the Flow page in the flow overview tile. It just says "prefect-namespace" there like we expect, right? In my example here my label is “Lauras-MacBook-Pro.local”.
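To make the superset rule concrete, here is a minimal plain-Python sketch. It is not Prefect's implementation: `agent_can_pick_up` is a hypothetical helper that only models the rule as stated above, and the "gpu" label is made up for illustration.

```python
from typing import Iterable


def agent_can_pick_up(agent_labels: Iterable[str], flow_labels: Iterable[str]) -> bool:
    """Return True when the agent's labels are a superset of the flow's labels."""
    return set(agent_labels) >= set(flow_labels)


# Agent and flow carry the same single label, as in this thread.
print(agent_can_pick_up(["prefect-namespace"], ["prefect-namespace"]))  # True
# An agent may carry extra labels and still match.
print(agent_can_pick_up(["prefect-namespace", "gpu"], ["prefect-namespace"]))  # True
# A flow label that the agent lacks blocks the match.
print(agent_can_pick_up(["prefect-namespace"], ["prefect-namespace", "gpu"]))  # False
```

If the third case applies, the flow run would stay in Scheduled while the agent keeps logging that it is waiting for flow runs.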
    Nicolas van de Walle

    2 years ago
    Yes, I see it there. BTW, I did not have any issues when running a local agent; the problem only appeared when I started using k8s
    Here is the full source code. Hope it helps:
    import prefect
    from prefect import task, Flow
    from random import choice
    from prefect.tasks.control_flow.case import case
    from prefect.tasks.control_flow.conditional import merge
    from time import sleep
    from prefect.schedules.schedules import IntervalSchedule
    from datetime import timedelta
    from prefect import Client
    from prefect.environments import LocalEnvironment
    from prefect.environments import DaskKubernetesEnvironment
    
    @task
    def hello_task():
        log('Setting up tasks and waiting for 1s')
        sleep(1)
    
    @task
    def is_true_task() -> bool:
        log('Choosing branch')
        return choice([True, False]) # Chooses a random boolean value
    
    @task
    def first_task() -> int:
        log('Executing first_task')
        return 1234
    
    @task
    def second_task() -> int:
        log('Executing second_task')
        return 5678
    
    @task
    def end_task(x: int):
        log('This is the end. Final value is {}'.format(x))
    
    @task
    def run_in_parrallel():
        log('Running concurrently')
    
    def log(msg: str) -> None:
        logger = prefect.context.get("logger")
        logger.info(msg)
    
    environment = DaskKubernetesEnvironment(labels=["prefect-namespace"])
    
    
    with Flow("Example Flow", environment=environment) as flow:
        hello = hello_task()
        cond = is_true_task()
        hello.set_downstream(cond) 
    
    
        with case(cond, True):
            val1 = first_task()
    
        with case(cond, False):
            val2 = second_task()
    
        parallel = run_in_parrallel()
    
        val = merge(val1, val2, parallel)
        
        end = end_task(val)
    
    flow.register(project_name="Test project 1", labels=["prefect-namespace"])
    Laura Lorenz (she/her)

    2 years ago
    Oo thank you! Let me take a look. Interesting that it worked with the local agent and not the k8s agent. Btw, are you running the Kubernetes agent in a Kubernetes cluster in a cloud, or are you just running it locally right now?
    Nicolas van de Walle

    2 years ago
    I am running it on the cloud infrastructure of the company I work for (on premise). Regarding the firewall, the k8s agent is able to reach prefect cloud (I see the "last query" messages updating) and the local agent worked fine so I suppose it is not the issue (unless there is a behavior I did not think about)
    Laura Lorenz (she/her)

    2 years ago
    Ok gotcha! Looking at this flow, it isn’t setting any storage, which means it is using local storage by default. That has two consequences: a) an extra label, the hostname of the computer the flow was registered from, gets attached to your flow and should be visible in the UI (this prevents the k8s agent from picking it up, since the agent must have a superset of the flow’s labels), and b) more generally, Kubernetes agents cannot start flows that use local storage, because the pods they create run on a different file system from the one you registered from. So you must use a cloud storage option such as Docker storage, GCS storage, or S3 storage (docs: https://docs.prefect.io/orchestration/execution/storage_options.html).
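Point (a) can be illustrated with a small plain-Python model. It is only a sketch under the assumption described above (local storage adds the registering machine's hostname as a label); `labels_with_local_storage` is a hypothetical helper, not a Prefect function, and the hostname is the example one from earlier in the thread.

```python
import socket
from typing import List, Optional


def labels_with_local_storage(declared: List[str], hostname: Optional[str] = None) -> List[str]:
    """Model the labels a flow ends up with when it uses local storage."""
    host = hostname if hostname is not None else socket.gethostname()
    return sorted(set(declared) | {host})


agent_labels = {"prefect-namespace"}
flow_labels = labels_with_local_storage(
    ["prefect-namespace"], hostname="Lauras-MacBook-Pro.local"
)

# Labels on the flow after registration in this model.
print(flow_labels)
# The agent is not a superset of the flow's labels, so it skips the run.
print(agent_labels >= set(flow_labels))
# The label the agent would need in order to match.
missing = set(flow_labels) - agent_labels
print(missing)
```

Adding the missing hostname label to the agent would satisfy the label check, but consequence (b) still applies: the pod cannot read a flow stored on the registering machine's disk.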
    Nicolas van de Walle

    2 years ago
    Alright, thank you for your help! Adding the label moved the flow run to the Submitted state, but indeed it did not go any further. Regarding the storage, is there a way of using the Kubernetes agent on a single pod without setting up distributed storage? My goal would be to use the advantages of k8s (if a pod dies, it gets restarted, etc.) without the need to set up extra storage. I know it does not make much sense to use k8s then, but I do not need to keep the results, which is why local storage seemed sufficient to me. If the pod gets restarted, it is ok to lose the data
    Laura Lorenz (she/her)

    2 years ago
    “Storage” in this context isn’t about storage of your results, but about where the flow itself is stored, so you need it to run the flow at all. It has to be somewhere that the Kubernetes agent (and the pods it spins up) can access, which is why it needs to be a cloud storage of some kind that your Kubernetes agent/pod is authenticated to. Docker storage is probably the most common, but we do have a new GitHub storage you might be interested in (https://docs.prefect.io/orchestration/execution/storage_options.html#github) that may feel like less setup!
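As a rough sketch of the Docker storage option, assuming a Prefect 0.12.x install (where storage classes are importable from `prefect.environments.storage`): `use_docker_storage` is a hypothetical helper, and the registry URL in the usage comment is a placeholder, not a real registry.

```python
def use_docker_storage(flow, registry_url: str):
    """Attach Docker storage to a flow so Kubernetes pods can pull it as an image."""
    # Imported here so the sketch only requires Prefect when it is actually called.
    # Assumes Prefect 0.12.x, where storage classes live in prefect.environments.storage.
    from prefect.environments.storage import Docker

    flow.storage = Docker(registry_url=registry_url)
    return flow


# Hypothetical usage with the flow from this thread; the registry is a placeholder.
# flow = use_docker_storage(flow, "registry.example.com/my-team")
# flow.register(project_name="Test project 1", labels=["prefect-namespace"])
```

With Docker storage, registering the flow builds an image and pushes it to the registry, so Docker has to be running and logged in to that registry on the machine that registers the flow, and the cluster must be able to pull from it.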