Thread
#prefect-community

    Adi Gandra

    8 months ago
    Is it possible to build the Docker image that Prefect uses to execute the code ourselves? For instance, I have:
    flow.storage = Docker(registry_url="<http://x.dkr.ecr.us-east-1.amazonaws.com|x.dkr.ecr.us-east-1.amazonaws.com>",
                             image_name="y",
                             image_tag='latest')
    flow.executor = DaskExecutor()
    flow.run_config = KubernetesRun(env={})
    
    # flow.run()
    flow.register(project_name="SA-Test", build=False)
    I want to build this manually as part of our CI/CD and push it to ECR. That part is done. Now when I want to run the Prefect flow, the Kubernetes pod gets the image, but then I get an error:
    Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage',)
    Is there anything special I need to do in my Dockerfile, or when I build this image, to allow it to be used successfully without building inside Prefect?

    Kevin Kho

    8 months ago
    Hi @Adi Gandra, two things:
    1. Do `flow.register(build=False)`
    2. Point the Docker storage to the file where the Flow lives. Add `path=…` and `stored_as_script=True`

    Adi Gandra

    8 months ago
    flow.storage = Docker(  registry_url="<http://x.dkr.ecr.us-east-1.amazonaws.com|x.dkr.ecr.us-east-1.amazonaws.com>",
                            image_name="y",
                            image_tag='latest',
                            path="/usr/src/app/flow.py",
                            stored_as_script=True)
    flow.executor = DaskExecutor()
    flow.run_config = KubernetesRun(env={})
    
    flow.register(project_name="SA-Test", build=False)
    So this would be what I want, if flow.py is the file I defined all this in.
    I built that, and I’m still getting the same error.

    Kevin Kho

    8 months ago
    Yes, as long as the flow lives at that path. Would you be able to share your Dockerfile? This looks right.

    Adi Gandra

    8 months ago
    # pull official base image
    FROM python:3.6-bullseye
    # set work directory
    WORKDIR /usr/src/app
    
    # set environment variables
    ENV PYTHONDONTWRITEBYTECODE 1
    ENV PYTHONUNBUFFERED 1
    
    # install dependencies
    RUN pip install --upgrade pip
    COPY ./requirements.txt .
    RUN pip install -r requirements.txt
    
    # copy project
    COPY . /usr/src/app
    so I’m building the image with docker image build . -t x:latest
    push the image to ECR
    docker run -v $(pwd):/usr/src/app --rm -it --env-file ./.env x bash
    then I bash in
    and run python flow.py
    and then I kick off the flow from Prefect Cloud

    Anna Geller

    8 months ago
    You are very close; only one line is missing, which adds the flow object to the storage:
    docker_storage.add_flow(flow)
    See this for a full example

    Adi Gandra

    8 months ago
    Hmm, I’m not sure that worked still.
    docker_storage = Docker(  registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
                            image_name="y",
                            image_tag='latest',
                            path="/usr/src/app/flow.py",
                            stored_as_script=True)
    
    
    flow.storage = docker_storage
    flow.executor = DaskExecutor()
    flow.run_config = KubernetesRun(env={})
    docker_storage.add_flow(flow)
    # flow.run()
    flow.register(project_name="SA-Test", build=False)
    I still get the same error. Do I need to pass storage into the Flow() constructor also?
    Or does flow.storage, flow.executor, flow.run_config work?

    Anna Geller

    8 months ago
    Ideally you can set it up as in the example, because I’m sure the definition in that example has worked.
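    [Editor's note: the linked example is not reproduced in this thread. Below is a minimal sketch of that pattern under Prefect 1.x, assembled from the APIs used above; the task, flow name, and registry URL are placeholders. The key difference from the earlier attempts is that the storage is passed to the Flow constructor and add_flow is called before registering.]
    import prefect
    from prefect import Flow, task
    from prefect.executors import DaskExecutor
    from prefect.run_configs import KubernetesRun
    from prefect.storage import Docker

    @task
    def say_hello():
        prefect.context.get("logger").info("hello")

    # storage pointing at the flow file baked into the image
    docker_storage = Docker(
        registry_url="x.dkr.ecr.us-east-1.amazonaws.com",  # placeholder registry
        image_name="y",
        image_tag="latest",
        path="/usr/src/app/flow.py",  # where this file lives inside the image
        stored_as_script=True,
    )

    with Flow(
        "sa-test-flow",  # placeholder name
        storage=docker_storage,
        run_config=KubernetesRun(env={}),
        executor=DaskExecutor(),
    ) as flow:
        say_hello()

    docker_storage.add_flow(flow)  # attach the flow object to the storage

    if __name__ == "__main__":
        flow.register(project_name="SA-Test", build=False)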

    Kevin Kho

    8 months ago
    Yes, though the `flow.xxx` syntax should work too.

    Adi Gandra

    8 months ago
    Changing it to match the example, and trying again.
    So I think that fixed one problem, but now I’m getting Failed to load and execute Flow's environment: ModuleNotFoundError("flow_retailer_download")

    Kevin Kho

    8 months ago
    That looks like you have a custom module that is not importable at the default path where the Flow runs. It would help to pip install that custom module

    Adi Gandra

    8 months ago
    Hmm, it’s just another file that I’m importing:
    from Flow_Retailer_Download.download_feeds import flow_task_download_feeds

    Anna Geller

    8 months ago
    The same repo I shared before has an example of how you can package your dependencies.

    Kevin Kho

    8 months ago
    Yes, but that import depends on where you are running the script from, so you either need to add that path to PYTHONPATH or pip install the custom module to make it importable from any location.
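    [Editor's note: one way to do the pip-install route, sketched under the assumption that Flow_Retailer_Download is a plain package directory (with an __init__.py) sitting next to flow.py: add a minimal setup.py and install it inside the image. The distribution name and version below are placeholders.]
    # setup.py -- a minimal sketch
    from setuptools import find_packages, setup

    setup(
        name="flow-retailer-download",  # placeholder distribution name
        version="0.1.0",
        packages=find_packages(),  # picks up Flow_Retailer_Download/ via its __init__.py
    )
    [With that in place, a RUN pip install . line after the COPY step in the Dockerfile, or ENV PYTHONPATH=/usr/src/app as the quicker alternative, makes the import work regardless of the working directory.]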

    Adi Gandra

    8 months ago
    Quick question while I play around with this: do people usually have different flow files for local development vs. deploys? Like for the Kubernetes run config, I want that to be used for my prod workflows. But while I’m working locally, should I just set up another local_flow.py file and just do flow.run()? What are the normal conventions?

    Anna Geller

    8 months ago
    You could e.g. comment that out for local development, or set up a function like this one.
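    [Editor's note: a sketch of such a helper, assuming an environment variable — here the hypothetical PREFECT_ENV — distinguishes local development from deploys:]
    import os

    from prefect.executors import DaskExecutor, LocalDaskExecutor
    from prefect.run_configs import KubernetesRun, LocalRun

    def configure(flow):
        """Attach run config and executor based on the hypothetical PREFECT_ENV variable."""
        if os.environ.get("PREFECT_ENV") == "prod":
            flow.run_config = KubernetesRun(env={})
            flow.executor = DaskExecutor()
        else:
            flow.run_config = LocalRun()
            flow.executor = LocalDaskExecutor()
        return flow
    [The same flow.py can then end with configure(flow), followed by either flow.run() locally or flow.register(...) in CI.]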

    Adi Gandra

    8 months ago
    So, making slow progress; thanks for all the help walking me through this.
    Failed to load and execute Flow's environment: ValueError('Required parameter name not set',)
    Is there a way to get more information on the error, so I can figure out how to debug it? I wonder if this is due to env variables not being set.

    Kevin Kho

    8 months ago
    I think that is the real error message. It looks like you have a Parameter in your Flow that was not set properly?
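    [Editor's note: reading the message as the parameter literally being named "name", one plausible cause is a Parameter with no default that never receives a value at run time. A minimal sketch of that situation and the fix, assuming Prefect 1.x:]
    from prefect import Flow, Parameter, task

    @task
    def greet(name):
        print(f"hello {name}")

    with Flow("param-demo") as flow:
        # without default=..., every run must pass "name" explicitly,
        # otherwise "Required parameter name not set" is raised
        name = Parameter("name", default="world")
        greet(name)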

    Adi Gandra

    8 months ago
    Hmm, it’s possible. Any way to get a line number or something? Or the file the error originated from?

    Kevin Kho

    8 months ago
    Ah I see what you mean. Is this run with an agent or using `flow.run()`?

    Adi Gandra

    8 months ago
    an agent

    Kevin Kho

    8 months ago
    You can try doing this to propagate the error:
    import traceback
    from functools import partial, wraps

    from prefect import task

    def custom_task(func=None, **task_init_kwargs):
        if func is None:
            return partial(custom_task, **task_init_kwargs)

        @wraps(func)
        def safe_func(**kwargs):
            try:
                return func(**kwargs)
            except Exception as e:
                print(f"Full Traceback: {traceback.format_exc()}")
                raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

        safe_func.__name__ = func.__name__
        return task(safe_func, **task_init_kwargs)

    @custom_task
    def abc(x):
        return x

    Adi Gandra

    8 months ago
    So, got that fixed. Finally running!
    My flow keeps looping through this log line:
    INFO | DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
    pulling the image and then restarting. It’s been about 30 minutes of waiting now and none of the actual tasks kick off. It logs
    Beginning Flow run for 'X-ETL'
    and then goes into creating a new Dask cluster, and then repeats. My pods go from ContainerCreating to Running to Terminating, and then a new one appears and restarts. Does it just take a long time for this to kick off? The Docker image is about 2 GB.

    Kevin Kho

    8 months ago
    It shouldn’t be that long. Your Kubernetes jobs should have logs that indicate why they failed. This sounds like what happens when the Flow can’t get the compute you requested from Kubernetes. If you want a LocalCluster, maybe use the LocalDaskExecutor?
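    [Editor's note: a minimal sketch of the two options; the worker counts are placeholders. LocalDaskExecutor runs tasks on a local thread or process pool inside the flow-run pod, while a bare DaskExecutor() spins up the distributed.deploy.local.LocalCluster seen in the log above:]
    from prefect import Flow
    from prefect.executors import DaskExecutor, LocalDaskExecutor

    flow = Flow("x-etl")  # stand-in for the flow defined earlier in the thread

    # option 1: a local thread pool, no Dask cluster at all
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)

    # option 2: an in-pod LocalCluster with an explicit size
    flow.executor = DaskExecutor(
        cluster_kwargs={"n_workers": 2, "threads_per_worker": 2}
    )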

    Adi Gandra

    8 months ago
    I’m running this on EKS though, using `prefect agent install kubernetes`. Mainly just trying to follow along with: https://towardsdatascience.com/distributed-data-pipelines-made-easy-with-aws-eks-and-prefect-106984923b30

    Anna Geller

    8 months ago
    You need to be patient with Fargate 😄 it’s super easy to use and cheap, but the latency is non-negligible (it can take minutes to spin up compute). You can check whether AWS Fargate provisioned compute for your workload by doing:
    kubectl get nodes
    30 minutes is definitely too long, though; maybe you can check in the AWS console whether everything is OK with your cluster.

    Sarah Floris

    6 months ago
    @Adi Gandra try to look in the log files in S3 or in CloudWatch.
    Or try to just rerun the task. I’ve had ECS tasks just die for no obvious reason
    (and it would still say it was running, btw).