Hash Lin

04/16/2022, 2:28 PM
Hi, I'm new to Prefect. It's fantastic to be able to deploy pipelines with Python code. However, I ran into some issues when deploying a pipeline to Prefect. Here's my scenario:
• I want to run a Prefect Agent in our K8s cluster, and I successfully plugged the agent into the cluster.
• I built a Docker image in GitLab CI and pushed it to ECR. I wrote the Dockerfile based on the reference here.
• I wrote a flow script and registered it to Prefect Cloud from my local computer.
Unfortunately, after running the pipeline, I got an error saying the Python module was not found, so the module I wrote in the project can't be imported. Here's my flow script.
Failed to load and execute flow run: ModuleNotFoundError("No module named '/Users/xxx/'")
Thanks for helping. 🙇

Kevin Kho

04/16/2022, 5:43 PM
Hey @Hash Lin, this will be a good read for that. The problem is that Prefect pulls your Flow from Storage, and the default Storage is your local machine. You need to use a Storage that the Kubernetes pod can reach (GitHub, S3, etc.).
When you get the chance, could you move the code to this thread, to keep the main channel a bit neater? Thank you!

Hash Lin

04/17/2022, 7:18 AM
import prefect
from prefect import task, Flow
from prefect.run_configs import KubernetesRun

from app.job.experiment import ExperimentJob


@task
def process():
    logger = prefect.context.get("logger")
    experiment_job = ExperimentJob(logger)
    experiment_job.update_experiment_user_count()


with Flow(
        "update_experiment_user_count",
        run_config=KubernetesRun(
            image_pull_policy="Always",
            labels=["k8s"],
            image="xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test",
        ),
) as flow:
    process()

flow.register(project_name="test_project", labels=["k8s"])
Hi @Kevin Kho, thanks for your reply. I've moved my sample flow code to this thread. I think I'm a little confused about how Storage is used in Prefect. Originally, we put the source code inside the Docker image. But in Prefect we don't need to put the source code in the Docker image; we put the source code and the flow code in Storage, right?
I'm trying to use GitLab storage with Prefect, but I get a file-not-found error when running the pipeline. Here's my sample code:
from prefect import Flow
from prefect.storage import GitLab
flow = Flow("my-flow")
flow.storage = GitLab(repo="my/repo", path="/flow.py", ref="MISOCU-3337")
https://cln.sh/GPi8xP I'm not quite sure where I went wrong here. 🤦
I think I know the problem. I should use flow.py as the path, not /flow.py. I think the documentation guided me in the wrong direction. 🤦
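A minimal sketch of why the leading slash matters, assuming GitLab storage ultimately fetches the file through GitLab's repository-files REST endpoint (GET /projects/:id/repository/files/:file_path/raw), where the file path is URL-encoded and taken relative to the repository root. The helper raw_file_url is hypothetical and only builds the URL; it does not contact GitLab.

```python
from urllib.parse import quote


def raw_file_url(project: str, path: str, ref: str,
                 host: str = "https://gitlab.com") -> str:
    """Build a GitLab 'raw file' API URL (hypothetical helper for illustration).

    Both the project path and the file path are URL-encoded, and the file
    path is interpreted relative to the repository root.
    """
    encoded_project = quote(project, safe="")
    encoded_path = quote(path, safe="")
    return (f"{host}/api/v4/projects/{encoded_project}"
            f"/repository/files/{encoded_path}/raw?ref={quote(ref, safe='')}")


# "flow.py" points at the file in the repository root...
print(raw_file_url("my/repo", "flow.py", "MISOCU-3337"))
# ...while "/flow.py" is encoded as "%2Fflow.py", a different path that most
# likely matches no file, which would surface as a file-not-found error.
print(raw_file_url("my/repo", "/flow.py", "MISOCU-3337"))
```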
Sorry, I'm still facing the ModuleNotFoundError problem after switching to GitLab Storage. Here's the error message and the example code.
prefect ModuleNotFoundError: No module named 'app'
import prefect
from prefect import task, Flow
from prefect.run_configs import KubernetesRun
from prefect.storage import GitLab

from app.job.experiment import ExperimentJob


@task
def process():
    logger = prefect.context.get("logger")
    experiment_job = ExperimentJob(logger)
    experiment_job.update_experiment_user_count()


with Flow(
        "update_experiment_user_count",
        run_config=KubernetesRun(
            image_pull_policy="Always",
            labels=["k8s"],
            image="xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test",
        ),
        storage=GitLab(
            access_token_secret="gitlab-access-token",
            repo="my/repo",
            ref="MISOCU-3337",
            path="flow.py"
        ),
) as flow:
    process()

flow.register(project_name="test_project", labels=["k8s"])
I also created a setup.py file to define the dependencies.
from setuptools import setup, find_packages

with open('requirements.txt') as f:
    requirements = f.read().splitlines()

setup(
    name="flow_utilities",
    version="0.1",
    packages=find_packages(),
    install_requires=requirements,
)

Kevin Kho

04/17/2022, 3:03 PM
That is because Storage only uploads and downloads the flow file. If you need to include the dependencies, you can either use Docker storage and package them, or add them to the image you pass to KubernetesRun.
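A minimal, self-contained sketch of this failure mode, using only the standard library rather than Prefect itself: it runs a copy of a flow file in a fresh Python process, once on its own (roughly what script-based Storage such as GitHub or GitLab delivers) and once next to the app package it imports (roughly what a custom image would provide). The package layout and the run_flow_file helper are hypothetical stand-ins for the project in this thread.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

# Stand-in for the flow file: like the flow in this thread, it imports "app".
FLOW_SOURCE = "from app.job.experiment import ExperimentJob\nprint('flow loaded')\n"


def run_flow_file(include_package: bool) -> subprocess.CompletedProcess:
    """Execute a copy of the flow file in a fresh interpreter.

    include_package=False mimics script-based Storage: only flow.py is present.
    include_package=True mimics an image that also contains the "app" package.
    """
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        (root / "flow.py").write_text(FLOW_SOURCE)
        if include_package:
            job_pkg = root / "app" / "job"
            job_pkg.mkdir(parents=True)
            (root / "app" / "__init__.py").write_text("")
            (job_pkg / "__init__.py").write_text("")
            (job_pkg / "experiment.py").write_text("class ExperimentJob:\n    pass\n")
        # The script's own directory is put on sys.path, so "app" is only
        # importable when it was copied alongside flow.py.
        return subprocess.run(
            [sys.executable, "flow.py"], cwd=root, capture_output=True, text=True
        )


missing = run_flow_file(include_package=False)
print("ModuleNotFoundError" in missing.stderr)
present = run_flow_file(include_package=True)
print(present.stdout.strip())
```

The first run fails the same way the flow run did, while the second succeeds, which is why the dependencies have to live in the image (or in Docker storage) rather than in script-based Storage.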

Hash Lin

04/17/2022, 3:12 PM
I fixed this problem. Thx Kevin.

Kevin Kho

04/17/2022, 3:13 PM
Nice 🙂

Tanasorn Chindasook

05/05/2022, 6:49 PM
@Hash Lin Hi Hash! I'm currently running into the same problems as you. Could you spare some time to chat about the solution?

Kevin Kho

05/05/2022, 6:51 PM
My first response has a detailed write-up on that. Did you check it?

Tanasorn Chindasook

05/06/2022, 6:58 AM
@Kevin Kho yes I did! Thank you again for linking the detailed write-up 🙂 I used the first 2 responses to make it work locally, but the response about ECSRun and GitHub storage pertains exactly to our use case. I must have missed it on the first skim, haha. Thank you!

Kevin Kho

05/06/2022, 1:45 PM
Nice work!

Mateo Merlo

05/09/2022, 11:01 AM
I'm coming to this thread because I'm having the same issue with pandas: "Failed to load and execute flow run: ModuleNotFoundError("No module named 'pandas'")". I'm using GitHub Storage and a Kubernetes agent. I read the article, and I have the same setup.py that @Hash Lin shared above. My Dockerfile is:
FROM prefecthq/prefect:latest-python3.9
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY src/requirements.txt .
COPY src/setup.py .
RUN pip install .
I built this image, pushed it to Artifact Registry in GCP, and pointed the cluster at this image. Should I create another file so that pandas is available during the flow run?
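One way to confirm that an image actually contains what a flow imports is to run a small check inside a container started from that image before pointing KubernetesRun at it. A minimal sketch using importlib from the standard library; the script name check_deps.py and the module names passed to it are hypothetical.

```python
import importlib.util
import sys


def missing_modules(names):
    """Return the names from `names` that cannot be imported in this environment."""
    missing = []
    for name in names:
        try:
            # find_spec returns None when a module cannot be located on sys.path.
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised for dotted names such as "app.job.experiment" when a
            # parent package is missing (parents are imported to search them).
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hypothetical usage inside the image: python check_deps.py pandas app
    absent = missing_modules(sys.argv[1:])
    if absent:
        print("Missing from this image:", ", ".join(absent))
        sys.exit(1)
    print("All modules found:", ", ".join(sys.argv[1:]))
```

For example, docker run --rm -v "$PWD":/check IMAGE python /check/check_deps.py pandas, where IMAGE is the image you intend to reference in KubernetesRun. A non-zero exit code means the flow-run job would hit the same ModuleNotFoundError.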

Kevin Kho

05/09/2022, 2:02 PM
Are you using the DaskExecutor maybe?

Mateo Merlo

05/09/2022, 6:12 PM
How can I check that? I installed the agent following this guide: https://gist.github.com/palewire/072513a9940478370697323c0d15c6ec And before I started using pandas, the agent was working fine.
This is flow_test.py:
import platform
import pandas as pd
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun
import subprocess

PREFECT_PROJECT_NAME = "testing"
FLOW_NAME = "flow_test"
AGENT_LABEL = "etl"

STORAGE = GitHub(
    repo="mateo2181/prefect-flows-test",
    path=f"flows/{FLOW_NAME}.py",
    access_token_secret="GITHUB_ACCESS_TOKEN"
)

RUN_CONFIG = KubernetesRun(labels=[AGENT_LABEL])

@task
def extract_and_load(dataset: str) -> None:
    logger = prefect.context.get("logger")
    file = f"gs://football_transfers/transfers/{dataset}"
    df = pd.read_csv(file)
    logger.info("Dataset %s with %d rows loaded to DB", dataset, len(df))

@task(log_stdout=True)
def hello_world():
    print(f"Hello from {FLOW_NAME} v2!")
    print(f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}")


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    # user_input = Parameter("user_input", default="Marvin")
    hw = hello_world()
    extract_and_load("1999_dutch_eredivisie.csv")

Kevin Kho

05/09/2022, 6:45 PM
Ah, if you are not specifying it, you are not using Dask. You could use this syntax to add other libraries, because the default Prefect image doesn't include them. I would recommend building your own image (also covered on the same page), because adding them this way re-downloads them every time the flow runs.

Mateo Merlo

05/09/2022, 7:02 PM
Yes, I'm building an image (I shared my Dockerfile above). In my k8s.cfg I have these lines:
spec:
  containers:
  - args:
    - prefect agent kubernetes start
    image: europe-west6-docker.pkg.dev/zoomcamp-340819/prefect-agents/prefect-agents-etl:latest
If I have this, I don't need to specify the image in the KubernetesRun config, right?

Kevin Kho

05/09/2022, 7:05 PM
Ah, I see what you mean. No, that's not right. This is the image for the container that runs the agent. The agent spins up a new job with another image, and the agent's image is not carried over by default. You need to specify it either in the KubernetesRun or in the job template that the agent passes here.
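A simplified model of this, written as plain Python data rather than Prefect code: the agent fills a Kubernetes Job template for each flow run, and by default the container image in that Job comes from the run configuration or the template, not from the agent's own pod. The template, the helper build_flow_run_job, DEFAULT_FLOW_IMAGE, and the exact fallback order are assumptions for illustration, not Prefect's actual implementation.

```python
import copy

# Images from this thread: a stock Prefect image is enough for the agent,
# while the custom image (with pandas etc.) is what the flow-run job needs.
AGENT_IMAGE = "prefecthq/prefect:latest-python3.9"
FLOW_IMAGE = "europe-west6-docker.pkg.dev/zoomcamp-340819/prefect-agents/prefect-agents-etl:latest"
# Hypothetical stand-in for the stock image used when nothing else is set.
DEFAULT_FLOW_IMAGE = "prefecthq/prefect:latest"

# A pared-down, hypothetical Job template; a real one has more fields.
JOB_TEMPLATE = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": None},
    "spec": {
        "template": {
            "spec": {
                "restartPolicy": "Never",
                "containers": [{"name": "flow"}],
            }
        }
    },
}


def build_flow_run_job(flow_run_id, run_config_image=None, template=JOB_TEMPLATE):
    """Build the Job manifest for one flow run (simplified model of the agent)."""
    job = copy.deepcopy(template)  # never mutate the shared template
    job["metadata"]["name"] = f"prefect-job-{flow_run_id}"
    container = job["spec"]["template"]["spec"]["containers"][0]
    # Assumed precedence: the KubernetesRun image, then an image set in the
    # job template, then a stock Prefect image. AGENT_IMAGE is never consulted.
    container["image"] = run_config_image or container.get("image") or DEFAULT_FLOW_IMAGE
    return job


job = build_flow_run_job("abc123", run_config_image=FLOW_IMAGE)
print(job["spec"]["template"]["spec"]["containers"][0]["image"])
```

Without run_config_image (and without an image in the template), the job falls back to the stock image, which is how a flow can end up in a container that lacks pandas even though the agent's image has it.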

Mateo Merlo

05/09/2022, 7:15 PM
Oh, now I understand. So I can use the default Prefect image for my Kubernetes agent and only build and push my own image for the new jobs that the agent creates, right?

Kevin Kho

05/09/2022, 7:18 PM
Yes, that's right, because the agent just needs Prefect installed in its container. You can even run the agent outside a container, as long as it can submit jobs to your k8s cluster. The point is that the agent "container" doesn't run your flow; it only submits the jobs that do.

Mateo Merlo

05/09/2022, 7:24 PM
Yes, but to run the agent I need an image with Prefect.
Or do you mean running the agent outside Kubernetes?

Kevin Kho

05/09/2022, 7:26 PM
Maybe that statement was confusing. Yes, any image with Prefect installed will work for the agent.

Mateo Merlo

05/09/2022, 7:41 PM
Thanks so much for your help @Kevin Kho! I really appreciate it. 🙂

Kevin Kho

05/09/2022, 7:46 PM
Of course!