# prefect-community
h
Hi, I'm new to Prefect. It's fantastic to be able to deploy pipelines with Python code. However, I've run into some issues deploying a pipeline to Prefect. Here's my scenario:
• I want to run the Prefect Agent in our K8s cluster, and I successfully plugged the agent into the cluster.
• I built a Docker image in GitLab CI and pushed it to ECR. I wrote the Dockerfile based on the reference here.
• I wrote a flow script and registered it to Prefect Cloud from my local computer.
Unfortunately, after running the pipeline, I got an error saying the Python module was not found, so the module I wrote in the project can't be imported. And here's my flow script.
```
Failed to load and execute flow run: ModuleNotFoundError("No module named '/Users/xxx/'")
```
Thanks for helping. 🙇
k
Hey @Hash Lin, this will be a good read for that. The problem is that Prefect pulls your Flow from Storage, and the default Storage is your local machine. You need to use some Storage that is available to the Kubernetes pod (GitHub, S3, etc.).
When you get the chance, could you move the code to this thread, to keep the main channel a bit neater? Thank you!
h
```python
import prefect
from prefect import task, Flow
from prefect.run_configs import KubernetesRun

from app.job.experiment import ExperimentJob


@task
def process():
    logger = prefect.context.get("logger")
    experiment_job = ExperimentJob(logger)
    experiment_job.update_experiment_user_count()


with Flow(
        "update_experiment_user_count",
        run_config=KubernetesRun(
            image_pull_policy="Always",
            labels=["k8s"],
            image="xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test",
        ),
) as flow:
    process()

flow.register(project_name="test_project", labels=["k8s"])
```
Hi @Kevin Kho, thanks for your reply. I've moved my sample flow code to the thread. I think I'm a little confused about how Storage is used in Prefect. Originally, we put the source code inside the Docker image. But in Prefect, we don't need to put the source code in the Docker image; we put the source code and the flow code in Storage, right?
I'm trying to use GitLab storage with Prefect, but I get a file-not-found error when running the pipeline. Here's my sample code.
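```python
from prefect import Flow
from prefect.storage import GitLab

flow = Flow("my-flow")
# The leading "/" in path turned out to cause the file-not-found error;
# the path should be "flow.py" (see the follow-up below).
flow.storage = GitLab(repo="my/repo", path="/flow.py", ref="MISOCU-3337")
```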
https://cln.sh/GPi8xP I'm not quite sure where I went wrong here. 🤦
I think I know the problem. I should use `flow.py` as the path, not `/flow.py`. I think the documentation guided me in the wrong direction. 🤦
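A small guard like the one below could catch this before registering. It is a hypothetical helper, not part of Prefect, and it assumes (as found in this thread) that the storage `path` is resolved relative to the repository root: it strips leading slashes and rejects paths that are empty or climb out of the repo with `..`.

```python
from pathlib import PurePosixPath


def repo_relative_path(path: str) -> str:
    """Normalize a storage path to be relative to the repository root.

    Hypothetical helper: assumes GitLab/GitHub storage looks the file up
    relative to the repo root, so "/flow.py" must become "flow.py".
    """
    relative = path.lstrip("/")  # drop leading slashes
    parts = PurePosixPath(relative).parts
    if not parts or ".." in parts:
        # Empty paths or paths escaping the repo cannot point at a flow file.
        raise ValueError(f"not a usable repo-relative path: {path!r}")
    return str(PurePosixPath(*parts))
```

For example, `GitLab(repo="my/repo", path=repo_relative_path("/flow.py"), ref="MISOCU-3337")` would pass `"flow.py"`.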
Sorry, I'm still facing the `ModuleNotFoundError` problem after switching to GitLab Storage. Here's the error message and the example code.
```
prefect ModuleNotFoundError: No module named 'app'
```
```python
import prefect
from prefect import task, Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import GitLab

from app.job.experiment import ExperimentJob


@task
def process():
    logger = prefect.context.get("logger")
    experiment_job = ExperimentJob(logger)
    experiment_job.update_experiment_user_count()


with Flow(
        "update_experiment_user_count",
        run_config=KubernetesRun(
            image_pull_policy="Always",
            labels=["k8s"],
            image="xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test",
        ),
        storage=GitLab(
            access_token_secret="gitlab-access-token",
            repo="my/repo",
            ref="MISOCU-3337",
            path="flow.py"
        ),
) as flow:
    process()

flow.register(project_name="test_project", labels=["k8s"])
```
I also created a `setup.py` file to define the dependencies.
```python
from setuptools import setup, find_packages

with open('requirements.txt') as f:
    requirements = f.read().splitlines()

setup(
    name="flow_utilities",
    version="0.1",
    packages=find_packages(),
    install_requires=requirements,
)
```
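For a plain list of package names, the `splitlines()` approach above is enough. A slightly more defensive variant is sketched below (the `read_requirements` name is hypothetical): it skips blank lines, comments, and pip options such as `-r other.txt` or `--index-url ...`, which are not valid requirement specifiers for `install_requires`.

```python
from pathlib import Path


def read_requirements(path="requirements.txt"):
    """Return requirement specifiers from a pip requirements file.

    Blank lines, full-line comments, inline " #" comments and pip options
    (lines starting with "-") are skipped.
    """
    requirements = []
    for line in Path(path).read_text().splitlines():
        line = line.split(" #", 1)[0].strip()  # drop inline comments
        if not line or line.startswith(("#", "-")):
            continue
        requirements.append(line)
    return requirements
```

It would be used as `setup(..., install_requires=read_requirements())`.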
k
That is because Storage only uploads/downloads the flow file. If you need to include the dependencies, you can either use Docker storage and package them into it, or add them to the image you pass to `KubernetesRun`.
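Since Storage only ships the flow file, every module the flow imports has to already exist in the image the flow-run pod uses. A minimal sketch for checking that, run inside the image before pointing `KubernetesRun` at it, could look like this (the script and function names are hypothetical). Note that `importlib.util.find_spec` imports the parent packages of a dotted name, so `app.job.experiment` is reported missing when `app` itself is not importable.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of `names` that cannot be imported here."""
    missing = []
    for name in names:
        try:
            spec = importlib.util.find_spec(name)
        except ModuleNotFoundError:
            # Raised when a parent package of a dotted name is missing,
            # e.g. "app" for "app.job.experiment".
            spec = None
        if spec is None:
            missing.append(name)
    return missing


if __name__ == "__main__":
    import sys

    # Usage inside the flow-run image, e.g.:
    #   python check_imports.py app.job.experiment pandas
    print("missing:", missing_modules(sys.argv[1:]))
```

Once the dependencies are baked into the image, `python check_imports.py app.job.experiment pandas` should print `missing: []`.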
h
I fixed the problem. Thanks, Kevin.
k
Nice 🙂
t
@Hash Lin Hi Hash! I'm currently running into the same problems as you. Could you spare some time to chat about the solution?
k
My first response has a detailed write-up on that. Did you check it?
t
@Kevin Kho yes I did! Thank you again for linking the detailed write-up 🙂 I actually used the first two responses to make it work locally, but the response about ECSRun and GitHub storage pertains exactly to our use case. I must have missed it on the first skim, haha. Thank you!
k
Nice work!
m
I'm coming to this thread because I'm having the same issue with pandas: `Failed to load and execute flow run: ModuleNotFoundError("No module named 'pandas'")`. I'm working with GitHub Storage and a Kubernetes agent. I read the article, and I have the same `setup.py` that @Hash Lin shared above. My Dockerfile is:
```dockerfile
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY src/requirements.txt .
COPY src/setup.py .
RUN pip install .
```
I built this image, pushed it to Artifact Registry in GCP, and pointed the cluster at this image. Should I create another file to add pandas and have it available during the flow run?
k
Are you maybe using the `DaskExecutor`?
m
How can I check that? I installed the agent following this guide: https://gist.github.com/palewire/072513a9940478370697323c0d15c6ec
Before I started using pandas, the agent was working fine.
This is `flow_test.py`:
```python
import platform
import pandas as pd
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
import subprocess

PREFECT_PROJECT_NAME = "testing"
FLOW_NAME = "flow_test"
AGENT_LABEL = "etl"

STORAGE = GitHub(
    repo="mateo2181/prefect-flows-test",
    path=f"flows/{FLOW_NAME}.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)

RUN_CONFIG = KubernetesRun(labels=[AGENT_LABEL])

@task
def extract_and_load(dataset: str) -> None:
    logger = prefect.context.get("logger")
    file = f"gs://football_transfers/transfers/{dataset}"
    df = pd.read_csv(file)
    logger.info("Dataset %s with %d rows loaded to DB", dataset, len(df))

@task(log_stdout=True)
def hello_world():
    print(f"Hello from {FLOW_NAME} v2!")
    print(f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}")


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    # user_input = Parameter("user_input", default="Marvin")
    hw = hello_world()
    extract_and_load("1999_dutch_eredivisie.csv")
```
k
Ah, if you are not specifying it, you are not using Dask. You could use this syntax to add other libraries, because the default Prefect image doesn't include them. I would recommend building your own image instead (also described on the same page), because adding them here will re-download them every time the flow runs.
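A sketch of the two options mentioned here, assuming the syntax being referred to is the `EXTRA_PIP_PACKAGES` environment variable that the official Prefect images pip-install from at container start-up. The helper name is hypothetical; it only builds keyword arguments for `KubernetesRun`, preferring a prebuilt image and treating runtime installs as the fallback.

```python
def kubernetes_run_kwargs(labels, image=None, extra_packages=()):
    """Build keyword arguments for prefect.run_configs.KubernetesRun.

    Hypothetical helper, not part of Prefect.
    """
    kwargs = {"labels": list(labels)}
    if image is not None:
        # Preferred: an image that already has the dependencies installed.
        kwargs["image"] = image
    if extra_packages:
        # Fallback (assumption: official Prefect images read this variable):
        # packages are pip-installed at container start, i.e. on every run.
        kwargs["env"] = {"EXTRA_PIP_PACKAGES": " ".join(extra_packages)}
    return kwargs
```

With the image from this thread, that would be `KubernetesRun(**kubernetes_run_kwargs(["etl"], image="europe-west6-docker.pkg.dev/zoomcamp-340819/prefect-agents/prefect-agents-etl:latest"))`.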
m
Yes, I'm building an image (I shared my Dockerfile above). In my `k8s.cfg` I have these lines:
```yaml
spec:
      containers:
      - args:
        - prefect agent kubernetes start
        image: europe-west6-docker.pkg.dev/zoomcamp-340819/prefect-agents/prefect-agents-etl:latest
```
If I have this, I don't need to specify the image in the `KubernetesRun` config, right?
k
Ah, I see what you mean. No, that's not right. This is the image for the container that holds the agent. But the agent spins up a new job with another image, and this image is not carried over by default. You need to specify it either in the `KubernetesRun` or in the job template that the agent passes here.
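To make the agent-image vs. flow-run-image distinction concrete, here is a minimal sketch of a Kubernetes Job manifest for flow runs, built as a Python dict. It rests on assumptions: the container name `flow` and which fields Prefect fills in or overrides should be checked against the default job template of your Prefect version, and the file would be handed to the agent through its job-template option (check `prefect agent kubernetes start --help`). The point is that the flow-run `image` lives here, separately from the agent Deployment's own `image`.

```python
import json


def flow_job_template(image: str) -> dict:
    """Minimal Job manifest whose container runs the flow (hypothetical sketch)."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    # Assumed container name; the image must contain the
                    # flow's dependencies (e.g. pandas).
                    "containers": [{"name": "flow", "image": image}],
                    "restartPolicy": "Never",
                }
            }
        },
    }


if __name__ == "__main__":
    # JSON output is also valid YAML, so it can be saved as the template file.
    image = "europe-west6-docker.pkg.dev/zoomcamp-340819/prefect-agents/prefect-agents-etl:latest"
    print(json.dumps(flow_job_template(image), indent=2))
```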
m
Oh, now I understand. So I can use the default Prefect image for my Kubernetes agent, and only build and push my own image for the new jobs that the agent creates, right?
k
Yes, that's right, because the agent just needs Prefect installed in that container. You can even run the agent outside a container, as long as it can submit jobs to your k8s cluster. The point is that the agent "container" doesn't run your flows itself; it only submits the jobs that do.
m
Yes, but to run the agent I need an image with Prefect.
Or do you mean running the agent outside Kubernetes?
k
Maybe that statement was confusing. Yes, any image with Prefect will do for the agent.
m
Thanks so much for your help @Kevin Kho! I really appreciate it. 🙂
k
Of course!