# ask-community
a
I'm trying out Prefect for our ETL pipelines. We use GCP as our cloud provider. I'd like to use Prefect Cloud and run agents in GCP. Is the best option for doing this the Vertex agent? If I do use that, what is the process for getting started? Do I just create the agents at the beginning via the CLI and then leave them running, or do I have to continuously update them? Is there a way to set up CI/CD for my flow code?
k
Hi @Andrew Lawlor, this blog post used Vertex, but you also have Google Kubernetes Engine, and you can also just use a VM. Agents always have to be running. For Flow deployment patterns, you can start here.
a
Thanks, I'll investigate these.
I followed the above examples and tried starting a Docker agent on a GCP VM instance. It hasn't been working, and after I SSH'ed into the VM I saw this error: docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory')). My Dockerfile looks like:
FROM prefecthq/prefect:latest-python3.9
CMD prefect agent docker start --key
Can anyone help?
k
I think it doesn't have Docker installed? What is the output of
docker run hello-world
?
a
That works when I run it in the shell of my machine. Not sure what it would look like as the last line of the Dockerfile.
Unable to find image 'hello-world:latest' locally
latest: Pulling from library/hello-world
2db29710123e: Pull complete
Digest: sha256:507ecde44b8eb741278274653120c2bf793b174c06ff4eaa672b713b3263477b
Status: Downloaded newer image for hello-world:latest

Hello from Docker!
This message shows that your installation appears to be working correctly.

To generate this message, Docker took the following steps:
 1. The Docker client contacted the Docker daemon.
 2. The Docker daemon pulled the "hello-world" image from the Docker Hub. (amd64)
 3. The Docker daemon created a new container from that image which runs the executable that produces the output you are currently reading.
 4. The Docker daemon streamed that output to the Docker client, which sent it to your terminal.

To try something more ambitious, you can run an Ubuntu container with:
 $ docker run -it ubuntu bash

Share images, automate workflows, and more with a free Docker ID:
 https://hub.docker.com/

For more examples and ideas, visit:
 https://docs.docker.com/get-started/
k
Could you give me a longer traceback of the first one?
a
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 214, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/usr/local/lib/python3.9/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/usr/local/lib/python3.9/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 237, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 542, in get
    return self.request('GET', url, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/agent.py", line 274, in start
    start_agent(
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/agent.py", line 139, in start_agent
    agent = agent_cls(labels=labels, env_vars=env_vars, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/agent/docker/agent.py", line 160, in __init__
    self.docker_client = self._get_docker_client()
  File "/usr/local/lib/python3.9/site-packages/prefect/agent/docker/agent.py", line 186, in _get_docker_client
    return docker.APIClient(
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 197, in __init__
    self._version = self._retrieve_server_version()
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 221, in _retrieve_server_version
    raise DockerException(
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
k
The
docker run hello-world
was intended to be run on the machine, so that looks fine. I'm pretty confused because this error seems to say it can't find Docker. What is the output of
docker version
?
a
Docker version 20.10.6, build 370c289
k
Oh sorry, not version. That works even if Docker is off. Try
docker ps
a
CONTAINER ID   IMAGE                                                       COMMAND                  CREATED         STATUS                           PORTS   NAMES
c8e4121f7a1a   gcr.io/development-273516/datapipeline:68b3ed3              "tini -g -- entrypoi…"   6 minutes ago   Restarting (127) 6 seconds ago           klt-prefect-pipeline-lltc
ba11113bf413   gcr.io/stackdriver-agents/stackdriver-logging-agent:1.8.9   "/entrypoint.sh /usr…"   7 minutes ago   Up 6 minutes                             stackdriver-logging-agent
k
This looks good, let me look at what I can find.
Maybe try this? This is a very similar issue to yours. You can also try restarting the Docker service.
a
I tried running that in the VM and it's still not working. But now all I'm seeing, over and over again, is that I can't attach to the Docker container because it keeps restarting.
k
Are you on Ubuntu or CentOS?
a
I think it's Debian.
k
https://github.com/docker/docker-py/issues/2455. It seems they closed this issue and the error is not related to the SDK.
a
Yeah, I'm sure the error is something I'm doing. I'm seeing lots of docs that say I can start an agent in a VM on GCP, but I'm not really a DevOps person, so I'm not totally sure how to do that. Is there a guide anywhere for the exact steps I need? I should note I did successfully start a local agent on my VM, but it didn't pick up my jobs. Not sure what's going on with that either. I think I probably do want a Docker agent anyway, so I don't want to focus on that.
k
I only have exact instructions for Kubernetes here, but unfortunately not for Docker. I looked around and maybe this will help for just getting Docker installed, and then we can get Prefect by installing Python. The Local Agent starting really points to a Docker issue. Also, the "not picking up jobs" is likely an easy fix with Labels.
a
Thanks for the help. I found something similar to your first link, followed it with a new VM (with Debian 10), and did get it running. However, I did get this new error:
Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.8.2', currently running with '3.7.12')")
which makes sense because my local machine has Python 3.8 and the VM has 3.7. I'm not really sure the best way around that though, since I don't want to have to rely on my machine and the VM having the same Python version. I will say I'm not tied to Docker; I just thought it would be easier than Kubernetes since I'm not really familiar with Kubernetes. But I could try Kubernetes too.
k
This error is just saying that you registered in 3.8 but the Flow image seems to be using the default 3.7. There are two places you can define the Prefect image. Find the version you want here, and then you can specify it in:
1. If using Docker storage, the Docker storage class takes in a base image
2. If using another storage with DockerRun, DockerRun takes an image
Then you would just use
prefecthq/prefect:latest-python3.8
or something like that
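For instance, a minimal sketch of both options (Prefect 1.x; the image tags are examples, not requirements):

from prefect.run_configs import DockerRun
from prefect.storage import Docker

# Option 1: Docker storage builds the flow image; pin its base image
storage = Docker(base_image="prefecthq/prefect:latest-python3.8")

# Option 2: any other storage; pin the image the agent runs the flow in
run_config = DockerRun(image="prefecthq/prefect:latest-python3.8")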
a
Where should I be registering them? I have been registering them on my local machine, but it doesn't match exactly. I could have Docker use the exact same image as my local (3.8.2), but since I want to set up CI/CD on registration anyway, I'd rather try running registration somewhere else.
k
Are you using Prefect Cloud or Prefect Server (self-hosted)?
a
cloud
k
Then your registrations are against Prefect Cloud (wherever you register from), so if you register from your local machine and run from the VM, the agent on the VM is responsible for loading that Flow (from the defined Storage) and executing it.
A couple of things to look at:
1. Registering your flow adds it to a Storage. The execution side will pull it from Storage to run the Flow. Because your execution is not where you registered from, you need to use some storage that can be pulled by the agent (GCS, GitHub, Docker, etc.).
2. The agent gets the flow from Storage and runs it on top of the specified RunConfiguration. You can specify the image on the DockerRun. The image's Python version just needs to match the version used at registration (for consistent serialization and deserialization).
3. Docker storage can build an image, and DockerRun can use that image. In that case, just specify a base_image for Docker storage to make the versions match.
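Putting that together, a rough sketch of registering from the local machine with GCS storage and a pinned run image (Prefect 1.x; the bucket and project names are illustrative):

from prefect import Flow, task
from prefect.run_configs import DockerRun
from prefect.storage import GCS

@task
def say_hello():
    print("hello")

with Flow("gcs-dockerrun-example") as flow:
    say_hello()

# Storage: where the agent pulls the serialized flow from
flow.storage = GCS(bucket="my-prefect-flows")  # illustrative bucket

# RunConfig: the image the agent runs the flow in; its Python version
# should match the Python used at registration time
flow.run_config = DockerRun(image="prefecthq/prefect:latest-python3.8")

# Registering uploads the flow to GCS and creates it in Prefect Cloud
flow.register(project_name="etl")  # illustrative project name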
a
I've been using GCS storage. It seemed like that was better since it wouldn't need to build a new image every time.
k
That works. Just specify the `DockerRun(image="prefecthq/prefect:latest-python3.8")` and I think your Flow will work.
a
I did, but I got:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ImportError(\'Using prefect.tasks.mysql requires Prefect to be installed with the "mysql" extra.\')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with \'3.8.2\', currently running with \'3.8.12\')\nThis also may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
So I think I have to create my own image with the mysql extra.
k
Ah ok. Yes and no. For quick prototyping you can do this, but yes, it is preferable to build the image.
a
I'm building the image in a GCP container registry but getting unauthorized errors when I try to use it. Is there a way to pass credentials to the image?
k
The agent needs credentials to pull so you can set the
GOOGLE_APPLICATION_CREDENTIALS
env variable and point it to a JSON key file to authenticate
a
Oh, I didn't think I had to do that since I'm within a GCP VM.
k
That's a good point because AWS can do this if you add a role. I just checked and it seems you still have to configure it.
a
The extra_pip_packages approach isn't working; the jobs get picked up by agents but hang in a Submitted state. I also can't seem to get my Dockerfile to work. I think I'm going to try out Kubernetes. I think I eventually want Kubernetes anyway and was trying Docker to get my feet wet, but it seems like it's still going to be a struggle with Docker.
Actually, never mind, I guess I still need an image to run Kubernetes, so I'll continue trying to get it to work with Docker.
k
I think the test would be
docker pull
on your VM from the registry and as long as that works, the Prefect agent should be able to pull
a
OK, it seems to be working a little bit better now. It is now starting tasks on my VM instance. However, I am getting an issue with the MySQLFetch and MySQLExecute tasks. I was passing SSL certs to these when running locally, but when running on GCP I pass a unix_socket instead. The SQLAlchemy engine looks like:
db = sqlalchemy.create_engine(
    sqlalchemy.engine.url.URL(
        drivername='mysql+pymysql',
        username=db_user,
        password=db_password,
        database=db_name,
        query={"unix_socket": connection_name},
    ),
)
Is it possible to do this with the MySQLFetch and MySQLExecute tasks?
k
I guess not, or it would be different because the MySQL task uses
pymysql
, so I would suggest just making your own task for that?
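For example, a rough sketch of a custom fetch task along those lines (assuming pymysql and SQLAlchemy are installed in the flow image; the task name is illustrative and the engine setup simply mirrors the snippet above):

import sqlalchemy
from prefect import task

@task
def cloud_sql_fetch(query, db_user, db_password, db_name, connection_name):
    # Connect over the Cloud SQL unix socket instead of SSL certs,
    # mirroring the engine configuration shown above
    engine = sqlalchemy.create_engine(
        sqlalchemy.engine.url.URL(
            drivername="mysql+pymysql",
            username=db_user,
            password=db_password,
            database=db_name,
            query={"unix_socket": connection_name},
        )
    )
    with engine.connect() as conn:
        # Return all rows from the query for downstream tasks
        return conn.execute(sqlalchemy.text(query)).fetchall()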
a
ok
OK, so I'm using KubernetesRun and GKE now. I am still getting the MySQL error. I followed a Google tutorial here to connect via a sidecar in my Kubernetes pod. The sidecar is set up in the same pod as my agent. How do sidecars work with KubernetesRun instances? Does it spin up a new pod with my Docker image to run my flow? If so, how do I ensure that the new pod also has my sidecar? The logs seem to indicate a new pod being spun up, and it seems to not have the sidecar set up.
Actually, I set up a job template following this guide and it seems to work a little better, but now I'm seeing: Message: Error: container has runAsNonRoot and image will run as root
k
What was the MySQL error? I was trying to find it but could not. I'm a bit confused how we got to Docker in Docker 😅
a
It's an unauthorized error. GCP has specific ways to connect to their Cloud SQL via a Cloud SQL proxy, so I'm following their guide to connect via Kubernetes.
But I'm a little confused on how Kubernetes works. Their guide is to deploy a sidecar container with a Docker image that they provide for connecting to MySQL, and then to talk to MySQL via that on localhost with a user and password. I deployed the sidecar in the agent k8s config but it didn't seem to work. How do I ensure the sidecar is always deployed with the pod of every run?
k
Wow, that sounds so complicated. You can do it with the job template, I guess, like in the example you showed?
a
That's what I'm trying, but I am very confused. What exactly is the job template doing? Why do I have both that and a Kubernetes config for the agent? What's the difference between them?
k
The agent job template is a default one. If there is none on KubernetesRun, the agent's will be used. If both are supplied, the RunConfig takes precedence. So think of the agent's as the default, and the Flow can come in and override it.
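As a rough sketch of that precedence (Prefect 1.x; the file paths and image name are illustrative):

import yaml
from prefect.run_configs import KubernetesRun

# Flow-level template: supplied on the RunConfig, it overrides the agent's default
with open("job_template.yaml") as f:  # illustrative path
    job_template = yaml.safe_load(f)

run_config = KubernetesRun(
    image="gcr.io/my-project/my-flow:latest",  # illustrative image
    job_template=job_template,
)

# If the flow supplies no template, the Kubernetes agent's default is used,
# e.g. an agent started with:
#   prefect agent kubernetes start --job-template agent_job_template.yaml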
a
Oh, so I should specify a job template in the agent if I want it to apply to everything?
k
Yes. I think this may clear things up. The agent already has a default one actually
a
OK, this is working. Thank you for your help. I had confused myself on how Kubernetes pods were deployed.
Do you have any documentation handy on sharing tasks between flows?
k
Not specifically, but that would be similar to our task library
a
I'm trying to create a script that registers my flows on a git push. I'm getting an unauthenticated error with Prefect. How do I log in with my API key in code?
k
If using CLI
prefect auth login -k API_KEY
. If using code, I think you need to use
client.save_auth_to_disk
like this
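For instance, a rough sketch of the auth step in a CI registration script (Prefect 1.x; the environment variable and project names are illustrative):

import os
from prefect import Client

# Authenticate the machine running the CI job with an API key from secrets
client = Client(api_key=os.environ["PREFECT_API_KEY"])  # env var name is illustrative
client.save_auth_to_disk()

# After this, flow.register(project_name="etl") calls in the same job
# should authenticate against Prefect Cloud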
a
Is there a way to know how much memory I should be using for a pod? I regularly see this message when spinning up pods:
message: 0/3 nodes are available: 3 Insufficient cpu, 3 Insufficient memory.
k
Isn't this saying that there aren't even any nodes available, though?
a
I read it as: there are 3 nodes, and all had insufficient CPU and insufficient memory.
k
Ah ok. The answer though is there isn’t a good way ahead of time (at least that I know of)
a
OK. I'm setting memory and CPU requests in the run_config like so:
run_config = KubernetesRun(
    cpu_request=0.5,
    memory_request="2Gi",
)
Is that the correct place to do it? And is there a good way to tell after the fact?
k
I personally don't know a good way, because the pod gets cleaned up after execution. So I would just exec in and try to get metrics before it ends. Actually, for Prefect 2.0 this is one of the things we'll expose so that you can send these metrics out, because it's one of the common questions we get. I suggest you post in the community channel so we can see what others have to say?
a
Sure. Am I setting the requests in the correct way, though? Also, how do I ensure the pod gets cleaned up? I am looking at my GKE cluster right now and it's still running the same amount of vCPUs and memory as it was when my flow was running (it ended about 10 minutes ago).
And the pods are up with Not Ready statuses.
k
Yes the request is right. I have a call with a teammate that regularly uses Kubernetes. Let me ask them and I’ll get back to you in like 30 mins
g
Hey Andrew, what flavor of k8s are you using?
It sounds like an autoscaling event is not occurring
a
gke
g
We can see the job/pod name scheduled by a Prefect flow from the Prefect Cloud UI. We can then use that name to search the GKE workloads for the pod.
This will show you a very clear profile of memory usage vs requests/limits
This will clean up shortly after the pod runs, but is useful for now.
a
How do I search GKE workloads?
g
In the bar, click where it says
filter workloads
then paste the pod name
or job name
a
Also, I think what is happening is that my sidecars are causing my pods not to get torn down. I created sidecars to connect to Google Cloud SQL, and those sidecars seem to stay up (my get pods command shows 1 pod for every job I run, with 1/2 ready), so the pods stay up after the job completes. Would you know how to make a pod shut down after the Prefect job completes, even if the SQL connection isn't necessarily "done" yet?
g
How did you create the sidecars?
a
in the job template
g
How are you passing the job template to the prefect flow?
a
The job template looks like:
apiVersion: batch/v1
kind: Job
metadata:
  name: prefect-job
spec:
  template:
    metadata:
      labels:
        app: prefect-agent
    spec:
      serviceAccountName: default
      containers:
      - name: flow-container
      - name: cloud-sql-proxy
        lifecycle:
          type: Sidecar
        image: gcr.io/cloudsql-docker/gce-proxy:latest
        command:
          - "/cloud_sql_proxy"
          - "-instances=<CONNECTION>"
        resources:
          requests:
            memory: "128Mi"
            cpu:    "0.25"
and I pass it to the flow in the run config:
run_config = KubernetesRun(
    image='IMAGE',
    labels=['etl'],
    cpu_request=0.25,
    memory_request="128Mi",
    env={ENV_VARS},
    job_template=job_template
  )
g
I think your job manifest needs some tweaking; I would start here: https://cloud.google.com/kubernetes-engine/docs/how-to/jobs
Prefect also offers professional services for things like this if you would like some more hands on support for infrastructure setup
I think there is an autoscaling issue, and a pod definition issue
a
It's not very clear to me what the pod definition issue is. I have read the Job docs and don't see anything obvious. I needed to add the sidecar in the job definition to ensure it was always deployed with each pod so I could connect to Cloud SQL. It doesn't seem to spin down correctly, but the only thing I could see that might help with that is the completionCount, and it defaults to 1, which is what I'd want anyway.
k
George is normally on enterprise support calls, so just mentioning he’ll probably respond later in the day
g
What flavor of Cloud SQL?
a
mysql
g
So I use pgbouncer as a single proxy as opposed to sidecars, but I think you can do ProxySQL.
a
I do currently have a single proxy service
and it is working.
g
That's excellent.
a
Yeah. I'm a little thrown off by not being able to get the sidecar to work, given that it's what Google recommends, but this is working for now. Do you deploy GKE with a public IP and in Autopilot mode?
g
Personally, control plane public, all instances private.
I haven't tested out Autopilot yet; not sure if I will. I like the granular control of node pools.
a
Hmm, maybe I should try that then. I've never used Kubernetes before, so I'm not really sure. I went with Autopilot since it seemed easier to set up.
After talking to Google, they suggested doing this instead of a Cloud SQL proxy service in GKE: https://cotton-ori.medium.com/how-to-terminate-a-side-car-container-in-kubernetes-job-2468f435ca99. When I try it, it doesn't work, and I think the issue is that I am overwriting the command that the flow container runs so that it signals for it to end, and it seems to only do that and not actually run my flow. What is the command it needs to run to start the flow, and can I have it run that?
@Kevin Kho @George Coyne did you see my above message? I'd like to try to go back to using sidecars if possible, instead of Kubernetes services.
k
I have something for the next few hours but will take a look in a bit.
a
ok thanks
k
So the first answer, though, is that whatever container you have, Prefect adds the commands on top of your container through the agent when it starts the Flow, so you shouldn't need to add it yourself. It's something like
prefect execute …
I think this is the line that does it
What do you mean that you overrode the command to signal for it to end? You mean to immediately spin down the sidecar?
a
I set the args for the flow container to be:
args:
  - |
    sleep 2s
    trap "touch /tmp/pod/terminated" EXIT
k
Sorry, I'm a bit confused about why you need that?