Andrew Lawlor
7 months ago
I'm trying out Prefect for our ETL pipelines. We use GCP as our cloud provider. I'd like to use Prefect Cloud and run agents in GCP. Is the best option for doing this the Vertex agent? If I do use that, what is the process for getting started? Do I just create the agents at the beginning via the CLI and then leave them running, or do I have to continuously update them? Is there a way to set up CI/CD for my flow code?
Kevin Kho
7 months ago
Hi @Andrew Lawlor, this blog used Vertex, but you also have Google Kubernetes Engine, and you can also just use a VM. Agents always have to be running. For flow deployment patterns, you can start here.
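As a rough sketch of the VM route (the API key and label are placeholders), once Prefect is installed on the VM you would authenticate and leave an agent process running:
prefect auth login -k <API_KEY>
prefect agent docker start --label etl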
Andrew Lawlor
7 months ago
Thanks, I'll investigate these.
I followed the above examples and have tried starting a Docker agent on a GCP VM instance. It hasn't been working, and after I SSH'ed into the VM I saw this error:
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
My Dockerfile looks like:
FROM prefecthq/prefect:latest-python3.9
CMD prefect agent docker start --key
Can anyone help?
Kevin Kho
7 months ago
I think it doesn't have Docker installed? What is the output of
docker run hello-world
?
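For context, this particular DockerException usually means the Docker SDK cannot reach the daemon's Unix socket. If the agent itself runs inside a container, one common fix (a sketch; the image name is a placeholder) is to mount the host's socket into it:
docker run -d -v /var/run/docker.sock:/var/run/docker.sock <your-agent-image>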
Andrew Lawlor
7 months ago
That works when I run it in the shell of my machine. Not sure what it would look like as the last line of the Dockerfile.
Unable to find image 'hello-world:latest' locally
latest: Pulling from library/hello-world
2db29710123e: Pull complete
Digest: sha256:507ecde44b8eb741278274653120c2bf793b174c06ff4eaa672b713b3263477b
Status: Downloaded newer image for hello-world:latest

Hello from Docker!
This message shows that your installation appears to be working correctly.

To generate this message, Docker took the following steps:
 1. The Docker client contacted the Docker daemon.
 2. The Docker daemon pulled the "hello-world" image from the Docker Hub. (amd64)
 3. The Docker daemon created a new container from that image which runs the executable that produces the output you are currently reading.
 4. The Docker daemon streamed that output to the Docker client, which sent it to your terminal.

To try something more ambitious, you can run an Ubuntu container with:
 $ docker run -it ubuntu bash

Share images, automate workflows, and more with a free Docker ID:
 https://hub.docker.com/

For more examples and ideas, visit:
 https://docs.docker.com/get-started/
Kevin Kho
7 months ago
Could you give me a longer traceback of the first one?
Andrew Lawlor
7 months ago
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 214, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/usr/local/lib/python3.9/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/usr/local/lib/python3.9/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 237, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 542, in get
    return self.request('GET', url, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 8, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/agent.py", line 274, in start
    start_agent(
  File "/usr/local/lib/python3.9/site-packages/prefect/cli/agent.py", line 139, in start_agent
    agent = agent_cls(labels=labels, env_vars=env_vars, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/agent/docker/agent.py", line 160, in __init__
    self.docker_client = self._get_docker_client()
  File "/usr/local/lib/python3.9/site-packages/prefect/agent/docker/agent.py", line 186, in _get_docker_client
    return docker.APIClient(
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 197, in __init__
    self._version = self._retrieve_server_version()
  File "/usr/local/lib/python3.9/site-packages/docker/api/client.py", line 221, in _retrieve_server_version
    raise DockerException(
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
Kevin Kho
7 months ago
The
docker run hello-world
was intended to be run on the machine, so that output looks fine. I'm pretty confused, because this error seems to say it can't find Docker. What is the output of
docker version
?
Andrew Lawlor
7 months ago
Docker version 20.10.6, build 370c289
Kevin Kho
7 months ago
Oh sorry, not version. That works even if Docker is off. Try
docker ps
Andrew Lawlor
7 months ago
CONTAINER ID   IMAGE                                                       COMMAND                  CREATED         STATUS                           PORTS   NAMES
c8e4121f7a1a   gcr.io/development-273516/datapipeline:68b3ed3              "tini -g -- entrypoi…"   6 minutes ago   Restarting (127) 6 seconds ago           klt-prefect-pipeline-lltc
ba11113bf413   gcr.io/stackdriver-agents/stackdriver-logging-agent:1.8.9   "/entrypoint.sh /usr…"   7 minutes ago   Up 6 minutes                             stackdriver-logging-agent
Kevin Kho
7 months ago
This looks good. Let me look at what I can find.
Maybe try this? This is a very similar issue to yours. You can also try restarting the service.
Andrew Lawlor
7 months ago
I tried running that in the VM and it's still not working. But now all I'm seeing is that I can't attach to the Docker container, over and over again, because it keeps restarting.
Kevin Kho
7 months ago
Are you on Ubuntu or CentOS?
Andrew Lawlor
7 months ago
I think it's Debian.
Kevin Kho
7 months ago
It seems they closed this issue (https://github.com/docker/docker-py/issues/2455), and the error is not related to the SDK.
Andrew Lawlor
7 months ago
Yeah, I'm sure the error is something I'm doing. I'm seeing lots of docs that say I can start an agent in a VM on GCP, but I'm not really a DevOps person, so I'm not totally sure how to do that. Is there a guide anywhere for the exact steps I need? I should note I did successfully start a local agent on my VM, but it didn't pick up my jobs; not sure what's going on with that either. I think I probably do want a Docker agent anyway, so I don't want to focus on that.
Kevin Kho
7 months ago
I only have exact instructions for Kubernetes here, but unfortunately not Docker. I looked around, and maybe this will help for just getting Docker on, and then we can get Prefect by installing Python. The Local agent starting fine really points to a Docker issue. Also, the "not picking up jobs" is likely an easy fix with Labels.
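For reference, one quick way to get Docker onto a Debian VM is Docker's convenience script (a sketch; the official install docs also cover the package-manager route):
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh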
Andrew Lawlor
7 months ago
Thanks for the help. I found something similar to your first link and followed it with a new VM (with Debian 10) and did get it running. However, I got this new error:
Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.8.2', currently running with '3.7.12')")
which makes sense, because my local machine does have Python 3.8 and the VM has 3.7. Not really sure the best way around that though, since I don't want to have to rely on my machine and the VM having the same Python version. I will say I'm not tied to Docker; I just thought it would be easier than Kubernetes, since I'm not really familiar with Kubernetes. But I could try Kubernetes too.
Kevin Kho
7 months ago
This error is just saying that you registered in 3.8 but the Flow image seems to be using the default 3.7. There are two places you can define the image. Find the version you want here, and then you can specify it in either:
1. If using Docker storage, the Docker storage class takes in a base image.
2. If using another storage but DockerRun, DockerRun takes an image.
Then you would just use
prefecthq/prefect:latest-python3.8
or something like that.
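In Prefect 1.x terms, a minimal sketch of the two options (the image tag is chosen to match a 3.8 registration environment):
from prefect.run_configs import DockerRun
from prefect.storage import Docker

# Option 1: Docker storage builds the flow image from a matching base image
storage = Docker(base_image="prefecthq/prefect:latest-python3.8")

# Option 2: keep non-Docker storage and point DockerRun at a matching image
run_config = DockerRun(image="prefecthq/prefect:latest-python3.8")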
Andrew Lawlor
7 months ago
Where should I be registering them? I have been registering them on my local machine, but it doesn't match exactly. I could have Docker use the exact same image as my local (3.8.2), but since I want to set up CI/CD on registration anyway, I'd rather try running registration somewhere else.
Kevin Kho
7 months ago
Are you using Prefect Cloud or Prefect Server (self-hosted)?
Andrew Lawlor
7 months ago
Cloud.
Kevin Kho
7 months ago
Then your registrations are against Prefect Cloud (wherever you register from), so if you register from your local machine and run from the VM, the agent on the VM is responsible for loading that Flow (from the defined Storage) and executing it.
A couple of things to look at:
1. Registering your flow adds it to a Storage. The execution side will pull it from Storage to run the Flow. Because your execution is not where you registered from, you need to use some storage that can be pulled by the agent (GCS, GitHub, Docker, etc.).
2. The agent gets the flow from Storage and runs it on top of the specified RunConfiguration. You can specify the image on the DockerRun. The image's Python version just needs to match the version used at registration (for consistent serialization and deserialization).
3. Docker storage can build an image, and DockerRun can use that image. In that case, just specify a
base_image
for Docker storage to make the versions match.
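Putting those pieces together, a registration sketch under these assumptions (the bucket, project, and flow names are placeholders):
from prefect import Flow, task
from prefect.run_configs import DockerRun
from prefect.storage import GCS

@task
def say_hello():
    print("hello")

with Flow(
    "my-etl-flow",
    storage=GCS(bucket="my-prefect-flows"),  # agent pulls the flow source from GCS
    run_config=DockerRun(image="prefecthq/prefect:latest-python3.8"),  # matches registration Python
) as flow:
    say_hello()

flow.register(project_name="etl")  # registers against Prefect Cloud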
Andrew Lawlor
7 months ago
I've been using GCS storage. It seemed like that was better, since it wouldn't need to build a new image every time.
Kevin Kho
7 months ago
That works. Just specify DockerRun(image="prefecthq/prefect:latest-python3.8") and I think your Flow will work.
Andrew Lawlor
7 months ago
I did, but I got:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ImportError('Using
prefect.tasks.mysql
requires Prefect to be installed with the "mysql" extra.')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.8.2', currently running with '3.8.12')\nThis also may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
so I think I have to create my own image with mysql.
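A minimal Dockerfile sketch for that (assuming the base tag should match your 3.8 registration environment):
FROM prefecthq/prefect:latest-python3.8
RUN pip install "prefect[mysql]"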
Kevin Kho
7 months ago
Ah ok, yes and no. For quick prototyping you can do this (the EXTRA_PIP_PACKAGES environment variable), but yes, it is preferable to build the image.
Andrew Lawlor
7 months ago
I'm building the image in a GCP container registry but getting unauthorized errors when I try to use it. Is there a way to pass credentials to the image?
Kevin Kho
7 months ago
The agent needs credentials to pull, so you can set the
GOOGLE_APPLICATION_CREDENTIALS
environment variable and point it to a JSON key file to authenticate.
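On the VM itself, the Docker daemon also needs to be authorized against Container Registry for pulls; one approach (a sketch, assuming the gcloud CLI is installed and the key path is a placeholder):
gcloud auth configure-docker
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json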
Andrew Lawlor
7 months ago
Oh, I didn't think I had to do that, since I'm within a GCP VM.
Kevin Kho
7 months ago
That's a good point, because AWS can do this if you add a role. I just checked, and it seems you still have to configure it on GCP.
Andrew Lawlor
7 months ago
The EXTRA_PIP_PACKAGES approach isn't working; the jobs get picked up by agents but hang in a Submitted state. I also can't seem to get my Dockerfile to work. I think I'm going to try out Kubernetes. I think I eventually want Kubernetes anyway and was trying Docker to get my feet wet, but it seems like it's still going to be a struggle for Docker.
Actually, never mind. I guess I still need an image to run Kubernetes, so I'll continue trying to get it to work with Docker.
Kevin Kho
7 months ago
I think the test would be a
docker pull
on your VM from the registry; as long as that works, the Prefect agent should be able to pull.
Andrew Lawlor
7 months ago
Ok, it seems to be working a little bit better now. It is now starting tasks on my VM instance. However, I am getting an issue with the MySQLFetch and MySQLExecute tasks. I was passing SSL certs to these when running locally, but when I have been running on GCP I pass a unix_socket instead. The SQLAlchemy engine looks like:
db = sqlalchemy.create_engine(
    sqlalchemy.engine.url.URL(
        drivername='mysql+pymysql',
        username=db_user,
        password=db_password,
        database=db_name,
        query={"unix_socket": connection_name},
    ),
)
Is it possible to do this with the MySQLFetch and MySQLExecute tasks?
Kevin Kho
7 months ago
I guess not, or it would be different, because the MySQL tasks use
pymysql
directly, so I would suggest you may just want to make your own task for that.
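A custom task along those lines might look like this sketch (the connection values are placeholders; it assumes pymysql is installed and that the Cloud SQL proxy exposes a Unix socket):
import pymysql
from prefect import task

@task
def mysql_fetch(query: str):
    # Connect over the Unix socket exposed by the Cloud SQL proxy
    conn = pymysql.connect(
        user="db_user",                           # placeholder
        password="db_password",                   # placeholder
        database="db_name",                       # placeholder
        unix_socket="/cloudsql/CONNECTION_NAME",  # placeholder socket path
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()
    finally:
        conn.close()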
Andrew Lawlor
7 months ago
Ok.
Ok, so I'm using KubernetesRun and GKE now. I am still getting the MySQL error. I followed a Google tutorial here to connect via a sidecar in my Kubernetes pod. The sidecar is set up in the same pod as my agent. How do sidecars work with KubernetesRun instances? Does it spin up a new pod with my Docker image to run my flow? If so, how do I ensure that the new pod also has my sidecar? The logs seem to indicate a new pod being spun up, and it seems to not have the sidecar set up.
Actually, I set up a job template a la this guide and it seems to work a little better, but now I'm seeing: Message: Error: container has runAsNonRoot and image will run as root
Kevin Kho
7 months ago
What was the MySQL error? I tried to find it but could not. I am a bit confused how we got to Docker-in-Docker? 😅
Andrew Lawlor
7 months ago
It's an unauthorized error. GCP has specific ways to connect to their Cloud SQL via a Cloud SQL proxy, so I'm following their guide to connect via Kubernetes.
But I'm a little confused on how Kubernetes works. Their guide is to deploy a sidecar container with a Docker image that they provide for connecting to MySQL, and then to talk to MySQL via that on localhost with a user and password. I deployed the sidecar in the agent's k8s config, but it didn't seem to work. How do I ensure the sidecar is always deployed with the instance of every run?
Kevin Kho
7 months ago
Wow, that sounds so complicated. You can do it with the job template, I guess, like in the example you showed?
Andrew Lawlor
7 months ago
That's what I'm trying, but I am very confused. What exactly is the job template doing? Why do I have both that and a Kubernetes config for the agent? What's the difference between them?
Kevin Kho
7 months ago
The agent job template is a default one. If there is none in KubernetesRun, that will be used. If both are supplied, the RunConfig takes precedence. So think of the agent's template as the default, and the Flow can come in and override it.
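Concretely, a sketch of that layering (the flag and parameter names are from Prefect 1.x as I understand them, so treat the exact spelling as an assumption; the file names are placeholders):
# Agent-level default, applied to every flow run this agent launches
prefect agent kubernetes start --job-template agent_default_template.yaml

# Flow-level override via the run config
run_config = KubernetesRun(job_template_path="flow_override_template.yaml")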
Andrew Lawlor
7 months ago
Oh, so I should specify a job template on the agent if I want it to apply to everything?
Kevin Kho
7 months ago
Yes. I think this may clear things up. The agent already has a default one, actually.
Andrew Lawlor
7 months ago
Ok, this is working. Thank you for your help. I had confused myself on how Kubernetes pods were deployed.
Do you have any documentation handy on sharing tasks between flows?
Kevin Kho
7 months ago
Not specifically, but that would be similar to our task library.
Andrew Lawlor
7 months ago
I'm trying to create a script that registers my flows on a git push. I'm getting an unauthenticated error with Prefect. How do I log in with my API key in code?
Kevin Kho
7 months ago
If using the CLI:
prefect auth login -k API_KEY
If using code, I think you need to use
client.save_auth_to_disk
like this.
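A sketch of the code path (assuming Prefect 1.x, and that your CI system injects the key via an environment variable; the variable name is a placeholder):
import os
from prefect import Client

client = Client(api_key=os.environ["PREFECT_API_KEY"])  # key injected by CI
client.save_auth_to_disk()  # persist auth so later registration calls work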
Andrew Lawlor
7 months ago
Is there a way to know how much memory I should be using for a pod? I regularly see this message when spinning up pods:
message: 0/3 nodes are available: 3 Insufficient cpu, 3 Insufficient memory.
Kevin Kho
7 months ago
Isn't this saying that there aren't even nodes available, though?
Andrew Lawlor
7 months ago
I read it as: there are 3 nodes, and all had insufficient CPU and insufficient memory.
Kevin Kho
7 months ago
Ah, ok. The answer, though, is that there isn't a good way to know ahead of time (at least that I know of).
Andrew Lawlor
7 months ago
Ok. I'm setting memory and CPU requests in the run_config like so:
run_config = KubernetesRun(
    cpu_request=0.5,
    memory_request="2Gi",
)
Is that the correct place to do it? And is there a good way to tell after the fact?
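For what it's worth, KubernetesRun in Prefect 1.x also accepts limit parameters alongside the requests; a minimal sketch:
run_config = KubernetesRun(
    cpu_request=0.5,
    memory_request="2Gi",
    cpu_limit=1,
    memory_limit="4Gi",  # cap usage so a runaway flow can't starve the node
)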
Kevin Kho
7 months ago
I personally don't know a good way, because the pod gets cleaned up after execution, so I would just exec in and try to get metrics before it ends. Actually, for Prefect 2.0 this is one of the things we'll expose so that you can send these metrics out, because it's one of the common questions we get. I suggest you post in the community channel so we can see what others have to say.
Andrew Lawlor
7 months ago
Sure. Am I setting the requests in the correct way, though? Also, how do I ensure the pod gets cleaned up? I am looking at my GKE cluster right now, and it's still running the same amount of vCPUs and memory as it was when my flow was running (it ended about 10 minutes ago).
And the pods are up with "not ready" statuses.
Kevin Kho
7 months ago
Yes, the request is right. I have a call with a teammate who regularly uses Kubernetes. Let me ask them, and I'll get back to you in like 30 minutes.
George Coyne
7 months ago
Hey Andrew, what flavor of k8s are you using?
It sounds like an autoscaling event is not occurring.
Andrew Lawlor
7 months ago
GKE.
George Coyne
7 months ago
We can see the job/pod name scheduled by a Prefect flow in the Prefect Cloud UI. We can then use that name to search the GKE workloads for the pod.
This will show you a very clear profile of memory usage vs. requests/limits.
This will clean up shortly after the pod runs, but it is useful for now.
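From the command line, a rough equivalent (a sketch; the pod name placeholder comes from the Prefect Cloud UI, and kubectl top needs the metrics server, which GKE ships by default):
kubectl get pods
kubectl describe pod <pod-name>   # shows requests/limits and scheduling events
kubectl top pod <pod-name>        # live CPU/memory usage while the pod exists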
Andrew Lawlor
7 months ago
How do I search GKE workloads?
George Coyne
7 months ago
In the bar, click where it says
filter workloads
then paste the pod name or job name.
Andrew Lawlor
7 months ago
Also, I think what is happening is my sidecars are causing my pods not to get torn down. I created sidecars to connect to Google Cloud SQL, and those sidecars seem to stay up (my get pods command shows 1 pod for every job I run, with 1/2 ready), so the pods stay up after the job completes. Would you know how to make a pod shut down after the Prefect job completes, even if the SQL connection isn't necessarily "done" yet?
George Coyne
7 months ago
How did you create the sidecars?
Andrew Lawlor
7 months ago
In the job template.
George Coyne
7 months ago
How are you passing the job template to the Prefect flow?
Andrew Lawlor
7 months ago
Which looks like:
apiVersion: batch/v1
kind: Job
metadata:
  name: prefect-job
spec:
  template:
    metadata:
      labels:
        app: prefect-agent
    spec:
      serviceAccountName: default
      containers:
      - name: flow-container
      - name: cloud-sql-proxy
        lifecycle:
          type: Sidecar
        image: gcr.io/cloudsql-docker/gce-proxy:latest
        command:
          - "/cloud_sql_proxy"
          - "-instances=<CONNECTION>"
        resources:
          requests:
            memory: "128Mi"
            cpu: "0.25"
And in the run config:
run_config = KubernetesRun(
    image='IMAGE',
    labels=['etl'],
    cpu_request=0.25,
    memory_request="128Mi",
    env={ENV_VARS},
    job_template=job_template,
)
George Coyne
7 months ago
I think your job manifest needs some tweaking. I would start here: https://cloud.google.com/kubernetes-engine/docs/how-to/jobs
Prefect also offers professional services for things like this, if you would like some more hands-on support for infrastructure setup.
I think there is an autoscaling issue and a pod definition issue.
Andrew Lawlor
7 months ago
It's not very clear to me what the pod definition issue is. I have read the job docs and don't see anything obvious. I needed to add the sidecar in the job definition to ensure it was always deployed with each pod, so I could connect to Cloud SQL. It doesn't seem to spin down correctly, but the only thing I could see that might help with that is the completionCount, and it defaults to 1, which is what I'd want anyway.
Kevin Kho
7 months ago
George is normally on enterprise support calls, so just mentioning he'll probably respond later in the day.
George Coyne
7 months ago
What flavor of Cloud SQL?
Andrew Lawlor
7 months ago
MySQL.
George Coyne
7 months ago
So I use PgBouncer as a single proxy, as opposed to sidecars, but I think you can do the same with ProxySQL.
Andrew Lawlor
7 months ago
I do currently have a single proxy service, and it is working.
George Coyne
7 months ago
That's excellent.
Andrew Lawlor
7 months ago
Yeah. I'm a little thrown off by not being able to get the sidecar to work, given that it's what Google recommends, but this is working for now. Do you deploy GKE with a public IP and in Autopilot mode?
George Coyne
7 months ago
Personally, control plane public, all instances private.
I haven't tested out Autopilot yet; not sure if I will. I like the granular control of node pools.
Andrew Lawlor
7 months ago
Hmm, maybe I should try that then. I've never used Kubernetes before, so I'm not really sure; I went with Autopilot since it seemed easier to set up.
After talking to Google, they suggested doing this instead of a Cloud SQL proxy service in GKE: https://cotton-ori.medium.com/how-to-terminate-a-side-car-container-in-kubernetes-job-2468f435ca99. When I try it, it doesn't work, and I think the issue is that I am overwriting the command that the flow container runs so that it signals for it to end; it seems to only do that, and not actually run my flow. What is the command it needs to run to start the flow, and can I have it run it?
@Kevin Kho @George Coyne did you see my above message? I'd like to try to go back to using sidecars if possible, instead of Kubernetes services.
Kevin Kho
7 months ago
I have something for the next few hours but will take a look in a bit.
Andrew Lawlor
7 months ago
Ok, thanks.
Kevin Kho
7 months ago
So the first answer is that whatever container you have, Prefect adds the commands on top of your container through the agent when it starts the Flow, so you shouldn't need to add it yourself. It's something like
prefect execute …
I think this is the line that does it.
What do you mean that you overrode the command to signal for it to end? You mean to immediately spin down the sidecar?
Andrew Lawlor
7 months ago
I set the args for the flow container to be:
args:
  - |
    sleep 2s
    trap "touch /tmp/pod/terminated" EXIT
Kevin Kho
7 months ago
Sorry, I'm a bit confused why the need for that?