#prefect-community

Ievgenii Martynenko

05/04/2022, 12:45 PM
Hi, I'm looking through https://github.com/PrefectHQ/prefect/discussions/4042 to define the optimal deployment approach. We have a single Agent acting more as an orchestrator, and 1000 Dataflows that use DockerRun() and S3 Storage. In order to set up CI/CD we have to:
1. Copy the whole project to the Agent host and register all flows by means of some custom Python scripts (which we have to write). Dataflows will be saved to S3 at this point.
2. Ensure that the image used in DockerRun() has all requirements installed and all env variables set.
3. Execute a Dataflow from the UI, which will go to S3 to pick up the flow, start a new Docker container from the predefined image, and execute the flow in that container.
I'm a bit confused that a copy of all Dataflows will sit on the Agent when in fact the Agent is not a worker. Can this approach be simplified?

Anna Geller

05/04/2022, 12:57 PM
Did I understand correctly that the problem you are facing is packaging your custom modules during CI and ensuring that your flow run will get the latest image?

Ievgenii Martynenko

05/04/2022, 1:02 PM
Not really. There is no issue with packaging. I'm trying to understand whether this approach is 'correct'. The only thing that confuses me is: in order to register flows we have to copy all of them to the agent and then during registration, in fact, another copy of the same flows will be created on S3.

Anna Geller

05/04/2022, 1:06 PM
you don't need to copy any code to the agent - during the flow run, Prefect will pull the container image from the specified registry e.g. from ECR, spin up a Docker container, and within this container, it will start a flow run. During a flow run, it will retrieve the flow code from S3. The only two things that you need to take care of are: #1 push the flow code to S3 - Prefect can take care of it during flow registration if you configure it this way:
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
#2 package code dependencies into a Docker image and push the image to some registry, e.g. ECR.
Both steps - pushing flow code to S3 and pushing the image to a registry - can be automated with a CI/CD pipeline. LMK if you need some examples.
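The two steps above could be wired into a CI job roughly as follows. This is only a sketch under assumptions: the helper name `build_ci_commands`, the repository name, region, tag and the `flows/` directory are hypothetical placeholders, and it assumes a Prefect 1.x CLI where `-p` accepts a directory. It only assembles the commands and leaves running them (and logging in to the registry) to the pipeline.

```python
from typing import List


def build_ci_commands(
    account_id: str,
    region: str,
    repository: str,
    tag: str,
    project: str,
    flows_dir: str = "flows/",
) -> List[List[str]]:
    """Assemble the CI commands as argument lists; nothing is executed here."""
    # ECR image URI layout: <account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>
    image = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"
    return [
        # Step #2: package dependencies into an image and push it to the registry
        ["docker", "build", "-t", image, "."],
        ["docker", "push", image],
        # Step #1: with stored_as_script=True and local_script_path set,
        # registration uploads each flow script to S3
        ["prefect", "register", "--project", project, "-p", flows_dir],
    ]


if __name__ == "__main__":
    # Hypothetical values, for illustration only
    for cmd in build_ci_commands(
        "123456789012", "us-east-1", "prefect-flows", "latest", "community"
    ):
        print(" ".join(cmd))
```

Each inner list can be passed to `subprocess.run(cmd, check=True)` from a CI runner that is authenticated with both the registry and the Prefect backend.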

Ievgenii Martynenko

05/04/2022, 1:13 PM
ok, so we take a flow, put it in an S3 bucket (1), and then we need to tell Prefect to register that flow from S3. How do I tell Prefect to do that?

Anna Geller

05/04/2022, 1:27 PM
you can do it this way: https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows/s3_docker_run_local_image.py
import platform
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import S3
from prefect.run_configs import DockerRun
import subprocess

PREFECT_PROJECT_NAME = "community"
FLOW_NAME = "s3_docker_run_local_image"
AGENT_LABEL = "docker"
AWS_ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)

RUN_CONFIG = DockerRun(labels=[AGENT_LABEL],)


@task(log_stdout=True)
def hello_world(x: str):
    print(f"Hello {x} from {FLOW_NAME}!")
    print(
        f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    )


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world(user_input)

if __name__ == "__main__":
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p flows/s3_docker_run_local_image.py",
        shell=True,
    )

Ievgenii Martynenko

05/04/2022, 1:29 PM
This is for a local agent, while I'm looking for an approach where the Agent is remote.
If I copy the Python files to S3 manually, can't I just execute the command below on the remote Agent? "prefect register --project "Test" -p https://path_to_bucket.amazonaws.com/flows/test.py"
To put it more simply: how can I register dataflows on a remote Prefect environment?

Anna Geller

05/04/2022, 1:50 PM
What do you mean by remote agent? The example I sent was for Docker agent, not a local agent.
Both local and Docker agents can be started on a remote VM, and Prefect will deploy your flow to those remote agents automatically, as long as you specify the same labels on your agent and in your flow's run configuration.

Ievgenii Martynenko

05/04/2022, 2:02 PM
Let's do it another way:
1. Let's take your dataflow s3_docker_run_local_image.py, but without the if __name__ == "__main__": section.
2. We have a Prefect environment running somewhere in AWS (Agent, UI, DB, Hasura etc.) using docker-compose. We have bash access to those servers.
3. We have an S3 Bucket.
What do I need to do to register s3_docker_run_local_image.py on that remote environment?

Anna Geller

05/04/2022, 2:05 PM
you need to register your flow. You can do it either using
flow.register("your_project")
or using the CLI:
prefect register --project yourproject -p yourflow.py
if this is confusing to you, check these resources:
• https://docs.prefect.io/orchestration/getting-started/registering-and-running-a-flow.html
• https://discourse.prefect.io/t/when-do-i-need-to-reregister-my-flow-which-changes-in-flow-metadata-result-in-prefect-bumping-up-the-flow-version/403

Ievgenii Martynenko

05/04/2022, 2:07 PM
Right. I need to register my flows. So I need to execute CLI "prefect register --project yourproject -p yourflow.py". But where do I execute that command?

Anna Geller

05/04/2022, 2:10 PM
you can do it from any CLI provided that you authenticate with your orchestration backend from that CLI
I understood you are on Server, correct? In that case you would need to do this before registering
🙌 1

Ievgenii Martynenko

05/04/2022, 2:11 PM
Right, one of the authenticated CLIs is exactly the remote Agent. So I can execute that command from the remote Agent. And in order to execute that command from the remote Agent, I have to copy all Dataflows to the remote Agent.

Anna Geller

05/04/2022, 2:13 PM
nope 😄 no need to copy anything. I know reading docs can be boring sometimes but if you read this one everything will be clearer, I promise.

Ievgenii Martynenko

05/04/2022, 2:16 PM
It's not about being boring; it's that you read a lot and don't know what is useful and what is not. When it comes to specific questions, general docs are usually not that helpful if you're not developing the product or working with it daily. The link you sent is, I think, what was required: we just repoint the local Prefect in a Python virtual env to the remote endpoint, and the local command will register flows on the remote endpoint.

Anna Geller

05/04/2022, 2:18 PM
sort of yes, you need to point your backend to your Prefect Server API
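A rough sketch of what "pointing the backend to the Prefect Server API" might look like from a CI machine or laptop. The environment variable names are assumptions based on Prefect 1.x's `PREFECT__<SECTION>__<KEY>` convention, and the endpoint URL is a hypothetical placeholder; both should be checked against the config of the installed version. The helper names are illustrative only.

```python
import os
from typing import Dict, List, Optional


def server_env(endpoint: str, base_env: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Return a copy of the environment that targets a Prefect Server backend."""
    env = dict(os.environ if base_env is None else base_env)
    # Assumed variable names (PREFECT__<SECTION>__<KEY> convention in Prefect 1.x)
    env["PREFECT__BACKEND"] = "server"
    env["PREFECT__SERVER__ENDPOINT"] = endpoint
    return env


def register_command(project: str, path: str) -> List[str]:
    """Build the same CLI call shown in the thread as an argument list."""
    return ["prefect", "register", "--project", project, "-p", path]


if __name__ == "__main__":
    # Hypothetical endpoint; replace with the address of your Server API
    env = server_env("http://prefect.example.internal:4200")
    print(register_command("yourproject", "yourflow.py"))
    # To actually register from this machine:
    # subprocess.run(register_command("yourproject", "yourflow.py"), env=env, check=True)
```

The point of the design is that the flow files only need to exist where this command runs (for example a CI runner), not on the agent host.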

Ievgenii Martynenko

05/04/2022, 2:57 PM
I will try that, thanks.
👍 1
Yes, that's what I was looking for. Thank you for your patience.
👍 1

Mateo Merlo

05/05/2022, 10:21 PM
I think this overview may be useful for understanding what the architecture looks like and how the components connect to each other: https://docs.prefect.io/orchestration/#architecture-overview
upvote 1
@Anna Geller do you know of any example of the "#2 package code dependencies into a Docker image and push the image to some registry e.g. ECR" step that you mentioned above? I understand the part about S3, but I'm still struggling to understand how I can run an agent and tell Prefect that it has to use that agent.
"during the flow run, Prefect will pull the container image from the specified registry e.g. from ECR, spin up a Docker container, and within this container, it will start a flow run. During a flow run, it will retrieve the flow code from S3."
This is totally clear, but: who is the one that pulls the container that will run the flow? The agent? And how can I run an agent using, for example, Docker?

Anna Geller

05/06/2022, 11:23 PM
technically, it's the flow runner, but you don't have to worry about this - you only need to register your flows:
prefect register --project xyz -p yourflow.py
and start an agent:
prefect agent docker start --label youragent
then, if your run configuration in yourflow.py is using the same label "youragent", this is how Prefect is able to determine which agent should poll for scheduled flow runs of that flow
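To make the label matching concrete, here is a tiny illustration. It is not Prefect's implementation; it only encodes the rule as the Prefect 1.x docs describe it, as far as I understand them: an agent picks up a flow run when the labels on the flow's run configuration are a subset of the agent's labels. The function name is hypothetical.

```python
from typing import Iterable


def agent_can_pick_up(agent_labels: Iterable[str], flow_labels: Iterable[str]) -> bool:
    """True when every label on the flow's run config is also set on the agent."""
    return set(flow_labels).issubset(agent_labels)


if __name__ == "__main__":
    # Agent started with: prefect agent docker start --label youragent
    print(agent_can_pick_up(["youragent"], ["youragent"]))
    # A flow labelled "docker" would be left for some other agent
    print(agent_can_pick_up(["youragent"], ["docker"]))
```

So a flow whose DockerRun uses labels=["youragent"] is matched to the agent above, while a flow labelled only "docker" is not.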

Mateo Merlo

05/07/2022, 3:21 PM
Thanks for your response @Anna Geller. I am using this https://medium.com/the-prefect-blog/orchestrating-elt-on-kubernetes-with-prefect-dbt-snowflake-part-2-d915f5a65e59 which is helping me a lot to understand. In my case I'm using:
• Prefect Cloud
• Kubernetes to run the agent in GCP
• Google Cloud Storage to store the flows
I need to run the agent with GOOGLE_APPLICATION_CREDENTIALS so that the flow can be read from storage. What is the best way to do this: configuring an env variable in k8s.cfg, or configuring KubernetesRun in run_config?

Anna Geller

05/07/2022, 4:13 PM
If you use GOOGLE_APPLICATION_CREDENTIALS, you just need the service account .json file in the execution environment
export GOOGLE_APPLICATION_CREDENTIALS=/Users/you/.secrets/gcp.json
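One way to check that the execution environment really has both the variable and the file is a small diagnostic like the one below, run inside the container or pod where the flow run executes. This is a sketch using only the standard library; the function name is hypothetical, and it checks only that the path exists, not that the service account has the required bucket permissions.

```python
import os
from typing import Mapping, Optional


def check_gcp_credentials(env: Optional[Mapping[str, str]] = None) -> str:
    """Report whether GOOGLE_APPLICATION_CREDENTIALS points at an existing file."""
    env = os.environ if env is None else env
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return "GOOGLE_APPLICATION_CREDENTIALS is not set"
    if not os.path.isfile(path):
        return f"{path} does not exist in this environment"
    return f"found credentials file at {path}"


if __name__ == "__main__":
    print(check_gcp_credentials())
```

Calling it from a task at the start of a flow shows what the flow-run container itself sees, which can differ from what the agent's pod sees.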

Mateo Merlo

05/07/2022, 5:27 PM
Yes, I'm trying to do that, but when I run the flow I get the error "Failed to load and execute flow run: DefaultCredentialsError('File /opt/prefect/gcs.json was not found.')"
My Dockerfile is:
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY requirements.txt .
COPY gcs.json .
COPY setup.py .
RUN pip install .
I built the image and pushed it to the registry in GCP. In my k8s.cfg I have this line:
prefect agent kubernetes start --env GOOGLE_APPLICATION_CREDENTIALS="/opt/prefect/gcs.json"
I checked from the console by accessing the pod, and the file is there. Perhaps I have to provide my credentials in a different way instead of baking them into the Docker image.

Anna Geller

05/07/2022, 5:39 PM
perhaps try adding this to your Dockerfile:
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY requirements.txt .
COPY gcs.json .
COPY setup.py .
RUN pip install .
ENV GOOGLE_APPLICATION_CREDENTIALS=/opt/prefect/gcs.json

Mateo Merlo

05/07/2022, 5:49 PM
I've tried that, but I get the same error.
I tried going back to "prefect agent kubernetes start" without --env, but then I get the first error: "Failed to load and execute flow run: Forbidden('GET https://storage.googleapis.com/storage/v1/b/prefect_flows_test?projection=noAcl&prettyPrint=false: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')"

Anna Geller

05/07/2022, 5:52 PM
I can't dive deeper now. Maybe you can check without Kubernetes first? Here is an example for Docker you may try: https://github.com/anna-geller/packaging-prefect-flows/tree/master/flows/gcs_flow_of_flows_docker_run

Mateo Merlo

05/07/2022, 6:14 PM
Sure, I will check it. Thanks so much 🙂
👍 1