Is it possible to build the docker image ourselves...
# prefect-community
a
Is it possible to build the Docker image ourselves that Prefect uses to execute the code? For instance I have:
flow.storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
                      image_name="y",
                      image_tag='latest')
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})

# flow.run()
flow.register(project_name="SA-Test", build=False)
I want to build this manually as part of our CI/CD and push it to ECR. That part is done. Now when I want to run the Prefect flow, the Kubernetes pod gets the image, but then I get an error:
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage',)
Is there anything special I need to do in my Dockerfile, or when I build this image, to allow it to be successfully used without building inside Prefect?
k
Hi @Adi Gandra, two things:
1. Do flow.register(build=False)
2. Point the Docker storage to the file where the Flow lives. Add path=… and stored_as_script=True
a
flow.storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
                      image_name="y",
                      image_tag='latest',
                      path="/usr/src/app/flow.py",
                      stored_as_script=True)
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})

flow.register(project_name="SA-Test", build=False)
So this would be what I want, if flow.py is the file I defined all this in?
I built that, and I'm still getting the same error.
k
Yes, as long as the flow lives at that path. Would you be able to share your Dockerfile? This looks right
a
# pull official base image
FROM python:3.6-bullseye
# set work directory
WORKDIR /usr/src/app

# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

# install dependencies
RUN pip install --upgrade pip
COPY ./requirements.txt .
RUN pip install -r requirements.txt

# copy project
COPY . /usr/src/app
So I'm building the image with docker image build . -t x:latest,
pushing the image to ECR,
then docker run -v $(pwd):/usr/src/app --rm -it --env-file ./.env x bash
to bash in
and run python flow.py,
and then I kick off the flow from Prefect Cloud.
a
You are very close, only one line is missing to add the flow object to storage:
docker_storage.add_flow(flow)
See this for a full example
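For reference, here is a minimal sketch of the script-based Docker storage pattern (this is not the linked example itself; the flow name and task are illustrative, and it assumes the script is baked into the image at /usr/src/app/flow.py):
from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker

@task
def say_hello():
    print("hello")

# Flow name and task are illustrative only
with Flow("docker-script-flow") as flow:
    say_hello()

docker_storage = Docker(
    registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
    image_name="y",
    image_tag="latest",
    path="/usr/src/app/flow.py",  # where this script lives inside the image
    stored_as_script=True,
)
docker_storage.add_flow(flow)     # attach the flow object to the storage
flow.storage = docker_storage
flow.run_config = KubernetesRun(env={})
flow.executor = DaskExecutor()

if __name__ == "__main__":
    flow.register(project_name="SA-Test", build=False)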
a
Hmm, I'm not sure that worked.
docker_storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
                        image_name="y",
                        image_tag='latest',
                        path="/usr/src/app/flow.py",
                        stored_as_script=True)


flow.storage = docker_storage
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})
docker_storage.add_flow(flow)
# flow.run()
flow.register(project_name="SA-Test", build=False)
I still get the same error. Do I need to pass storage into the Flow() constructor also?
Or does flow.storage, flow.executor, flow.run_config work?
a
Ideally you can set it up as in the example, because I'm sure the definition in that example has worked.
k
Yes, though the flow.xxx syntax should work too.
a
Changing it to match the example and trying again.
So I think that fixed one problem, but now I'm getting Failed to load and execute Flow's environment: ModuleNotFoundError("flow_retailer_download")
k
That looks like you have a custom module that is not importable at the default path where the Flow runs. It would help to pip install that custom module
a
Hmm, it's just another file that I'm importing:
from Flow_Retailer_Download.download_feeds import flow_task_download_feeds
a
The same repo I shared before has an example of how you can package your dependencies.
k
Yes, but that import depends on where you are running the script from, so you either need to add that path to PYTHONPATH or pip install the custom module to make it importable from any location.
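For example, one way to make Flow_Retailer_Download importable from anywhere is to turn it into an installable package. A minimal setup.py sketch, assuming Flow_Retailer_Download/ is a directory with an __init__.py next to flow.py (the distribution name is made up):
# setup.py -- hypothetical; install inside the image with `pip install .`
from setuptools import setup, find_packages

setup(
    name="flow-retailer-download",  # illustrative distribution name
    version="0.1.0",
    packages=find_packages(include=["Flow_Retailer_Download", "Flow_Retailer_Download.*"]),
)
Then add a RUN pip install . step after the COPY . /usr/src/app line in the Dockerfile so the package is importable regardless of the working directory at runtime; alternatively, setting ENV PYTHONPATH=/usr/src/app in the Dockerfile should achieve the same for this layout.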
a
Quick question while I play around with this: do people usually have different flow files for local development vs deploys? For the Kubernetes run config, I want that to be used for my prod workflows. But while I'm working locally, should I just set up another local_flow.py file and just do flow.run()? What are the normal conventions?
a
You could e.g. comment that out for local development, or set up a function like this one.
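For instance, here is a minimal sketch of such a switch, assuming an ENVIRONMENT variable you set yourself (the variable name, task, and flow body are illustrative; storage setup is omitted):
import os

from prefect import Flow, task
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

@task
def extract():
    return [1, 2, 3]

with Flow("X-ETL") as flow:
    extract()

if __name__ == "__main__":
    if os.environ.get("ENVIRONMENT") == "prod":
        # production: run on Kubernetes and register against Prefect Cloud
        # (storage setup omitted for brevity -- see the Docker storage example above)
        flow.run_config = KubernetesRun(env={})
        flow.executor = DaskExecutor()
        flow.register(project_name="SA-Test", build=False)
    else:
        # local development: just execute the flow in-process
        flow.run()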
a
So making slow progress, thanks for all the help walking me through this.
Failed to load and execute Flow's environment: ValueError('Required parameter name not set',)
Is there a way to get more information on the error so I can figure out how to debug it? I wonder if this is due to env variables not being set.
k
I think that is the real error message. It looks like you have a Parameter in your Flow that was not set properly?
a
Hmm, it's possible. Any way to get a line number or something? Or the file the error originated from?
k
Ah, I see what you mean. Is this run with an agent or using flow.run()?
a
an agent
k
You can try doing this to propagate the error:
from functools import partial, wraps
import traceback

from prefect import task


def custom_task(func=None, **task_init_kwargs):
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)


@custom_task
def abc(x):
    return x
a
So, got that fixed. Finally running!
My flow keeps looping through:
INFO | DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
It pulls the image and then restarts.
It's been about 30 minutes of waiting now;
none of the actual tasks kick off. It just prints
Beginning Flow run for 'X-ETL'
and then goes into creating a new Dask cluster, and then repeats.
My pods go from ContainerCreating to Running to Terminating, and then a new one appears and it restarts.
Does it just take a long time for this to kick off? The Docker image is like 2 GB.
k
It shouldn't take that long. Your Kubernetes jobs should have logs that indicate why they failed. This sounds like what happens when the Flow can't get the compute it requests from Kubernetes. If you want a LocalCluster, maybe use the LocalDaskExecutor?
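For example, a quick way to try that (a sketch; the scheduler and worker count are arbitrary):
from prefect.executors import LocalDaskExecutor

# Runs tasks in parallel inside the single flow-run pod,
# instead of spinning up a separate Dask cluster.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)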
a
I'm running this on EKS though, set up with prefect agent install kubernetes. Mainly just trying to follow along with https://towardsdatascience.com/distributed-data-pipelines-made-easy-with-aws-eks-and-prefect-106984923b30
a
You need to be patient with Fargate 😄 it's super easy to use and cheap, but the latency is non-negligible (it can take minutes to spin up compute). You can check whether AWS Fargate provisioned compute for your workload by doing:
kubectl get nodes
30 minutes is definitely too long, though; maybe you can check in the AWS console whether everything is OK with your cluster.
s
@Adi Gandra, try to look at the log files in S3 or in CloudWatch.
Or try to just rerun the task. I've had ECS tasks just die for no obvious reason.
(And it would still say it was running, btw.)