
Serdar Tumgoren

08/19/2021, 10:46 PM
Hey all, I’m trying to test Prefect Cloud on Kubernetes (GKE) with DaskExecutor and GCS storage, based loosely on the instructions from Prefect https://docs.prefect.io/orchestration/agents/kubernetes.html#running-in-cluster I’m able to get a Kubernetes Agent to run (verified in Prefect Cloud UI). But when I register a basic job and trigger a manual run in the Cloud UI, the job hangs in pending state (submitted mode) and never executes. Feels like I’m botching something related to the networking layer, but I can’t seem to pinpoint the issue after attempting some of the debugging techniques detailed in this thread: https://prefect-community.slack.com/archives/C014Z8DPDSR/p1628175479115400 Can anyone provide advice or a reference example for GKE in particular?
If it helps, I’ve verified through the Dask web UI that tasks are not reaching Dask workers or even being scheduled, afaict. I’m also not seeing anything helpful in Cloud UI logs even when running at DEBUG level. Here is an image of my state in Cloud UI
And the latest iteration of the script I’m attempting to run

Kevin Kho

08/19/2021, 10:49 PM
Hey @Serdar Tumgoren, it seems like your flow is not able to get the Kubernetes resources. Have you debugged on that side?
Does this work with a LocalRun() + DaskExecutor()?

Serdar Tumgoren

08/19/2021, 10:50 PM
I tried dumping logs for pods but I'll admit I'm brand new to K8S, so I'm operating a bit in the dark on even how to debug 😕
Yep, it works locally with LocalExecutor and LocalDaskExecutor. Not sure if I’ve tested with DaskExecutor locally but I can try that too if it helps

Kevin Kho

08/19/2021, 10:51 PM
Me too. 😅 I suggest we just try LocalRun first to see if we can connect to that cluster and run. LocalRun + DaskExecutor
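A minimal sketch of the debugging setup Kevin suggests (the flow name and task here are illustrative, not from this thread):

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import LocalRun

@task
def say_hello():
    print("hello")

# Flow name is a placeholder; reuse your actual flow
with Flow("debug-localrun") as flow:
    say_hello()

# LocalRun replaces KubernetesRun, so the flow executes on whatever
# machine runs a local agent; DaskExecutor() with no arguments spins up
# a temporary local Dask cluster for the run.
flow.run_config = LocalRun()
flow.executor = DaskExecutor()
```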

Serdar Tumgoren

08/19/2021, 10:51 PM
oh wait i think i’m confused. I probably did try DaskExecutor locally (is there even a LocalDaskExecutor?)

Kevin Kho

08/19/2021, 10:52 PM
There is, but that's the executor. `LocalRun` is a config that would replace `KubernetesRun`.

Serdar Tumgoren

08/19/2021, 10:52 PM
ooooh, ok.

Kevin Kho

08/19/2021, 10:53 PM
And then you spin up a LocalAgent on your laptop to run it (match the Flow and Agent labels)

Serdar Tumgoren

08/19/2021, 10:53 PM
do i just swap in LocalRun for KubernetesRun, or do I have to pass in some configs such as the remote address for the K8S cluster?

Kevin Kho

08/19/2021, 10:53 PM
No extra configs needed, just the swap. We're gonna debug to see if k8s is the issue. We will avoid k8s by using LocalRun

Serdar Tumgoren

08/19/2021, 10:53 PM
hm…ok, i haven’t used labels yet. any chance you have a reference implementation and/or sample CLI commands I’d need?
ok, let me swap in LocalRun
Flows can only be picked up by Agents that have the same labels (agent labels being a superset of flow labels)
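The matching rule above can be expressed as a simple superset check; the Prefect-side settings shown in the comments are illustrative:

```python
# The rule from the thread: an agent picks up a flow only if the
# agent's labels are a superset of the flow's labels.
def agent_can_run(agent_labels, flow_labels):
    return set(flow_labels) <= set(agent_labels)

# In Prefect 0.15 you would set these via, e.g. (illustrative values):
#   flow.run_config = LocalRun(labels=["dev"])
#   prefect agent local start --label dev --label gcp
print(agent_can_run({"dev", "gcp"}, {"dev"}))   # True: agent labels are a superset
print(agent_can_run({"dev"}, {"dev", "gcp"}))   # False: agent is missing "gcp"
```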

Serdar Tumgoren

08/19/2021, 10:55 PM
Aha. Ok, think i vaguely recall that from skimming those docs a ways back

Kevin Kho

08/19/2021, 10:55 PM
But you said `KubernetesRun` with `LocalDaskExecutor` worked?

Serdar Tumgoren

08/19/2021, 10:56 PM
No, so far nothing with KubernetesRun has worked. haven’t tested K8S locally
only have been testing Kubernetes in a GKE context

Kevin Kho

08/19/2021, 10:57 PM
Gotcha. `LocalRun` is not gonna use Kubernetes, just to clarify. Just wanna see if LocalRun + DaskExecutor works so we can isolate k8s

Serdar Tumgoren

08/19/2021, 10:57 PM
i built out the script locally using DaskExecutor. Then, once that was working, attempted to port it for Kubernetes, after learning how to spin up a static Kubernetes cluster with dask
right, that makes sense
ok, my local environment may be quite muddled
OSError: Timed out trying to connect to <tcp://agenda-watch-scraping-dask-scheduler:8786> after 30 s
└── 15:59:55 | ERROR   | Unexpected error occured in FlowRunner: OSError('Timed out trying to connect to <tcp://agenda-watch-scraping-dask-scheduler:8786> after 30 s')
But I suspect that’s because I set up a bunch of env variables in order to proxy localhost to the remote dask cluster after using gcloud and kubectl to set up the GKE cluster
Have to step away for a bit, but let me know if you have any advice on next steps

Kevin Kho

08/19/2021, 11:02 PM
But doesn’t the log have the correct address? You’re saying that might be hitting localhost? I guess next step is remove the env vars and then test? If we can’t get local to hit the Dask cluster at all, I am wondering if the Dask cluster can even accept connections. Maybe you can also check if you see any pods created by Prefect in GKE?

Serdar Tumgoren

08/19/2021, 11:33 PM
Apologies, that earlier error was due to me forgetting to remove the `tcp` address from the DaskExecutor line (which I had configured while attempting to debug in K8S). After changing to plain old `flow.executor = DaskExecutor()` and fixing an import bug, the modified script works locally. Attaching latest version

Kevin Kho

08/19/2021, 11:37 PM
Did you need any configuration to connect? I assume not, right? Then I guess it's Kubernetes related. I'll read up myself tonight and/or find someone who knows more than me

Serdar Tumgoren

08/19/2021, 11:39 PM
So I didn’t need any configuration for DaskExecutor, which I assumed spins up Dask locally for the flow run. But I haven’t tested configuring DaskExecutor to attempt to connect to the remote Dask cluster running on GKE. Should I be attempting to do that?
@Kevin Kho Ok, let me know if you dig anything up. As always, really appreciate the help!

Wilson Bilkovich

08/19/2021, 11:58 PM
Yeah, you're going to want to tell DaskExecutor to use Kubernetes, probably. I believe I just got this working today, where I'm doing:
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(
    image='daskdev/dask:latest',
    memory_limit='4G', memory_request='4G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'prefect'},
)

executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs=dict(
        namespace="prefect",
        pod_template=pod_spec,
        n_workers=1,
    ),
)
(where `prefect` is the namespace I chose to create in advance)
(it silently hangs forever if you don't specify `n_workers`, at the moment)
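For context, wiring an executor like this into a flow would look roughly as below; the flow name, task, and storage are illustrative, not Wilson's actual setup:

```python
from prefect import Flow, task
from prefect.run_configs import KubernetesRun

@task
def hello():
    print("hello from a Dask worker pod")

# Flow name is a placeholder
with Flow("kubecluster-demo") as flow:
    hello()

# KubernetesRun: the flow-runner job itself starts in the cluster; the
# DaskExecutor defined above (cluster_class=KubeCluster) then spins up
# ephemeral worker pods and tears them down when the flow run finishes.
flow.run_config = KubernetesRun()
flow.executor = executor  # the DaskExecutor from the snippet above
```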

Serdar Tumgoren

08/20/2021, 12:03 AM
@Wilson Bilkovich Interesting. I’ll give that a shot as well. Many thanks for sharing!

Wilson Bilkovich

08/20/2021, 12:03 AM
Hopefully I’m not leading you astray, good luck

Serdar Tumgoren

08/20/2021, 12:05 AM
Heh, not at all. It’s worth exploring options as I haven’t gotten anything to work as of yet! I see you’re using KubeCluster, which appears to be for dynamic deployments of Dask. I’ve been testing against a static, pre-existing Dask cluster deployed using Helm, so perhaps I may need to adapt my general approach. But I’ll look into it. Thanks again!

Wilson Bilkovich

08/20/2021, 12:08 AM
There’s a matching class called HelmCluster that’s intended for your use case
👍 1
though I haven’t tried it yet, it’s on my todo list

Serdar Tumgoren

08/20/2021, 12:16 AM
@Wilson Bilkovich I'll give Helm a try but your approach with KubeCluster is ultimately where I'm trying to get (i.e. dynamically spinning up an ephemeral Dask cluster for a Flow run and then tearing it down). Appreciate the pointers!

Wilson Bilkovich

08/20/2021, 12:16 AM
Good luck; seems like a fun library
Oh, and here’s a related Gist that I found useful https://gist.github.com/Ogaday/53e00fd89f5985a3633888216d91a6f6
(The KubeCluster.from_yaml call the gist makes is deprecated these days, however; they want us to use the constructor directly)

Serdar Tumgoren

08/20/2021, 12:25 AM
This is perfect! Thanks!
@Wilson Bilkovich Alas, I attempted the KubeCluster approach but I’m getting the same issue (Job appears to be picked up by K8S agent, but hangs in submitted state and eventually times out). Feels like I may have botched some other part of the setup, for example in how I initially configured the K8S agent. I used the below command to set up an “in-cluster” agent. Wondering if you or @Kevin Kho can offer any advice on my approach there and any details I should be aware of with respect to agents and the broader K8S cluster on GKE…
prefect agent kubernetes install --rbac --namespace=default --service-account-name prefect -t MY_TOKEN | kubectl apply --namespace=default -f -

Kevin Kho

08/20/2021, 5:22 PM
I am midway through walking through it myself, this looks good though. You can try `KubernetesRun` and `LocalExecutor` to see if that works too?

Serdar Tumgoren

08/20/2021, 5:23 PM
@Kevin Kho Ok, I’ll give that a try now. Thanks!

Wilson Bilkovich

08/20/2021, 5:25 PM
I switched from using the ‘agent kubernetes install’ yaml to letting the prefect-server Helm chart set up an Agent for me with `--set agent.enabled=true`
Not sure if that fixed anything, but it seemed simpler

Serdar Tumgoren

08/20/2021, 5:27 PM
@Wilson Bilkovich Is that a flag for prefect server cli command? I’m using Cloud as backend…

Wilson Bilkovich

08/20/2021, 5:31 PM
Aah, my bad, I thought you might be using Server. That’s for the Server install chart
I guess the prefect-cli YAML-generator is the correct thing to do for Cloud. I wouldn’t be surprised if there were some env variable values to change in what it outputs though.
👍 1

Serdar Tumgoren

08/20/2021, 5:32 PM
No worries. I am after all posting in the prefect-server channel 🙂 I’ll keep that in my back pocket though since I also occasionally test on local with prefect server

Kevin Kho

08/20/2021, 5:32 PM
I am up and running now for `KubernetesRun` with no `DaskExecutor`, so let me know if `LocalExecutor` works for you; otherwise I'll continue and spin up a Dask cluster

Serdar Tumgoren

08/20/2021, 5:33 PM
@Kevin Kho Any chance I could see your flow script to make sure I’m doing things similarly?

Kevin Kho

08/20/2021, 5:34 PM
You could, but it's a copy of yours for the most part. I just changed storage to GitHub so it's public and I can run without authentication. You can even grab this as is and it should work
s

Serdar Tumgoren

08/20/2021, 5:35 PM
@Kevin Kho Aha, so not specifying an executor will default to LocalExecutor?
k

Kevin Kho

08/20/2021, 5:40 PM
Yes that’s right

Serdar Tumgoren

08/20/2021, 5:46 PM
@Kevin Kho Darn. Using KubernetesRun and LocalExecutor still leaves flow run hanging in submitted state… 😕

Kevin Kho

08/20/2021, 5:46 PM
Ah ok, then I guess it's the agent job template we can compare

Serdar Tumgoren

08/20/2021, 5:47 PM
is that the job template produced by the `prefect agent kubernetes install` command that's piped to `kubectl`?

Kevin Kho

08/20/2021, 5:48 PM
Yep, I used the same process, just added namespace and rbac, but double checking
👍 1

Wilson Bilkovich

08/20/2021, 5:49 PM
Do you mind if I share my example flow? I’m trying to figure out basically the same setup

Serdar Tumgoren

08/20/2021, 5:49 PM
shout back if i should post the yaml produced by my install command
@Wilson Bilkovich would be great to see another example!
(f.run() could also be f.register(“demo”) or f.visualize() etc)
I’m running prefect server + the agent it can be told to install.. but I don’t see the log output I’d expect when I run it non-locally.
% k get pods
NAME                                             READY   STATUS      RESTARTS   AGE
prefect-server-initial-agent-5f76b4fd69-xmdxl    1/1     Running     4          2d20h
prefect-server-initial-apollo-586cbdffb5-pbdrr   1/1     Running     1          2d20h
prefect-server-initial-create-tenant-job-hw74c   0/1     Completed   4          2d20h
prefect-server-initial-graphql-b5489bbf6-kcnzg   1/1     Running     0          2d20h
prefect-server-initial-hasura-68448b469b-jr4bz   1/1     Running     2          2d20h
prefect-server-initial-postgresql-0              1/1     Running     0          2d20h
prefect-server-initial-towel-656fdc954b-5fvjh    1/1     Running     0          2d20h
prefect-server-initial-ui-5d4f9ff55-69csx        1/1     Running     0          2d20h
The agent says `[2021-08-17 20:57:40,279] INFO - agent | Registration successful!` and `[2021-08-17 20:57:40,291] INFO - agent | Waiting for flow runs...` but doesn't seem to acquire any work

Kevin Kho

08/20/2021, 5:53 PM
I did `prefect agent kubernetes install --rbac --key XXX` but then I changed my image, so here is my final yaml (just took out the key to not expose it). And then I used `kubectl apply --namespace=default -f agent.yaml`, where `agent.yaml` is the file from the github gist
For you Wilson, if it’s not acquiring any work, it might be related to labels? Does it just remain as Scheduled in the UI?

Wilson Bilkovich

08/20/2021, 5:57 PM
Aah, yeah, the registered job in the UI shows the label of the hostname I registered it from
whereas the agent says labels = [ ]

Kevin Kho

08/20/2021, 6:01 PM
You can disable that from your storage (`add_default_labels=None`), which is from the base Storage class
@Serdar Tumgoren, if this still fails we’re gonna compare GKE setups lol

Serdar Tumgoren

08/20/2021, 6:02 PM
@Kevin Kho looks like our yaml templates are largely identical but one diff is that my auto-generated version is placing the key in PREFECT__CLOUD__AGENT__AUTH_TOKEN as opposed to PREFECT__CLOUD__API_KEY

Wilson Bilkovich

08/20/2021, 6:03 PM
I’m not even configuring any storage, which is presumably my problem

Kevin Kho

08/20/2021, 6:03 PM
Default is Local @Wilson Bilkovich, and has that label on by default.

Wilson Bilkovich

08/20/2021, 6:04 PM
“Note that if you register a Flow with Prefect Cloud using this storage, your flow will automatically be labeled with your machine’s hostname. This ensures that only agents that are known to be running on the same filesystem can run your flow.” aha, I see now

Kevin Kho

08/20/2021, 6:04 PM
That makes sense @Serdar Tumgoren, if you are below 0.15.0: `key` is a 0.15.0 thing; the auth system was changed to use keys

Serdar Tumgoren

08/20/2021, 6:04 PM
@Kevin Kho I should be on the latest (0.15.3?). So should I leave my yaml as is?

Kevin Kho

08/20/2021, 6:04 PM
Oh yeah, you can't run on k8s because Local storage saves on your local filesystem, so the pods won't be able to retrieve your flow
Can you run `prefect diagnostics`? @Serdar Tumgoren
You need to store somewhere accessible to the pods (Docker, AWS, Github, etc) @Wilson Bilkovich

Wilson Bilkovich

08/20/2021, 6:06 PM
Yeah, obvious in retrospect, hmm.. Github would be nice, but suddenly requires me to inject secrets

Kevin Kho

08/20/2021, 6:07 PM
For Cloud, you store the secret with us, and then specify the secret name to use to pull down the Flow and Prefect will use that to load your Flow
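A sketch of what Kevin describes, assuming hypothetical repo, path, and secret names (none of these values come from the thread). In Prefect 0.15, `GitHub` storage takes an `access_token_secret` naming a Prefect Cloud Secret that holds a personal access token:

```python
from prefect.storage import GitHub

# repo, path, and secret name below are placeholders
flow.storage = GitHub(
    repo="my-org/my-flows",                     # GitHub repo holding the flow file
    path="flows/my_flow.py",                    # path to the flow script in that repo
    access_token_secret="GITHUB_ACCESS_TOKEN",  # name of the Prefect Secret with a PAT
)
```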

Serdar Tumgoren

08/20/2021, 6:07 PM

Kevin Kho

08/20/2021, 6:07 PM
I believe token will work and shouldn’t be the problem, but just in case, could you try copying my agent yaml? Delete the previous pods and create a new one?

Serdar Tumgoren

08/20/2021, 6:10 PM
sure. lemme dig into docs to refresh on deleting a pod (also, if it helps, at this point I only have one pod running with a single prefect agent container; i.e. i haven’t provisioned dask via helm, in case that’s an issue)
NAME                            READY   STATUS    RESTARTS   AGE
prefect-agent-6c46558df-xjxlg   1/1     Running   0          116m

Kevin Kho

08/20/2021, 6:12 PM
Would love to learn if there is a better way to do this (Wilson? lol) but I've been doing `kubectl delete pod prefect-agent-6b9b7957b7-wc6hb`. Oh yeah, I just wanna make sure we hit the right agent this time (the new one)
👍 1

Serdar Tumgoren

08/20/2021, 6:14 PM
Yep, that seemed to work nicely
@Kevin Kho That worked!!!

Kevin Kho

08/20/2021, 6:17 PM
Were you by chance using `--token` instead of `--key`?

Serdar Tumgoren

08/20/2021, 6:17 PM
(should mention that I ran your flow, i.e. i switched over to a GitHub storage that pointed to your GH repo and flow)

Kevin Kho

08/20/2021, 6:18 PM
That’s ok. That’s a good start 👍. I think you might need to provide environment variables to authenticate to pull from that Google storage though. (in the agent.yaml)
👍 1

Serdar Tumgoren

08/20/2021, 6:19 PM
No, i left the key in PREFECT__CLOUD__AGENT__AUTH_TOKEN and made the PREFECT__CLOUD__API_KEY an empty string. otherwise used your agent yaml as is
Seems like the fix here was deleting my initial pod and recreating based on your specs
Is the next step to switch the executor to DaskExecutor?

Kevin Kho

08/20/2021, 6:23 PM
I believe you should have no problem with that using the DaskExecutor? I didn't have any new specs, so I don't know what changed 🤷. That seems logical, and then getting storage to work with the right environment variables.
👍 1

Serdar Tumgoren

08/20/2021, 6:24 PM
oh, one other potentially significant difference between our yaml files is that mine specified `prefect` as the `SERVICE_ACCOUNT_NAME`. Wonder if that could cause an issue with the agent executing the flow? (Your setting for `SERVICE_ACCOUNT_NAME` is blank.)

Kevin Kho

08/20/2021, 6:26 PM
That’s right I can see that being it

Serdar Tumgoren

08/20/2021, 6:28 PM
Cool. Ok, so one other super basic question: if I cut over to DaskExecutor, do I need to pass in a `tcp` address setting and do any other configuration in the flow script or at the pod layer?

Wilson Bilkovich

08/20/2021, 6:31 PM
I believe the answer for DaskExecutor is yes, but you'd probably want to use your kubectx if you end up using the Kube Dask stuff like KubeCluster
👍 1

Serdar Tumgoren

08/20/2021, 6:34 PM
Aha, ok, just tried reverting to GCS storage and got a 403, so I suppose I've come full circle here to permissions related to accessing the flow. Early in my process I had created a GCP service account called `prefect` with (what I hoped) were appropriate roles, which is how I think I ended up with the `prefect` setting for SERVICE_ACCOUNT_NAME. I'll try debugging that now

Kevin Kho

08/20/2021, 6:40 PM
If connecting to an existing Dask cluster, it would be `executor = DaskExecutor(address="192.0.2.255:8786")`, like detailed here. What Wilson is saying is that if you pass a callable Dask cluster class like `KubeCluster` to the executor, it creates those workers on the fly for you. For an existing cluster, I don't think there would be more configuration. That makes sense, I think you can use GCS Storage by passing the env vars to the agent. I think if you follow this, it would be `export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"`. I don't know if the best way is to copy that json into your own container, or add it to the pod as a ConfigMap?
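One way to realize the Secret/ConfigMap idea floated here (an assumption on my part, not what the thread settled on): store the service-account JSON as a Kubernetes Secret and mount it into flow-run pods through a custom job template. The secret name, mount path, and container name below are illustrative, and only the fields relevant to the mount are shown:

```python
# First create the secret out of band (illustrative names):
#   kubectl create secret generic gcp-sa \
#       --from-file=key.json=service-account-file.json
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "containers": [
                    {
                        "name": "flow",
                        # Point the Google SDK at the mounted key file
                        "env": [
                            {
                                "name": "GOOGLE_APPLICATION_CREDENTIALS",
                                "value": "/var/secrets/gcp/key.json",
                            }
                        ],
                        "volumeMounts": [
                            {
                                "name": "gcp-sa",
                                "mountPath": "/var/secrets/gcp",
                                "readOnly": True,
                            }
                        ],
                    }
                ],
                # Expose the Kubernetes Secret as a volume
                "volumes": [
                    {"name": "gcp-sa", "secret": {"secretName": "gcp-sa"}}
                ],
            }
        }
    },
}
# Then, in the flow script:
#   flow.run_config = KubernetesRun(job_template=job_template)
```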

Serdar Tumgoren

08/20/2021, 6:42 PM
Ok, yep, I've been using the env approach when registering the flow, but I can explore the alternatives you mentioned. However, now I'm unable to delete and recreate the pods; k8s keeps auto-creating a pod after I run `kubectl delete pods <podname>`

Kevin Kho

08/20/2021, 6:45 PM
Ah ok I didn’t encounter that but my understanding is that this happens because you have a service with X number of replicas. So if you delete a pod in a service, k8s will create one again to fulfill the number of replicas.

Wilson Bilkovich

08/20/2021, 6:47 PM
Yeah, you want to delete the `deployment` if you'd like it to stay gone
👍 1

Serdar Tumgoren

08/20/2021, 6:47 PM
Aha. ok, i suspect might be related to the GKE setup i’m using. I followed the basic docs from Zero to JupyterHub on GKE (basically the steps on this page for single-user: https://zero-to-jupyterhub.readthedocs.io/en/latest/kubernetes/google/step-zero-gcp.html)

Wilson Bilkovich

08/20/2021, 6:47 PM
deleting the pod just makes k8s decide to replace it, and is the standard way to do a ‘restart’

Serdar Tumgoren

08/20/2021, 6:48 PM
@Wilson Bilkovich Aha, i see. I’ll just tear down and recreate the whole deployment. Thanks!
Hey @Kevin Kho, I'm using KubernetesRun + LocalExecutor to keep things “simple” while I attempt to use GCS as flow storage. I've configured KubernetesRun to set `GOOGLE_APPLICATION_CREDENTIALS=/tmp/secret.json` and, as a quick-and-dirty shortcut, copied my GCP credentials JSON file to that /tmp location in the pod. Alas, I'm getting a `Failed to load and execute Flow's environment: DefaultCredentialsError('File /tmp/secrets.json was not found.')` error when I attempt to run via Cloud. Wondering if there's something very dumb I'm doing here? Should I avoid this hackish approach and instead create a container image that contains the GCP credentials file, push it to a private registry, and then specify that image in KubernetesRun?
If it helps, here’s the relevant portion of my Flow:
flow.run_config = KubernetesRun(
    env={
        'GOOGLE_APPLICATION_CREDENTIALS': '/tmp/secrets.json',
    }
)
flow.executor = LocalExecutor()
flow.storage = GCS(
    bucket='aw-prefect-flows',
    project='hai-gcp-augmenting',
)

Kevin Kho

08/21/2021, 1:26 AM
The syntax around this is tricky I think. Will try it myself then get back to you.

Serdar Tumgoren

08/21/2021, 1:29 AM
Thanks! Let me know what you find. For now, I’m experimenting with building a custom docker image for KubernetesRun and wrestling with how to enable a service account to pull the image from GCP Artifact Registry via image pull secrets
@Kevin Kho Heads up that I got KubernetesRun + DaskExecutor working with a custom docker image hosted on GCP Artifact Registry!! Just wanted to mention so you don't invest time in debugging on this front (know you're very busy!). I'll experiment next with @Wilson Bilkovich's KubeCluster approach and will report back on results. Thanks to you both for all the help throughout!!

Kevin Kho

08/21/2021, 11:46 PM
Hey @Serdar Tumgoren! Thanks for circling back and good to hear! I’ll still likely try it for my own understanding while I have GKE setup, but thanks for letting me know there’s no particular rush for it 😄
👍 1

Serdar Tumgoren

08/21/2021, 11:51 PM
@Kevin Kho Please do send along whatever you cook up (I can use all the reference material I can get my hands on!). Thanks!
@Kevin Kho I seem to have a working KubeCluster flow: https://gist.github.com/zstumgoren/1d7044038757c2e6f5b2c38ba2c3c651 Any chance you could give me a sanity check next week to make sure I’m not doing anything incorrect? In particular, wondering if KubernetesRun is the proper approach here…

Kevin Kho

08/22/2021, 3:33 AM
Hey, it looks good to me and from what I've seen from this community. Wilson will probably be able to verify better on the k8s part, and I don't know if you saw the gist he posted, but it looks similar

Serdar Tumgoren

08/22/2021, 4:13 AM
Sounds good and thanks @Kevin Kho! My next step will be to test saving the downloaded files to a cloud bucket (similar to what @Wilson Bilkovich appears to be doing in his most recent gist). Will report back on the results