https://prefect.io logo
#prefect-community
Title
# prefect-community
t

Todd de Quincey

05/24/2022, 9:45 AM
Prefect 2.0 - Snowflake PUT command from local file error
When attempting to run the below task
Copy code
result = snowflake_query(
        query="put file:///data.csv @my_csv_stage",
        snowflake_credentials=SNOWFLAKE_CREDENTIALS,
)
I get the following error
Copy code
11:40:45.794 | ERROR   | Task run 'snowflake_query-eb69c8ef-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/prefect/engine.py", line 796, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/prefect_snowflake/database.py", line 68, in snowflake_query
    response = cursor.execute_async(query, params=params)
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 804, in execute_async
    return self.execute(*args, **kwargs)
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/cursor.py", line 755, in execute
    sf_file_transfer_agent.execute()
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py", line 355, in execute
    self._parse_command()
  File "/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py", line 852, in _parse_command
    response = self._ret["data"]
KeyError: 'command'
Setting a breakpoint at
/Users/todddequincey/repos/midnite/data/prefect/.venv/lib/python3.9/site-packages/snowflake/connector/file_transfer_agent.py::854
and inspecting the
response = _self_._ret["data"]
object, there is indeed no
command
object in there. Not sure if this is a user error, but haven’t been able to find much in the way of docs or examples for this. Note: I have manually checked in snowflake that the user and role has appropriate access to the stage. And inspecting the os path, the file is accessible from the code.
a

Anna Geller

05/24/2022, 10:54 AM
Could you share the full flow code showing how you use it in your flow? Hide any credentials
t

Todd de Quincey

05/24/2022, 10:57 AM
That is the full flow at this stage - I stripped everything out to isolate the problem
Copy code
from prefect_snowflake.database import snowflake_query
from prefect import flow

from util.const import AWS_CREDENTIALS, SNOWFLAKE_CREDENTIALS, S3_RAW_BUCKET


@flow
def snowflake_pipeline():
    result = snowflake_query(
        query="put file:///data.csv @my_csv_stage",
        snowflake_credentials=SNOWFLAKE_CREDENTIALS,
    )

    return None


state = snowflake_pipeline()
And replacing the query with a select or insert works fine. So I know the connection is fine
Literally just doing a spike to see if Prefect is going to meet our needs
a

Anna Geller

05/24/2022, 11:20 AM
A dumb question but is a put command considered a query by Snowflake API?
I think this might be true, based on the top answer here, you can't execute put command using a Python connector, you need to do it via SnowSQL
t

Todd de Quincey

05/24/2022, 11:23 AM
It’s listed under their list of SQL commands https://docs.snowflake.com/en/sql-reference/sql/put.html
a

Anna Geller

05/24/2022, 11:23 AM
Quote from Snowflake: "You can't execute a PUT with the python connector, you need to use SnowSQL. If you want to execute PUT through python, you'd have to create script file and then execute SnowSQL through an OS command function in python. It's not the cleanest way to go. If you use your own S3 buckets, then you have a lot more options on how to get your data from the local machine to S3 through python, which might be cleaner."
t

Todd de Quincey

05/24/2022, 11:24 AM
That’ll be it then. Thought I’d check since this connector is so new. Not an issue for prod, as we would be copying from an external S3 stage
👍 1
Apologies for wasting your time!
a

Anna Geller

05/24/2022, 11:24 AM
do you want to open an issue with feature request in the repo?
I'm sure @Andrew Huang could investigate whether we may add a task for this, this seems to be like a common pattern and quite useful (put is faster than inserts)
never wasting time discussing Prefect Collections! ask us anytime
and btw this comment is from 2019, this might have changed since then I can open an issue for you here https://github.com/PrefectHQ/prefect-snowflake/issues/new
t

Todd de Quincey

05/24/2022, 11:28 AM
Yeah S3 to {INSERT_FAVFOURITE_WH} will be a very common pattern I feel and would definitely be useful if this was pre-packaged in a task
👍 1
If I end up creating the task myself, happy to donate it back 🙂
please contribute or comment directly on the issue
6 Views