Sonny Nguyen
10/15/2024, 2:40 PM
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    block_name = "testw"
    snowflake_flow(block_name)
When I run this, the write succeeds but reading the results back fails:
  File "/home/vscode/.local/lib/python3.12/site-packages/snowflake/connector/cursor.py", line 927, in execute
    command = command.strip(" \t\n\r") if command else None
              ^^^^^^^^^^^^^
AttributeError: 'SnowflakeCursor' object has no attribute 'strip'
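The last frame of the traceback calls `.strip()` on `command`, which suggests that a cursor object, rather than the SQL string, reached `execute` as its first argument. A minimal sketch of that failure mode follows. The `SnowflakeCursor` class here is a hypothetical stand-in defined only for illustration; it is not the real `snowflake.connector` class and does not reproduce the connector's internals.

```python
class SnowflakeCursor:
    """Hypothetical stand-in; NOT the real snowflake.connector class."""

    def execute(self, command):
        # Mirrors the line shown in the traceback: assumes `command` is a str.
        command = command.strip(" \t\n\r") if command else None
        return command


def reproduce() -> str:
    """Pass a cursor where the SQL text belongs and return the error text."""
    cursor = SnowflakeCursor()
    try:
        cursor.execute(cursor)  # wrong argument type on purpose
    except AttributeError as exc:
        return str(exc)
    return "no error"


print(reproduce())
```

If this reading is right, the problem sits in how the arguments are forwarded to the cursor, not in the SQL statement itself.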
Marvin
10/15/2024, 2:40 PM
I'm setting up the prefect-snowflake library. Writing data to Snowflake works so far, but I have issues reading the data back. I have the most up-to-date prefect-snowflake package (0.28.0) and am running the query on Prefect 3.0.4, as in the example from the official docs.
Sonny Nguyen
10/15/2024, 2:40 PM
Marvin
10/15/2024, 2:41 PM
`prefect-snowflake` module with Prefect 3.0.4. The documentation excerpts mention using the `SnowflakeConnector` for executing SQL statements to read and write data, as well as handling connections appropriately using context managers.
To resolve your issue with reading data:
1. Make sure you are using the `fetch` or `fetch_many` methods to retrieve data from Snowflake within the context manager for proper connection handling.
2. Verify that your code follows the setup similar to examples provided in the documentation:
   - Use `SnowflakeConnector.load("your-block-name")` to establish connections.
- Ensure your SQL commands for reading data are correct.
3. Double-check that all blocks are correctly registered and that credentials are saved in a Prefect block, as mentioned in the documentation.
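While the connector's `fetch_many` path fails, one possible workaround is to page through a plain DB-API cursor instead. The sketch below uses the standard-library `sqlite3` module as a stand-in so that it runs anywhere. The helper name `fetch_in_batches` is hypothetical, and the idea that the same loop would work on the connection returned by `SnowflakeConnector.get_connection()` is an assumption that is not tested here.

```python
import sqlite3
from typing import Any, Iterator


def fetch_in_batches(connection: Any, sql: str, size: int = 2) -> Iterator[list]:
    """Yield lists of up to `size` rows from any DB-API 2.0 connection."""
    cursor = connection.cursor()
    try:
        cursor.execute(sql)  # the SQL text is passed as a plain str
        while True:
            rows = cursor.fetchmany(size)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


# Stand-in database; sqlite3 uses :name placeholders rather than %(name)s.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar)")
conn.executemany(
    "INSERT INTO customers (name, address) VALUES (:name, :address)",
    [
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Space"},
        {"name": "Me", "address": "Myway 88"},
    ],
)

all_rows = []
for batch in fetch_in_batches(conn, "SELECT * FROM customers", size=2):
    all_rows.extend(batch)  # extend keeps one flat list of rows
print(all_rows)
```

Note that the posted loop uses `append`, which produces a list of batches; `extend` is used here to get a flat list of rows instead.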
Here's the link to the Prefect 3.x documentation on using the prefect-snowflake integration: https://docs-3.prefect.io/integrations/prefect-snowflake/index
If the problem persists, consider posting your specific error messages or code snippets for more tailored assistance. Additionally, check the Prefect GitHub issues for ongoing discussions about similar issues you might be experiencing.
Sonny Nguyen
10/15/2024, 2:47 PM
Marvin
10/15/2024, 2:47 PM
I'm doing all of this already, with the same `setup_table`, `fetch_data`, and `snowflake_flow` code as posted above.
Sonny Nguyen
10/15/2024, 2:48 PM
Marvin
10/15/2024, 2:48 PM
`strip()` on the wrong objects.
3. Latest Version: Ensure both Prefect and the prefect-snowflake integration are updated to their latest versions, as there may be bug fixes addressing this issue.
If your issue is due to a bug, reporting it on Prefect's GitHub repository is a good approach. You can open a new issue with detailed information to get further assistance from the developer community.
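When checking versions or filing an issue, it helps to state exactly what is installed. A minimal sketch using only the standard library follows; the helper name `installed_version` is hypothetical, and the package list is an assumption about which distributions are relevant here.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package: str) -> str:
    """Return the installed version of a distribution, or a placeholder."""
    try:
        return version(package)
    except PackageNotFoundError:
        return "not installed"


# Distribution names assumed to be relevant to this report.
for name in ("prefect", "prefect-snowflake", "snowflake-connector-python"):
    print(f"{name}: {installed_version(name)}")
```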
For now, you might want to check any `strip()` calls in your code to ensure they are applied to strings and not to objects like connectors. If the issue persists, the Prefect Slack community or the Discourse forum might also offer more insight.
Keith Smith
10/16/2024, 3:21 PM
command = command.strip(" \t\n\r") if command else None
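The quoted line only works when `command` is a string (or falsy). A small guard in user code, such as the hypothetical `ensure_sql_text` below, would turn the confusing `AttributeError` into an explicit `TypeError` before anything reaches a cursor. This is a sketch, not something either library provides.

```python
def ensure_sql_text(command: object) -> str:
    """Return stripped SQL text, or raise if `command` is not a str."""
    if not isinstance(command, str):
        raise TypeError(
            f"expected SQL text as str, got {type(command).__name__}"
        )
    # Same characters as in the quoted line from cursor.py.
    return command.strip(" \t\n\r")
```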
Sonny Nguyen
10/16/2024, 3:22 PM
Sonny Nguyen
10/22/2024, 11:06 AM