# prefect-server
s
Hey all, I’m trying to test Prefect Cloud on Kubernetes (GKE) with DaskExecutor and GCS storage, based loosely on Prefect’s instructions (https://docs.prefect.io/orchestration/agents/kubernetes.html#running-in-cluster). I’m able to get a Kubernetes Agent to run (verified in the Prefect Cloud UI), but when I register a basic flow and trigger a manual run in the Cloud UI, the flow run hangs in a Submitted state and never executes. Feels like I’m botching something related to the networking layer, but I can’t pinpoint the issue after attempting the debugging techniques detailed in this thread: https://prefect-community.slack.com/archives/C014Z8DPDSR/p1628175479115400 Can anyone provide advice or a reference example for GKE in particular?
If it helps, I’ve verified through the Dask web UI that tasks are not reaching Dask workers or even being scheduled, afaict. I’m also not seeing anything helpful in Cloud UI logs even when running at DEBUG level. Here is an image of my state in Cloud UI
And the latest iteration of the script I’m attempting to run
k
Hey @Serdar Tumgoren, it seems like your flow is not able to get the Kubernetes resources. Have you debugged on that side?
Does this work with a LocalRun() + DaskExecutor()?
s
I tried dumping logs for pods, but I’ll admit I’m brand new to K8s, so I’m operating a bit in the dark on even how to debug 😕
Yep, it works locally with LocalExecutor and LocalDaskExecutor. Not sure if I’ve tested with DaskExecutor locally but I can try that too if it helps
k
Me too. 😅 I suggest we just try LocalRun first to see if we can connect to that cluster and run. LocalRun + DaskExecutor
s
oh wait i think i’m confused. I probably did try DaskExecutor locally (is there even a LocalDaskExecutor?)
k
There is, but that’s the executor. LocalRun is a config that would replace KubernetesRun.
s
ooooh, ok.
k
And then you spin up a LocalAgent on your laptop to run it (match the Flow and Agent labels)
s
do I just swap LocalRun in for KubernetesRun, or do I have to pass in some configs such as the remote address for the K8s cluster?
k
No configs needed, that’s exactly right. We’re gonna debug to see if k8s is the issue; we’ll avoid k8s by using LocalRun
s
hm…ok, i haven’t used labels yet. any chance you have a reference implementation and/or sample CLI commands I’d need?
ok, let me swap in LocalRun
k
Flows can only be picked up by Agents that have the same labels (agent labels being a superset of flow labels)
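For reference, a minimal sketch of the LocalRun swap being suggested here (the flow name, project name, and dev label are placeholders); the matching agent would then be started with prefect agent local start --label dev so its labels are a superset of the flow’s:
Copy code
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import LocalRun

@task
def say_hi():
    print("hi")

# Swap KubernetesRun for LocalRun so the flow runs on the laptop, while still
# exercising DaskExecutor (which spins up a temporary local Dask cluster).
with Flow("localrun-debug") as flow:
    say_hi()

flow.run_config = LocalRun(labels=["dev"])
flow.executor = DaskExecutor()

if __name__ == "__main__":
    flow.register(project_name="demo")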
s
Aha. Ok, think i vaguely recall that from skimming those docs a ways back
k
But you said KubernetesRun with LocalDaskExecutor worked?
s
No, so far nothing with KubernetesRun has worked. haven’t tested K8S locally
I’ve only been testing Kubernetes in a GKE context
k
Gotcha.
LocalRun is not gonna use Kubernetes, just to clarify. Just wanna see if LocalRun + DaskExecutor works so we can isolate k8s
s
I built out the script locally using DaskExecutor. Then, once that was working, I attempted to port it to Kubernetes, after learning how to spin up a static Dask cluster on Kubernetes
right, that makes sense
ok, my local environment may be quite muddled
Copy code
OSError: Timed out trying to connect to <tcp://agenda-watch-scraping-dask-scheduler:8786> after 30 s
└── 15:59:55 | ERROR   | Unexpected error occured in FlowRunner: OSError('Timed out trying to connect to <tcp://agenda-watch-scraping-dask-scheduler:8786> after 30 s')
But I suspect that’s because I set up a bunch of env variables in order to proxy localhost to the remote dask cluster after using gcloud and kubectl to set up the GKE cluster
Have to step away for a bit, but let me know if you have any advice on next steps
k
But doesn’t the log have the correct address? You’re saying that might be hitting localhost? I guess the next step is to remove the env vars and then test? If we can’t get local to hit the Dask cluster at all, I am wondering if the Dask cluster can even accept connections. Maybe you can also check if you see any pods created by Prefect in GKE?
s
Apologies, that earlier error was due to me forgetting to remove the tcp address from the DaskExecutor line (which I had configured while attempting to debug in K8s). After changing to plain old flow.executor = DaskExecutor() and fixing an import bug, the modified script works locally. Attaching the latest version
k
Did you need any configuration to connect? I assume no, right? Then I guess it’s Kubernetes related. I’ll read up myself tonight and/or find someone who knows more than me
s
So I didn’t need any configuration for DaskExecutor, which I assumed spins up Dask locally for the flow run. But I haven’t tested configuring DaskExecutor to attempt to connect to the remote Dask cluster running on GKE. Should I be attempting to do that?
@Kevin Kho Ok, let me know if you dig anything up. As always, really appreciate the help!
w
Yeah, you’re probably going to want to tell DaskExecutor to use Kubernetes. I believe I just got this working today, where I’m doing:
Copy code
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(
    image='daskdev/dask:latest',
    memory_limit='4G', memory_request='4G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'prefect'},
)

executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        namespace="prefect",
        pod_template=pod_spec,
        n_workers=1,
    ),
)
(where prefect is the namespace I chose to create in advance)
(it silently hangs forever if you don’t specify n_workers, at the moment)
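A hedged sketch of how that executor would then attach to a flow alongside KubernetesRun (the flow, task, and label here are placeholders; executor is the one defined above):
Copy code
from prefect import Flow, task
from prefect.run_configs import KubernetesRun

@task
def say_hi():
    print("hi from a Dask worker pod")

with Flow("kubecluster-demo") as flow:
    say_hi()

# `executor` is the DaskExecutor(cluster_class=KubeCluster, ...) defined above;
# the dask-kubernetes worker pods are created on the fly when the flow run starts.
flow.executor = executor
flow.run_config = KubernetesRun(labels=["k8s"])  # the agent needs this label too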
s
@Wilson Bilkovich Interesting. I’ll give that a shot as well. Many thanks for sharing!
w
Hopefully I’m not leading you astray, good luck
s
Heh, not at all. It’s worth exploring options as I haven’t gotten anything to work as of yet! I see you’re using KubeCluster, which appears to be for dynamic deployments of Dask. I’ve been testing against a static, pre-existing Dask cluster deployed using Helm, so I may need to adapt my general approach. But I’ll look into it. Thanks again!
w
There’s a matching class called HelmCluster that’s intended for your use case
👍 1
though I haven’t tried it yet, it’s on my todo list
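Purely as an untested sketch (the release_name and namespace kwargs here are assumptions based on the dask_kubernetes docs), pointing the executor at an existing Helm-deployed cluster might look like:
Copy code
from prefect.executors import DaskExecutor
from dask_kubernetes import HelmCluster

# Connects to an already-running Helm release instead of creating worker pods.
# The release name and namespace below are placeholders, not tested values.
executor = DaskExecutor(
    cluster_class=HelmCluster,
    cluster_kwargs=dict(
        release_name="dask",
        namespace="default",
    ),
)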
s
@Wilson Bilkovich I'll give Helm a try but your approach with KubeCluster is ultimately where I'm trying to get (i.e. dynamically spinning up an ephemeral Dask cluster for a Flow run and then tearing it down). Appreciate the pointers!
w
Good luck; seems like a fun library
Oh, and here’s a related Gist that I found useful https://gist.github.com/Ogaday/53e00fd89f5985a3633888216d91a6f6
(The KubeCluster.from_yaml call the gist makes is deprecated these days, however; they want us to use the constructor directly)
s
This is perfect! Thanks!
@Wilson Bilkovich Alas, I attempted the KubeCluster approach but I’m getting the same issue (Job appears to be picked up by K8S agent, but hangs in submitted state and eventually times out). Feels like I may have botched some other part of the setup, for example in how I initially configured the K8S agent. I used the below command to set up an “in-cluster” agent. Wondering if you or @Kevin Kho can offer any advice on my approach there and any details I should be aware of with respect to agents and the broader K8S cluster on GKE…
prefect agent kubernetes install --rbac --namespace=default --service-account-name prefect -t MY_TOKEN | kubectl apply --namespace=default -f -
k
I am midway through walking through it myself; this looks good though. You can try KubernetesRun and LocalExecutor to see if that works too?
s
@Kevin Kho Ok, I’ll give that a try now. Thanks!
w
I switched from using the ‘agent kubernetes install’ yaml to letting the prefect-server Helm chart set up an Agent for me with --set agent.enabled=true
Not sure if that fixed anything, but it seemed simpler
s
@Wilson Bilkovich Is that a flag for prefect server cli command? I’m using Cloud as backend…
w
Aah, my bad, I thought you might be using Server. That’s for the Server install chart
I guess the prefect-cli YAML-generator is the correct thing to do for Cloud. I wouldn’t be surprised if there were some env variable values to change in what it outputs though.
👍 1
s
No worries. I am after all posting in the prefect-server channel 🙂 I’ll keep that in my back pocket though since I also occasionally test on local with prefect server
k
I am up and running now for KubernetesRun with no DaskExecutor, so let me know if LocalExecutor works for you; otherwise I’ll continue and spin up a Dask cluster
s
@Kevin Kho Any chance I could see your flow script to make sure I’m doing things similarly?
k
You could but it’s a copy of yours for the most part. I just changed storage to Github so it’s public and I can run without authentication. You can even grab this as is and it should work
s
@Kevin Kho Aha, so not specifying an executor will default to LocalExecutor?
k
Yes that’s right
s
@Kevin Kho Darn. Using KubernetesRun and LocalExecutor still leaves flow run hanging in submitted state… 😕
k
Ah ok, then I guess it’s the agent job template we can compare
s
is that the job template produced by the prefect agent kubernetes install command that’s piped to kubectl?
k
Yep, I used the same process. Just added namespace and rbac, but double checking
👍 1
w
Do you mind if I share my example flow? I’m trying to figure out basically the same setup
s
shout back if i should post the yaml produced by my install command
@Wilson Bilkovich would be great to see another example!
w
(f.run() could also be f.register(“demo”) or f.visualize() etc)
I’m running Prefect Server plus the agent it can be told to install, but I don’t see the log output I’d expect when I run it non-locally.
Copy code
% k get pods
NAME                                             READY   STATUS      RESTARTS   AGE
prefect-server-initial-agent-5f76b4fd69-xmdxl    1/1     Running     4          2d20h
prefect-server-initial-apollo-586cbdffb5-pbdrr   1/1     Running     1          2d20h
prefect-server-initial-create-tenant-job-hw74c   0/1     Completed   4          2d20h
prefect-server-initial-graphql-b5489bbf6-kcnzg   1/1     Running     0          2d20h
prefect-server-initial-hasura-68448b469b-jr4bz   1/1     Running     2          2d20h
prefect-server-initial-postgresql-0              1/1     Running     0          2d20h
prefect-server-initial-towel-656fdc954b-5fvjh    1/1     Running     0          2d20h
prefect-server-initial-ui-5d4f9ff55-69csx        1/1     Running     0          2d20h
The agent says
[2021-08-17 20:57:40,279] INFO - agent | Registration successful!
[2021-08-17 20:57:40,291] INFO - agent | Waiting for flow runs...
but doesn’t seem to acquire any work
k
I did prefect agent kubernetes install --rbac --key XXX but then I changed my image, so here is my final yaml (just took out the key to not expose it), and then I used kubectl apply --namespace=default -f agent.yaml where agent.yaml is the file from the GitHub gist
For you Wilson, if it’s not acquiring any work, it might be related to labels? Does it just remain as Scheduled in the UI?
w
Aah, yeah, the registered job in the UI shows the label of the hostname I registered it from
whereas the agent says labels = [ ]
k
You can disable that from your storage (add_default_labels=None), which is from the base Storage class
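A minimal sketch of that, assuming the flow object from your script; passing False (rather than leaving the default) is my reading of how to actually turn the hostname label off:
Copy code
from prefect.storage import Local

# add_default_labels=False should stop Local storage from attaching the
# registering machine's hostname as a flow label, so a remote agent with
# different labels can still pick the flow up.
flow.storage = Local(add_default_labels=False)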
@Serdar Tumgoren, if this still fails we’re gonna compare GKE setups lol
s
@Kevin Kho looks like our yaml templates are largely identical but one diff is that my auto-generated version is placing the key in PREFECT__CLOUD__AGENT__AUTH_TOKEN as opposed to PREFECT__CLOUD__API_KEY
w
I’m not even configuring any storage, which is presumably my problem
k
The default storage is Local, @Wilson Bilkovich, and it adds that label by default.
w
“Note that if you register a Flow with Prefect Cloud using this storage, your flow will automatically be labeled with your machine’s hostname. This ensures that only agents that are known to be running on the same filesystem can run your flow.” aha, I see now
k
That makes sense @Serdar Tumgoren, if you are below 0.15.0; key is a 0.15.0 thing. The auth system was changed to use keys
s
@Kevin Kho I should be on the latest (0.15.3?). So I should leave my yaml as is?
k
Oh yeah you can’t run on k8s because LocalStorage saves on your local filesystem so the pods won’t be able to retrieve your flow
Can you run prefect diagnostics? @Serdar Tumgoren
You need to store somewhere accessible to the pods (Docker, AWS, Github, etc) @Wilson Bilkovich
w
Yeah, obvious in retrospect, hmm.. Github would be nice, but suddenly requires me to inject secrets
k
For Cloud, you store the secret with us, and then specify the secret name; Prefect will use that to pull down and load your Flow
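A hedged sketch of that pattern with GitHub storage (the repo, path, and secret name are placeholders, and the flow object is the one from your script):
Copy code
from prefect.storage import GitHub

# The agent's job pod pulls the flow file straight from the repo at run time,
# authenticating with a GitHub access token stored as a Prefect Cloud secret.
flow.storage = GitHub(
    repo="my-org/my-flows",                     # placeholder repo
    path="flows/my_flow.py",                    # placeholder path to the flow file
    access_token_secret="GITHUB_ACCESS_TOKEN",  # name of the Prefect secret
)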
s
k
I believe the token will work and shouldn’t be the problem, but just in case, could you try copying my agent yaml? Delete the previous pod and create a new one?
s
sure. lemme dig into docs to refresh on deleting a pod (also, if it helps, at this point I only have one pod running with a single prefect agent container; i.e. i haven’t provisioned dask via helm, in case that’s an issue)
Copy code
NAME                            READY   STATUS    RESTARTS   AGE
prefect-agent-6c46558df-xjxlg   1/1     Running   0          116m
k
Would love to learn if there is a better way to do this (Wilson? lol) but I’ve been doing kubectl delete pod prefect-agent-6b9b7957b7-wc6hb. Oh yeah, I just wanna make sure we hit the right agent this time (the new one)
👍 1
s
Yep, that seemed to work nicely
@Kevin Kho That worked!!!
k
Were you by chance using --token instead of --key?
s
(should mention that I ran your flow, i.e. i switched over to a GitHub storage that pointed to your GH repo and flow)
k
That’s ok. That’s a good start 👍. I think you might need to provide environment variables to authenticate to pull from that Google storage though. (in the agent.yaml)
👍 1
s
No, I left the key in PREFECT__CLOUD__AGENT__AUTH_TOKEN and made PREFECT__CLOUD__API_KEY an empty string; otherwise I used your agent yaml as is
Seems like the fix here was deleting my initial pod and recreating based on your specs
Is the next step to switch the executor to DaskExecutor?
k
I believe you should have no problem with the DaskExecutor. I didn’t have any new specs, so I don’t know what changed 🤷. That seems logical, and then the next step is getting storage to work with the right environment variables.
👍 1
s
oh, one other potentially significant difference between our yaml files is that mine specified prefect as the SERVICE_ACCOUNT_NAME. Wonder if that could cause an issue with the agent executing the flow? (Your setting for SERVICE_ACCOUNT_NAME is blank.)
k
That’s right I can see that being it
s
Cool. Ok, so one other super basic question. If I cut over to DaskExecutor, do I need to pass in a tcp address setting and do any other configuration in the flow script or at the pod layer?
w
I believe the answer for DaskExecutor is yes, but you’d want to use the kubectx probably if you end up using the Kube Dask stuff like KubeCluster
👍 1
s
Aha, ok, just tried reverting to GCS storage and got a 403, so I suppose I’ve come full circle here to permissions related to accessing the flow. Early in my process I had created a GCP service account called prefect with what I hoped were appropriate roles, which is how I think I ended up with the prefect setting for SERVICE_ACCOUNT_NAME. I’ll try debugging that now
k
If connecting to an existing Dask Cluster, it would be
Copy code
executor = DaskExecutor(address="192.0.2.255:8786")
as detailed here. What Wilson is saying is that if you pass a callable Dask cluster class like KubeCluster to the executor, it creates those workers on the fly for you. For an existing cluster, I don’t think there would be more configuration. That makes sense; I think you can use GCS storage by passing the env vars to the agent. I think if you follow this, it would be
Copy code
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"
I don’t know if the best way is to copy that json into your own container, or to add it to the pod as a ConfigMap?
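For the static Helm-deployed cluster case specifically, a hedged sketch using the scheduler service name that appeared in the earlier timeout log (that DNS name only resolves from inside the GKE cluster, e.g. from a KubernetesRun job pod):
Copy code
from prefect.executors import DaskExecutor

# Connect to the pre-existing, Helm-deployed Dask scheduler. The address is the
# in-cluster service name seen in the earlier timeout log; it will not resolve
# from a laptop without port-forwarding.
flow.executor = DaskExecutor(
    address="tcp://agenda-watch-scraping-dask-scheduler:8786"
)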
s
Ok, yep, I’ve been using the env approach when registering the flow, but I can explore the alternatives you mentioned. However, now I’m unable to delete and recreate the pods. k8s keeps auto-creating a pod after I run kubectl delete pods <podname>
k
Ah ok I didn’t encounter that but my understanding is that this happens because you have a service with X number of replicas. So if you delete a pod in a service, k8s will create one again to fulfill the number of replicas.
w
Yeah, you want to delete the deployment if you’d like it to stay gone
👍 1
s
Aha. Ok, I suspect it might be related to the GKE setup I’m using. I followed the basic docs from Zero to JupyterHub on GKE (basically the steps on this page for single-user: https://zero-to-jupyterhub.readthedocs.io/en/latest/kubernetes/google/step-zero-gcp.html)
w
deleting the pod just makes k8s decide to replace it, and is the standard way to do a ‘restart’
s
@Wilson Bilkovich Aha, i see. I’ll just tear down and recreate the whole deployment. Thanks!
Hey @Kevin Kho, I’m using KubernetesRun + LocalExecutor to keep things “simple” while I attempt to implement GCS as flow storage. I’ve configured KubernetesRun to set GOOGLE_APPLICATION_CREDENTIALS=/tmp/secret.json and, as a quick-and-dirty shortcut, copied my GCP credentials JSON file to that /tmp location in the pod. Alas, I’m getting a Failed to load and execute Flow's environment: DefaultCredentialsError('File /tmp/secrets.json was not found.') error when I attempt to run via Cloud. Wondering if there’s something very dumb I’m doing here? Should I avoid this hackish approach and instead create a container image that contains the GCP credentials file, push it to a private registry, and then specify that image in KubernetesRun?
If it helps, here’s the relevant portion of my Flow:
Copy code
flow.run_config = KubernetesRun(
    env={
        'GOOGLE_APPLICATION_CREDENTIALS': '/tmp/secrets.json',
    }
)
flow.executor = LocalExecutor()
flow.storage = GCS(
    bucket='aw-prefect-flows',
    project='hai-gcp-augmenting',
)
k
The syntax around this is tricky I think. Will try it myself then get back to you.
s
Thanks! Let me know what you find. For now, I’m experimenting with building a custom docker image for KubernetesRun and wrestling with how to enable a service account to pull the image from GCP Artifact Registry via image pull secrets
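For reference, a hedged sketch of the KubernetesRun side of that experiment (the image path, pull-secret name, and credentials path are all placeholders; image_pull_secrets assumes a recent 0.15.x KubernetesRun):
Copy code
from prefect.run_configs import KubernetesRun

# Custom image baked with the flow's dependencies (and, in this quick-and-dirty
# setup, the GCP credentials file), hosted on Artifact Registry. All of the
# names below are placeholders.
flow.run_config = KubernetesRun(
    image="us-west1-docker.pkg.dev/my-project/my-repo/prefect-flow:latest",
    image_pull_secrets=["artifact-registry-pull-secret"],  # k8s secret created beforehand
    service_account_name="prefect",
    env={"GOOGLE_APPLICATION_CREDENTIALS": "/opt/prefect/secrets.json"},
)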
@Kevin Kho Heads up that I got KubernetesRun + DaskExecutor working with a custom docker image hosted on GCP Artifact Registry!! Just wanted to mention so you don’t invest time in debugging on this front (know you’re very busy!). I’ll experiment next with @Wilson Bilkovich’s KubeCluster approach and will report back on results. Thanks to you both for all the help throughout!!
k
Hey @Serdar Tumgoren! Thanks for circling back, and good to hear! I’ll still likely try it for my own understanding while I have GKE set up, but thanks for letting me know there’s no particular rush for it 😄
👍 1
s
@Kevin Kho Please do send along whatever you cook up (I can use all the reference material I can get my hands on!). Thanks!
@Kevin Kho I seem to have a working KubeCluster flow: https://gist.github.com/zstumgoren/1d7044038757c2e6f5b2c38ba2c3c651 Any chance you could give me a sanity check next week to make sure I’m not doing anything incorrect? In particular, wondering if KubernetesRun is the proper approach here…
k
Hey, it looks good to me, based on what I’ve seen from this community. Wilson will probably be able to verify better on the k8s part; I don’t know if you saw the gist he posted, but it looks similar
s
Sounds good and thanks @Kevin Kho! My next step will be to test saving the downloaded files to a cloud bucket (similar to what @Wilson Bilkovich appears to be doing in his most recent gist). Will report back on the results