    Hash Lin

    5 months ago
    Hi, I'm new to Prefect. It's fantastic to deploy pipelines using Python code. However, I've faced some issues when deploying a pipeline to Prefect. Here's my scenario.
    • I want to run the Prefect Agent in a K8s cluster, and I successfully plugged the agent into our cluster.
    • I built a docker image in GitLab CI and pushed it to ECR. I wrote the Dockerfile referencing the example from here.
    • I wrote a flow script and registered it to Prefect Cloud from my local computer. Unfortunately, after running the pipeline, I got an error: a Python module was not found. As a result, I can't import the module that I wrote in the project. Here's my flow script.
    Failed to load and execute flow run: ModuleNotFoundError("No module named '/Users/xxx/'")
    Thanks for helping. 🙇
    Kevin Kho

    5 months ago
    Hey @Hash Lin, this will be a good read for that. The problem is that Prefect pulls your Flow from Storage, and the default Storage is your local machine. You need to use some Storage available to the Kubernetes pod (GitHub, S3, etc.).
    When you get the chance, could you move the code to this thread, to keep the main channel a bit neater? Thank you!
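    For reference, here is a minimal sketch of attaching a Storage that the flow-run pod can actually reach (S3 in this example; the bucket name is a placeholder, and the pod is assumed to have credentials for it):
    from prefect import Flow
    from prefect.storage import S3
    from prefect.run_configs import KubernetesRun

    with Flow(
        "update_experiment_user_count",
        # the flow file is uploaded here when the flow is registered...
        storage=S3(bucket="my-flow-bucket"),
        # ...and downloaded by the flow-run pod that the agent spins up
        run_config=KubernetesRun(labels=["k8s"]),
    ) as flow:
        ...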
    Hash Lin

    5 months ago
    import prefect
    from prefect import task, Flow
    from prefect.run_configs import KubernetesRun
    
    from app.job.experiment import ExperimentJob
    
    
    @task
    def process():
        logger = prefect.context.get("logger")
        experiment_job = ExperimentJob(logger)
        experiment_job.update_experiment_user_count()
    
    
    with Flow(
            "update_experiment_user_count",
            run_config=KubernetesRun(
                image_pull_policy="Always",
                labels=["k8s"],
                image="<http://xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test|xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test>",
            ),
    ) as flow:
        process()
    
    flow.register(project_name="test_project", labels=["k8s"])
    Hi @Kevin Kho, thx for your reply. I've moved my sample flow code to the thread. I think I was a little confused about the usage of Storage in Prefect. Originally we would put the source code inside the docker image. But in Prefect, we don't need to put the source code in the docker image; we put the source code and the flow code in Storage, right?
    I'm trying to use GitLab storage with Prefect, but I got a file-not-found error message when running the pipeline. Here's my sample code.
    flow = Flow("my-flow")
    flow.storage = GitLab(repo="my/repo", path="/flow.py", ref="MISOCU-3337")
    https://cln.sh/GPi8xP I'm not quite sure where I went wrong here. 🤦
    I think I know the problem. I should use flow.py as the path, not /flow.py. I think the documentation guided me in the wrong direction. 🤦
    Sorry, I'm still facing the ModuleNotFoundError problem after switching to GitLab Storage. Here's the error message and the example code.
    prefect ModuleNotFoundError: No module named 'app'
    import prefect
    from prefect import task, Flow
    from prefect.run_configs import KubernetesRun
    from prefect.storage import GitLab
    
    from app.job.experiment import ExperimentJob
    
    
    @task
    def process():
        logger = prefect.context.get("logger")
        experiment_job = ExperimentJob(logger)
        experiment_job.update_experiment_user_count()
    
    
    with Flow(
            "update_experiment_user_count",
            run_config=KubernetesRun(
                image_pull_policy="Always",
                labels=["k8s"],
                image="<http://xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test|xxx.dkr.ecr.us-west-2.amazonaws.com/prefect-test>",
            ),
            storage=GitLab(
                access_token_secret="gitlab-access-token",
                repo="my/repo",
                ref="MISOCU-3337",
                path="flow.py"
            ),
    ) as flow:
        process()
    
    flow.register(project_name="test_project", labels=["k8s"])
    I also created a setup.py file and defined the dependencies.
    from setuptools import setup, find_packages
    
    with open('requirements.txt') as f:
        requirements = f.read().splitlines()
    
    setup(
        name="flow_utilities",
        version="0.1",
        packages=find_packages(),
        install_requires=requirements,
    )
    Kevin Kho

    5 months ago
    That is because Storage only uploads/downloads the flow file. If you need to include the dependencies, you can either use Docker storage and package them, or you can add them to the image you are passing to KubernetesRun
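    For reference, here is a minimal sketch of the Docker storage route (the registry URL, local path, and dependency list below are placeholders, not values from this thread; Docker storage builds and pushes an image containing both the flow and its dependencies):
    from prefect import Flow
    from prefect.storage import Docker

    flow = Flow("update_experiment_user_count")
    flow.storage = Docker(
        registry_url="xxx.dkr.ecr.us-west-2.amazonaws.com",   # image is pushed here at registration
        python_dependencies=["pandas"],                       # pip-installed into the built image
        files={"/local/path/to/app": "/opt/prefect/app"},     # copy the project package into the image
        env_vars={"PYTHONPATH": "/opt/prefect"},              # so `from app.job...` imports resolve
    )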
    Hash Lin

    5 months ago
    I fixed this problem. Thx Kevin.
    Kevin Kho

    5 months ago
    Nice 🙂
    Tanasorn Chindasook

    4 months ago
    @Hash Lin Hi Hash! I'm running into the same problems as you currently, could you spare some time for a chat on the solution?
    Kevin Kho

    4 months ago
    My first response has a detailed write up on that. Did you check it?
    Tanasorn Chindasook

    4 months ago
    @Kevin Kho yes I did! Thank you again for linking the detailed write up 🙂 I actually used the first 2 responses to make it work locally, but the response to ECSRun and GitHub storage actually pertains exactly to our use case. Must have missed it the first skim haha thank you!
    Kevin Kho

    4 months ago
    Nice work!
    Mateo Merlo

    4 months ago
    I'm coming to this thread because I'm having the same issue with pandas: "Failed to load and execute flow run: ModuleNotFoundError("No module named 'pandas'")". I'm working with GitHub Storage and a Kubernetes agent. I read the article and I have the same setup.py that @Hash Lin shared above. My Dockerfile is:
    FROM prefecthq/prefect:latest-python3.9
    RUN /usr/local/bin/python -m pip install --upgrade pip
    WORKDIR /opt/prefect
    COPY src/requirements.txt .
    COPY src/setup.py .
    RUN pip install .
    I built this image, pushed it to Artifact Registry in GCP, and then I have the cluster pointing to this image. Should I create another file to be able to add pandas and have it available during the flow run?
    Kevin Kho

    4 months ago
    Are you using the DaskExecutor maybe?
    Mateo Merlo

    4 months ago
    How can I check that? I installed the agent following this guide: https://gist.github.com/palewire/072513a9940478370697323c0d15c6ec And before using pandas, the agent was working fine.
    This is the flow_test.py:
    import platform
    import pandas as pd
    import prefect
    from prefect import Flow, Parameter, task
    from prefect.client.secrets import Secret
    from prefect.storage import GitHub
    from prefect.run_configs import KubernetesRun
    import subprocess
    
    PREFECT_PROJECT_NAME = "testing"
    FLOW_NAME = "flow_test"
    AGENT_LABEL = "etl"
    
    STORAGE = GitHub(
        repo="mateo2181/prefect-flows-test",
        path=f"flows/{FLOW_NAME}.py",
        access_token_secret="GITHUB_ACCESS_TOKEN"
    )
    
    RUN_CONFIG = KubernetesRun(labels=[AGENT_LABEL])
    
    @task
    def extract_and_load(dataset: str) -> None:
        logger = prefect.context.get("logger")
        file = f"<gs://football_transfers/transfers/{dataset}>"
        df = pd.read_csv(file)
        logger.info("Dataset %s with %d rows loaded to DB", dataset, len(df))
    
    @task(log_stdout=True)
    def hello_world():
        print(f"Hello from {FLOW_NAME} v2!")
        print(f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}")
    
    
    with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
        # user_input = Parameter("user_input", default="Marvin")
        hw = hello_world()
        extract_and_load("1999_dutch_eredivisie.csv")
    Kevin Kho

    4 months ago
    Ah, if you are not specifying it, you are not using Dask. I guess you could use this syntax to add other libraries, because the default Prefect image doesn't have it. I would recommend making your own image (also found on the same page), because adding it here will re-download it every time the flow is run.
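    A sketch of that quick-fix route, assuming "this syntax" refers to the EXTRA_PIP_PACKAGES environment variable that the official prefecthq/prefect images pip-install at container start-up:
    from prefect.run_configs import KubernetesRun

    # pandas is reinstalled each time the flow-run pod starts, which is why
    # baking it into a custom image is the better long-term option
    RUN_CONFIG = KubernetesRun(
        labels=["etl"],
        env={"EXTRA_PIP_PACKAGES": "pandas"},
    )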
    Mateo Merlo

    4 months ago
    Yes, I'm building an image (I shared my Dockerfile above). In my k8s.cfg I have these lines:
    spec:
      containers:
      - args:
        - prefect agent kubernetes start
        image: europe-west6-docker.pkg.dev/zoomcamp-340819/prefect-agents/prefect-agents-etl:latest
    If I have this, I don't need to specify the image in KubernetesRun conf, right?
    Kevin Kho

    4 months ago
    Ah, I see what you mean. No, that's not right. This is the image for the container that holds the agent. But the agent will spin up a new job with another image, and this image is not carried over by default. You need to specify it in the KubernetesRun, or you need to specify it in the job template that the agent passes here.
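    A sketch of the first option (the image name below is a placeholder for a custom image with pandas and the other flow dependencies baked in, not a value from this thread):
    from prefect.run_configs import KubernetesRun

    RUN_CONFIG = KubernetesRun(
        labels=["etl"],
        # this image is used for the flow-run job pod that the agent creates,
        # not for the pod the agent itself runs in
        image="europe-west6-docker.pkg.dev/my-project/my-repo/flow-image:latest",
    )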
    Mateo Merlo

    4 months ago
    Oh, now I understand. So I can use the default Prefect image for my Kubernetes agent and only build and push the image to use in the new job that the agent creates, right?
    Kevin Kho

    4 months ago
    Yes, that's right, because the agent just needs Prefect installed in that container. You can even have the agent outside a container as long as it can submit to your k8s cluster. The point being that the agent "container" doesn't run any of your flow code.
    Mateo Merlo

    4 months ago
    Yes, but to run the agent I need an image with Prefect.
    Or do you mean running the agent outside Kubernetes?
    Kevin Kho

    4 months ago
    Maybe that statement was confusing. Yes, any image with Prefect installed will work for the agent.
    Mateo Merlo

    4 months ago
    Thanks so much for your help @Kevin Kho! I really appreciate it. 🙂
    Kevin Kho

    4 months ago
    Of course!