Nicolas van de Walle

07/09/2020, 2:55 PM
Hi, I am trying to run flows using a Kubernetes agent and I just set it up as detailed here:
Copy code
prefect agent install kubernetes -t <MY_TOKEN> -n prefect --label prefect-namespace --rbac | kubectl apply -n prefect -f -
I used Prefect cloud for that. The flow has well been scheduled but nothing gets executed by the agent (it appears in the agents tab in the cloud UI with its prefect-namespace label but it does not see the flows that need to be run). To be honest, I am quite new to kubernetes and do not really understand how to use RBAC. Do I need to do anything else?
👀 1

Laura Lorenz (she/her)

07/09/2020, 3:07 PM
Hi @Nicolas van de Walle, welcome to the wild world of kuberntes 🙂 Though actually, your RBAC setup looks good, I think this might have more to do with the
(which is actually more of a Prefect thing than a kubernetes thing). When you register your flows, do you also associate the
label to their environment? (The docs on it are here for your ref in case you hadn’t seen them before:

Nicolas van de Walle

07/09/2020, 3:22 PM
Thanks @Laura Lorenz (she/her) for your quick reply! In fact, I did not think about it. But what kind of environment am I supposed to use then? I have set a DaskKubernetesEnvironment as follow:
Copy code
environment = DaskKubernetesEnvironment(labels=["prefect-namespace"])

with Flow("Example Flow Without Mail", environment=environment) as flow:
But it still does not run the flow

Laura Lorenz (she/her)

07/09/2020, 3:30 PM
Ok gotcha, that
on the environment attached to the flow is exactly what I was looking for, because usually when a flow is stuck in a
state even though agents are running it is because the agents and the flow environment don’t have the same labels. If that’s not the case, it must be something else. Is there any output from your agent you can share (this would be in where logs are stored from wherever you deployed your kubernetes agent)

Nicolas van de Walle

07/09/2020, 3:32 PM
Nope, unfortunately, no special logs at all, just the usual
Copy code
____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|

[2020-07-09 14:50:43,735] INFO - agent | Starting KubernetesAgent with labels ['prefect-namespace']
[2020-07-09 14:50:43,736] INFO - agent | Agent documentation can be found at <>
[2020-07-09 14:50:43,736] INFO - agent | Agent connecting to the Prefect API at <>
[2020-07-09 14:50:44,019] INFO - agent | Waiting for flow runs...
Does the flow environment matter? I tried a DaskKubernetesEnvironment(labels=["prefect-namespace"]), a LocalEnvironment(executor=DaskExecutor(), labels=["prefect-namespace"]) but nothing triggers the run

Laura Lorenz (she/her)

07/09/2020, 3:40 PM
No, the environment doesn’t matter (though I appreciate you trying it, haha). The most important label restriction is that > An Agent’s labels must be a superset of the labels specified on a Flow’s environment. Just one more double check before we abandon the labels troubleshooting path, if you look at your flow in the UI the labels it was registered with should be displayed in the Flow page in the flow overview tile. It just says
there like we expect, right? In my example here my label is “`Lauras-MacBook-Pro.local`”

Nicolas van de Walle

07/09/2020, 3:42 PM
Yes, I see it there. BTW, I did not have issues when running a local agent, my running problem happened when I started using k8s
Here is the full source code. Hope it helps:
Copy code
import prefect
from prefect import task, Flow
from random import choice
from import case
from prefect.tasks.control_flow.conditional import merge
from time import sleep
from prefect.schedules.schedules import IntervalSchedule
from datetime import timedelta
from prefect import Client
from prefect.environments import LocalEnvironment
from prefect.environments import DaskKubernetesEnvironment

def hello_task():
    log('Setting up tasks and waiting for 1s')

def is_true_task() -> None:
    log('Choosing branch')
    return choice([True, False]) # Chooses a random boolean value

def first_task() -> int:
    log('Executing first_task')
    return 1234

def second_task() -> int:
    log('Executing second_task')
    return 5678

def end_task(x: int):
    log('This is the end. Final value is {}'.format(x))

def run_in_parrallel():
    log('Running concurrently')

def log(msg: str) -> None:
    logger = prefect.context.get("logger")

environment = DaskKubernetesEnvironment(labels=["prefect-namespace"])

with Flow("Example Flow", environment=environment) as flow:
    hello = hello_task()
    cond = is_true_task()

    with case(cond, True):
        val1 = first_task()

    with case(cond, False):
        val2 = second_task()

    parallel = run_in_parrallel()

    val = merge(val1, val2, parallel)
    end = end_task(val)

flow.register(project_name="Test project 1", labels=["prefect-namespace"])

Laura Lorenz (she/her)

07/09/2020, 3:50 PM
Oo thank you! Let me take a look. Interesting it worked with localagent and not k8s agent. Btw, are you running the kubernetes agent in a kuberntes cluster in a cloud or are just running it locally right now

Nicolas van de Walle

07/09/2020, 3:53 PM
I am running it on the cloud infrastructure of the company I work for (on premise). Regarding the firewall, the k8s agent is able to reach prefect cloud (I see the "last query" messages updating) and the local agent worked fine so I suppose it is not the issue (unless there is a behavior I did not think about)

Laura Lorenz (she/her)

07/09/2020, 4:03 PM
Ok gotcha! Looking at this flow it isn’t setting any storage which means it is using local storage by default. This should a) cause there to be an extra label attached to your flow, which should be visible in the UI, of the hostname of the computer it was registered from (which would prevent the k8s agent from picking it up since the agent must have a superset of the flow’s labels) and b) overall kubernetes agents cannot start flows set with local storage because by nature it is distributed from your registration file system, so you must use a cloud storage option (like docker storage, GCS storage, S3 storage — docs are

Nicolas van de Walle

07/09/2020, 4:27 PM
Alright, thank you for your help! Adding the label helped the task move to the submitted phase and in fact, it did not go further. Regarding the storage, is there a way of using the Kubernetes agent on a single pod without setting up a distributed storage? My goal would be to use the advantages of k8s (if a pod dies, it gets restarted etc.) without the need to setup an extra storage. I know it does not make much sense to use k8s then but I do not need to keep the results, that is why local storage was enough from my viewpoint. If the pods gets restarted, it is ok to lose the data

Laura Lorenz (she/her)

07/09/2020, 4:46 PM
“Storage” in this context isn’t about storage of your results, but about where the flow itself is stored, so you do need it to even run the flow. So it does need to be somewhere that the kubernetes agent (and the pods it spins up) can access it, which is why it needs to be a cloud storage of some kind that your kubernetes agent/pod is authenticated to. Docker storage is probably the most common, but we do have a new Github storage you might be interested in ( that may feel like less setup!
🚀 1