
Daniel

06/19/2022, 3:44 AM
Dear Prefect Community, I have written some custom Python to extract and load data into a Snowflake database. It works well from my local machine, but attempts to orchestrate these pipelines with Prefect Cloud fail. I have followed @Anna Geller 's brilliant article on using Prefect and AWS ECS Fargate as a serverless pipeline solution using the
prefecthq/prefect:latest-python3.10
docker image with some additional packages, including the Snowflake Python connector and its dependencies. Flows without Snowflake interactions work perfectly when run from Prefect Cloud; however, my EL flows which insert data into Snowflake tables fail to load and execute, returning >ModuleNotFoundError("No module named 'snowflake'"). I haven't been able to find any similar reports among the community, so I'm wondering if someone could suggest what I may have done wrong. With thanks, Daniel.

Kevin Kho

06/19/2022, 3:55 AM
The base Prefect image does not have all of the task library packages installed. You can do something like this to install it, or make your own image on top of the base Prefect image.
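[Editor's note: a minimal, hedged sketch of the "install it at runtime" option Kevin mentions, not his original snippet. It assumes the prefecthq/prefect image's EXTRA_PIP_PACKAGES mechanism, which pip-installs the listed packages when the container starts; the pin and labels below are illustrative only.]
from prefect.run_configs import ECSRun

# Hedged sketch: ask the base Prefect image to pip-install the Snowflake
# connector at container start-up via EXTRA_PIP_PACKAGES, instead of
# building and hosting a custom image.
RUN_CONFIG = ECSRun(
    labels=["prod"],
    env={"EXTRA_PIP_PACKAGES": "snowflake-connector-python==2.7.8"},
    run_task_kwargs=dict(cluster="prefectEcsCluster", launchType="FARGATE"),
)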

Anna Geller

06/19/2022, 12:19 PM
Hi Daniel, looks like Snowflake hasn't been installed correctly in your image, as Kevin pointed out. Could you share your storage and run configuration? Did you build a custom image and if so, can you share your Dockerfile?

Daniel

06/22/2022, 8:53 AM
Thanks Anna and Kevin! So sorry for this late reply, I've been offline after having to say goodbye to our 10.5-year-old dog 😢 on Monday afternoon (Sydney time). Feeling awkward sharing this with you, but I did so to offer an explanation for not responding to your prompt replies sooner. I really did appreciate it. Yes, Anna, I did attempt to install Snowflake and all its official dependencies, as instructed by Snowflake. The resulting requirements.txt file is:
asn1crypto==1.5.1
boto3==1.24.7
certifi==2022.5.18.1
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==36.0.2
datetime==4.4
idna==3.3
oscrypto==1.3.0
pycparser==2.21
pycryptodomex==3.14.1
PyJWT==2.4.0
pyOpenSSL==22.0.0
pytz==2022.1
requests==2.28.0
urllib3==1.26.9
snowflake-connector-python==2.7.8
Dockerfile, built on your image:
# syntax=docker/dockerfile:1
FROM prefecthq/prefect:latest-python3.10
RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install -r requirements.txt
Storage and run configurations:
from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3

FLOW_NAME = "PoC_refresh_medidata_snfk"

STORAGE = S3(
    bucket="XXXXXXXXX",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # this will ensure to upload the Flow script to S3 during registration
    local_script_path=f"{FLOW_NAME}.py",
)
RUN_CONFIG = ECSRun(
    labels=["prod"],
    task_role_arn="arn:aws:iam::XXXXXXXXXXXX:role/prefectTaskRole",
    run_task_kwargs=dict(cluster="prefectEcsCluster", launchType="FARGATE",),
)


with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    ETL_Medidata(select="keyword", keyword="Daniel")

if __name__ == "__main__":
    flow.register("tester")
Thanks again, Daniel.

Kevin Kho

06/22/2022, 1:41 PM
Sorry about that, Daniel 😞. We absolutely don’t mind the late responses; we’re just around. I see the Dockerfile, but if you don’t specify an image in ECSRun, it will just pull the latest Prefect base image, which doesn’t have snowflake installed. How do you specify which container to use?

Daniel

06/23/2022, 1:13 PM
Silly me! I was under the impression that the image was specified in the ECS cluster task definition. Adding it to the ECSRun within the flow did the trick! Thanks Kevin.
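[Editor's note: for readers landing on this thread later, a hedged sketch of what the fix amounts to. The image URI below is a placeholder, not Daniel's actual repository: build the Dockerfile shared above, push it to a registry the ECS task can pull from (e.g. ECR), and point ECSRun at it.]
from prefect.run_configs import ECSRun

# Hedged sketch of the resolution: pass the custom image (built from the
# Dockerfile shared earlier and pushed to ECR) to ECSRun so the Fargate task
# runs a container that actually has the Snowflake connector installed.
RUN_CONFIG = ECSRun(
    labels=["prod"],
    task_role_arn="arn:aws:iam::XXXXXXXXXXXX:role/prefectTaskRole",
    image="XXXXXXXXXXXX.dkr.ecr.ap-southeast-2.amazonaws.com/prefect-snowflake:latest",  # placeholder URI
    run_task_kwargs=dict(cluster="prefectEcsCluster", launchType="FARGATE"),
)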