# prefect-community
i
Hi, I'm looking through https://github.com/PrefectHQ/prefect/discussions/4042 to define the most optimal deployment approach. We have a single Agent acting more as an orchestrator, and 1000 dataflows that use DockerRun() and S3 Storage. In order to set up CI/CD we have to:
1. Copy the whole project to the Agent host and register all flows by means of some custom Python scripts (which we have to write). The dataflows will be saved to S3 at this point.
2. Ensure that the image used in DockerRun() has all requirements/env variables installed and set.
3. Execute a dataflow from the UI, which will go to S3 to pick up the flow, run a new Docker container from the predefined image, and execute the flow in that container.
I'm a bit confused that a copy of all dataflows will sit on the Agent when the Agent is in fact not a worker. Can this approach be simplified?
a
Did I understand correctly that the problem you are facing is packaging your custom modules during CI and ensuring that your flow run will get the latest image?
i
Not really. There is no issue with packaging. I'm trying to understand whether this approach is "correct". The only thing that confuses me is that in order to register flows we have to copy all of them to the agent, and then during registration another copy of the same flows is created on S3.
a
you don't need to copy any code to the agent - during the flow run, Prefect will pull the container image from the specified registry (e.g. from ECR), spin up a Docker container, and within this container it will start a flow run. During the flow run, it will retrieve the flow code from S3. The only two things that you need to take care of are: #1 push the flow code to S3 - Prefect can take care of this during flow registration if you configure it this way:
```python
from prefect.storage import S3

FLOW_NAME = "your_flow"  # placeholder: the flow script's base name

STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure the flow script is uploaded to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
```
#2 package code dependencies into a Docker image and push the image to some registry, e.g. ECR. Both steps - pushing the flow code to S3 and pushing the image to a registry - can be automated with a CI/CD pipeline. LMK if you need some examples
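For illustration, here is a minimal CI sketch of both steps, wrapping the CLI in subprocess the same way the registration script below does; the registry URL, image tag, project name, and flow path are all hypothetical:
```python
import subprocess

# hypothetical image name: adjust registry, repository, and tag
IMAGE = "123456789012.dkr.ecr.us-east-1.amazonaws.com/prefect-flows:latest"

# step 2: bake code dependencies into an image and push it to the registry
subprocess.run(f"docker build -t {IMAGE} .", shell=True, check=True)
subprocess.run(f"docker push {IMAGE}", shell=True, check=True)

# step 1: registering the flow also uploads its script to S3,
# because the S3 storage above sets local_script_path
subprocess.run(
    "prefect register --project community -p flows/your_flow.py",
    shell=True,
    check=True,
)
```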
i
ok, so we take a flow, put it in the S3 bucket (step 1), and then we need to tell Prefect to register that flow from S3. How do I tell Prefect to do that?
a
you can do it this way: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/s3_docker_run_local_image.py
```python
import platform
import subprocess

import prefect
from prefect import Flow, Parameter, task
from prefect.run_configs import DockerRun
from prefect.storage import S3

PREFECT_PROJECT_NAME = "community"
FLOW_NAME = "s3_docker_run_local_image"
AGENT_LABEL = "docker"

STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure the flow script is uploaded to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)

RUN_CONFIG = DockerRun(labels=[AGENT_LABEL])


@task(log_stdout=True)
def hello_world(x: str):
    print(f"Hello {x} from {FLOW_NAME}!")
    print(
        f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    )


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world(user_input)

if __name__ == "__main__":
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p flows/s3_docker_run_local_image.py",
        shell=True,
    )
```
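Running this script with "python flows/s3_docker_run_local_image.py" uploads the flow script to S3 and registers the flow in one step.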
i
This is for a local agent, while I'm looking for a setup where the Agent is remote.
If I copy the Python files to S3 manually, can't I just execute the below command on the remote Agent? "prefect register --project "Test" -p https://path_to_bucket.amazonaws.com/flows/test.py"
To put it more simply: how can I register dataflows in a remote Prefect environment?
a
What do you mean by remote agent? The example I sent was for a Docker agent, not a local agent.
Both local and Docker agents can be started on a remote VM, and Prefect will deploy your flow to those remote agents automatically, as long as you specify the same labels on your agent and on your flow's run configuration
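For example, here is a minimal sketch of that label matching; the label name "docker" is arbitrary, as long as both sides agree:
```python
# agent started on the remote VM with:
#   prefect agent docker start --label docker
from prefect.run_configs import DockerRun

# the label must match the one the agent was started with
RUN_CONFIG = DockerRun(labels=["docker"])
```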
i
Let's do it another way:
1. Let's take your dataflow s3_docker_run_local_image.py, but without the if __name__ == "__main__": section.
2. We have a Prefect environment running somewhere in AWS (Agent, UI, DB, Hasura, etc.) using docker-compose. We have bash access to those servers.
3. We have an S3 bucket.
What do I need to do to register s3_docker_run_local_image.py in that remote environment?
a
you need to register your flow. You can do it either using
flow.register("your_project")
or using the CLI:
```bash
prefect register --project yourproject -p yourflow.py
```
if this is confusing to you, check these resources:
• https://docs.prefect.io/orchestration/getting-started/registering-and-running-a-flow.html
• https://discourse.prefect.io/t/when-do-i-need-to-reregister-my-flow-which-changes-in-flow-metadata-result-in-prefect-bumping-up-the-flow-version/403
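For completeness, a minimal sketch of the programmatic route; the flow and project names are placeholders:
```python
from prefect import Flow, task


@task
def say_hello():
    print("hello")


with Flow("yourflow") as flow:
    say_hello()

if __name__ == "__main__":
    # equivalent to: prefect register --project yourproject -p yourflow.py
    flow.register(project_name="yourproject")
```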
i
Right. I need to register my flows. So I need to execute the CLI command "prefect register --project yourproject -p yourflow.py". But where do I execute that command?
a
you can do it from any CLI, provided that you authenticate with your orchestration backend from that CLI
I understood you are on Server, correct? In that case you would need to do this before registering
🙌 1
i
Right, one of the authenticated CLIs is exactly the remote Agent. So I can execute that command from the remote Agent. And in order to execute that command from the remote Agent, I have to copy all dataflows to the remote Agent.
a
nope 😄 no need to copy anything. I know reading docs can be boring sometimes but if you read this one everything will be clearer, I promise.
i
It's not about being boring; it's that you read a lot and you don't know what is useful and what is not. When it comes to specific questions, general docs are usually not useful unless you're developing the product or working with it daily. The link you sent is, I think, what was required: we just repoint the local Prefect install in a Python virtual env to the remote endpoint, and the local command will register flows against that remote endpoint
a
sort of, yes - you need to point your backend to your Prefect Server API
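A minimal sketch of that, assuming Prefect 1.x; the host below is hypothetical, and the Server endpoint can also be configured in ~/.prefect/config.toml:
```python
import subprocess

# switch the local CLI from Prefect Cloud to your own Prefect Server
subprocess.run("prefect backend server", shell=True, check=True)

# make sure the client points at the remote Server API instead of localhost,
# e.g. export PREFECT__SERVER__ENDPOINT=http://your-server-host:4200
# (hypothetical host; adjust to your environment)

# registration now targets the remote environment
subprocess.run(
    "prefect register --project Test -p flows/s3_docker_run_local_image.py",
    shell=True,
    check=True,
)
```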
i
I will try that, thanks.
šŸ‘ 1
Yes, that what I was looking for. Thank you for your patience.
šŸ‘ 1
m
I think this overview may be useful to understand what the architecture looks like and how the components connect to each other: https://docs.prefect.io/orchestration/#architecture-overview
upvote 1
@Anna Geller do you know of any example of this "#2 package code dependencies into a Docker image and push the image to some registry e.g. ECR" that you mentioned above? I understand the part about S3, but I'm still struggling to understand how I can run an agent and tell Prefect that it has to use that agent. "during the flow run, Prefect will pull the container image from the specified registry e.g. from ECR, spin up a Docker container, and within this container, it will start a flow run. During a flow run, it will retrieve the flow code from S3." This is totally clear, but: who is the one who pulls the container that will run the flow? The agent? And how can I run an agent using, for example, Docker?
a
technically, it's the flow runner, but you don't have to worry about this - you only need to register your flows:
```bash
prefect register --project xyz -p yourflow.py
```
and start an agent:
```bash
prefect agent docker start --label youragent
```
then, if your run configuration in yourflow.py is using the same label "youragent", this is how Prefect is able to determine which agent should poll for scheduled flow runs of that flow
m
Thanks for your response @Anna Geller. I am using this https://medium.com/the-prefect-blog/orchestrating-elt-on-kubernetes-with-prefect-dbt-snowflake-part-2-d915f5a65e59 which is helping me a lot to understand. In my case I'm using:
• Prefect Cloud
• Kubernetes to run the agent in GCP
• Google Cloud Storage to store the flows
I need to run the agent using GOOGLE_APPLICATION_CREDENTIALS to allow it to read the flow from storage. Which is the best way to do this: configuring an env variable in k8s.cfg, or configuring it on KubernetesRun in run_config?
a
If you use GOOGLE_APPLICATION_CREDENTIALS, you just need the service account .json file in the execution environment:
```bash
export GOOGLE_APPLICATION_CREDENTIALS=/Users/you/.secrets/gcp.json
```
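If you prefer to set it per flow instead, here is a minimal sketch using the env parameter of KubernetesRun; the label and file path are taken from this thread and may need adjusting:
```python
from prefect.run_configs import KubernetesRun

RUN_CONFIG = KubernetesRun(
    labels=["k8s"],  # hypothetical agent label
    # path inside the image where the service-account file was copied
    env={"GOOGLE_APPLICATION_CREDENTIALS": "/opt/prefect/gcs.json"},
)
```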
m
Yes, I'm trying to do that, but when I run the flow I get the error "Failed to load and execute flow run: DefaultCredentialsError('File /opt/prefect/gcs.json was not found.')". My Dockerfile is:
```dockerfile
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY requirements.txt .
COPY gcs.json .
COPY setup.py .
RUN pip install .
```
I built the image and pushed it to the registry in GCP. In my k8s.cfg I have this line:
```bash
prefect agent kubernetes start --env GOOGLE_APPLICATION_CREDENTIALS="/opt/prefect/gcs.json"
```
I checked from the console by accessing the pod, and the file is there. Perhaps I have to upload my credentials in a different way instead of baking them into the Docker image.
a
perhaps try adding this to your Dockerfile:
```dockerfile
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY requirements.txt .
COPY gcs.json .
COPY setup.py .
RUN pip install .
ENV GOOGLE_APPLICATION_CREDENTIALS=/opt/prefect/gcs.json
```
m
I've tried, but I get the same error.
I tried going back to "prefect agent kubernetes start" without --env, but then I got the first error again: "Failed to load and execute flow run: Forbidden('GET https://storage.googleapis.com/storage/v1/b/prefect_flows_test?projection=noAcl&prettyPrint=false: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')"
a
I can't dive deeper right now; maybe you can check without Kubernetes first? Here is an example for Docker you may try: https://github.com/anna-geller/packaging-prefect-flows/tree/master/flows/gcs_flow_of_flows_docker_run
m
Sure, I will check it. Thanks so much 🙂
šŸ‘ 1