Adi Gandra
01/24/2022, 4:52 PM
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
flow.storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
image_name="y",
image_tag='latest')
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})
# flow.run()
flow.register(project_name="SA-Test", build=False)
I want to build this image manually as part of our CI/CD pipeline and push it to ECR. That part is done. Now when I run the Prefect flow, the Kubernetes pod pulls the image, but then I get an error:
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage',)
Is there anything special I need to do in my Dockerfile, or when I build this image, so that it can be used without Prefect building it?
Kevin Kho
flow.register(build=False)
2. Point the Docker storage to the file where the flow lives: add path=… and stored_as_script=True.
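A simplified, hypothetical model of what script-based storage has to do at run time may help explain the "Flow is not contained in this Storage" error. This is not Prefect's implementation: the class `ScriptStorage` and its methods are illustrative names. The idea is that the storage records a mapping from flow name to the script's path inside the image (the role of path=), and at run time it executes that script and picks out the object whose `name` matches.

```python
import runpy
from typing import Any, Dict


class ScriptStorage:
    """Hypothetical stand-in for script-based storage; not Prefect's code."""

    def __init__(self) -> None:
        # flow name -> path of the flow script inside the image
        self.flows: Dict[str, str] = {}

    def add_flow(self, name: str, path: str) -> str:
        # Record where the flow's script will live at run time
        self.flows[name] = path
        return path

    def get_flow(self, name: str) -> Any:
        if name not in self.flows:
            # Nothing was recorded for this flow name
            raise ValueError("Flow is not contained in this Storage")
        path = self.flows[name]
        # Execute the script; run_path uses run_name="<run_path>" by default,
        # so code under `if __name__ == "__main__":` is skipped
        namespace = runpy.run_path(path)
        # Return the module-level object whose `name` attribute matches
        for obj in namespace.values():
            if getattr(obj, "name", None) == name:
                return obj
        raise ValueError(f"No object named {name!r} found in {path}")
```

In this model, registering without ever recording the flow leaves `flows` empty, so `get_flow` raises even though the script is present in the image.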
Adi Gandra
01/24/2022, 5:19 PM
flow.storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
image_name="y",
image_tag='latest',
path="/usr/src/app/flow.py",
stored_as_script=True)
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})
flow.register(project_name="SA-Test", build=False)
Kevin Kho
Adi Gandra
01/24/2022, 5:48 PM
# pull official base image
FROM python:3.6-bullseye
# set work directory
WORKDIR /usr/src/app
# set environment variables
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
# install dependencies
RUN pip install --upgrade pip
COPY ./requirements.txt .
RUN pip install -r requirements.txt
# copy project
COPY . /usr/src/app
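Since this Dockerfile copies the build context into /usr/src/app, the script passed as path= has to end up at exactly that location in the image. A small check such as the following can fail the image build early if the file is missing or has a syntax error. The file name `check_flow_file.py` and the function are hypothetical. The check only compiles the source and does not execute it, so nothing in the flow script (such as a `flow.register(...)` call) runs during the build.

```python
from pathlib import Path


def check_flow_file(path: str) -> None:
    """Raise if the flow script is missing or does not compile (hypothetical helper)."""
    script = Path(path)
    if not script.is_file():
        raise FileNotFoundError(f"flow script not found at {script}")
    # compile() parses the source into a code object without executing it;
    # it raises SyntaxError if the script is not valid Python
    compile(script.read_text(), str(script), "exec")
```

Assuming the helper is saved as check_flow_file.py in the build context, a final step like `RUN python -c "from check_flow_file import check_flow_file; check_flow_file('/usr/src/app/flow.py')"` would run it from the WORKDIR.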
Anna Geller
docker_storage.add_flow(flow)
See this for a full example.
Adi Gandra
01/24/2022, 6:19 PM
docker_storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
image_name="y",
image_tag='latest',
path="/usr/src/app/flow.py",
stored_as_script=True)
flow.storage = docker_storage
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})
docker_storage.add_flow(flow)
# flow.run()
flow.register(project_name="SA-Test", build=False)
I still get the same error. Do I also need to pass storage into the Flow() constructor?
Anna Geller
Kevin Kho
The flow.xxx syntax should work.
Adi Gandra
01/24/2022, 6:25 PM
Kevin Kho
Adi Gandra
01/24/2022, 6:52 PM
from Flow_Retailer_Download.download_feeds import flow_task_download_feeds
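When a script-stored flow imports local packages, such as Flow_Retailer_Download here, those packages must be importable from whatever process executes the script inside the container; that depends on that process's `sys.path`, not only on where the files were copied. One hedged way to check this inside the image is to ask `importlib` whether each module can be found, without running the flow script itself. The helper name `missing_modules` is hypothetical.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the module names that cannot be found on the current sys.path."""
    missing = []
    for name in names:
        try:
            # For dotted names, find_spec imports the parent packages
            # (running their __init__.py) but not the module itself
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised when a parent package of a dotted name is missing
            found = False
        if not found:
            missing.append(name)
    return missing


# Example: run inside the image, e.g. from /usr/src/app
print(missing_modules(["Flow_Retailer_Download.download_feeds"]))
```

If a local package shows up as missing, one common option is to put its parent directory on `PYTHONPATH` (for example `ENV PYTHONPATH=/usr/src/app` in the Dockerfile), though whether that is the right fix depends on the project layout.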
Anna Geller
Kevin Kho
Adi Gandra
01/24/2022, 7:14 PM
Anna Geller
Adi Gandra
01/24/2022, 7:33 PM
Failed to load and execute Flow's environment: ValueError('Required parameter name not set',)
Is there a way to get more information on the error so I can debug it? I wonder if this is due to environment variables not being set.
Kevin Kho
Adi Gandra
01/24/2022, 7:37 PM
Kevin Kho
flow.run()?
Adi Gandra
01/24/2022, 7:39 PM
Kevin Kho
import traceback
from functools import partial, wraps

from prefect import task

def custom_task(func=None, **task_init_kwargs):
    # Support both @custom_task and @custom_task(name=..., ...)
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            # Printed output reaches the Prefect logs if the task uses log_stdout=True
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x
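To see what this wrapper does without a Prefect installation, here is a stdlib-only sketch of the same pattern, in which a hypothetical `fake_task` stands in for Prefect's `task` and simply returns the function. A failing call prints the full traceback and then raises a short `RuntimeError`; `from None` suppresses the chained exception context, so the original traceback is not attached to the new error. Like the original, the wrapper accepts keyword arguments only, which is why the sketch calls it that way.

```python
import traceback
from functools import partial, wraps


def fake_task(fn, **task_init_kwargs):
    # Hypothetical stand-in for prefect.task: returns the function unchanged
    return fn


def custom_task(func=None, **task_init_kwargs):
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            # Re-raise a short error; `from None` drops the chained context
            raise RuntimeError(type(e)) from None

    return fake_task(safe_func, **task_init_kwargs)


@custom_task
def parse_int(text):
    return int(text)


print(parse_int(text="42"))  # prints 42
try:
    parse_int(text="not a number")
except RuntimeError as err:
    # The original ValueError traceback was printed above; only its type is carried over
    print(repr(err), err.__cause__, err.__suppress_context__)
```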
Adi Gandra
01/24/2022, 8:17 PM
INFO
DaskExecutor
Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
Beginning Flow run for 'X-ETL'
Kevin Kho
Adi Gandra
01/25/2022, 12:14 AM
Anna Geller
kubectl get nodes
30 minutes is definitely too long; maybe check in the AWS console whether everything is OK with your cluster.
Sarah Floris
03/15/2022, 10:19 PM
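Anna's `kubectl get nodes` check can also be scripted. The sketch below, with a hypothetical helper name, parses the JSON printed by `kubectl get nodes -o json` and lists nodes whose `Ready` condition is not `"True"`. It assumes the standard Kubernetes Node object layout (`items[].metadata.name` and `items[].status.conditions[]`).

```python
import json
from typing import List


def not_ready_nodes(kubectl_json: str) -> List[str]:
    """Names of nodes whose Ready condition is not "True", from `kubectl get nodes -o json` output."""
    bad = []
    for node in json.loads(kubectl_json).get("items", []):
        conditions = node.get("status", {}).get("conditions", [])
        # Status of the Ready condition, or None if the node reports none
        ready = next((c.get("status") for c in conditions if c.get("type") == "Ready"), None)
        if ready != "True":
            # Covers "False", "Unknown", and a missing Ready condition
            bad.append(node["metadata"]["name"])
    return bad
```

For example, the input can come from `subprocess.run(["kubectl", "get", "nodes", "-o", "json"], capture_output=True, text=True, check=True).stdout`. An empty result only means every node reports Ready; it does not rule out other cluster problems.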