<@ULVA73B9P> I'm setting up the prefect-snowflake ...
# ask-marvin
s
@Marvin I'm setting up the prefect-snowflake library and so far it has been successful writing data to Snowflake but i have issues from reading the data. I have the most up to date prefect-snowflake package 0.28.0 and running the query on prefect 3.0.4 with mentioned in the example in the official docs:
Copy code
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows

@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__=="__main__":
    block_name = 'testw'
    snowflake_flow()
What i get in response running this is that the write is successful but reading the results back is failing:
Copy code
File "/home/vscode/.local/lib/python3.12/site-packages/snowflake/connector/cursor.py", line 927, in execute
    command = command.strip(" \t\n\r") if command else None
              ^^^^^^^^^^^^^
AttributeError: 'SnowflakeCursor' object has no attribute 'strip'
m
Your message was too long by 180 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
I'm setting up the prefect-snowflake library and so far it has been successful writing data to Snowflake but i have issues from reading the data. I have the most up to date prefect-snowflake package 0.28.0 and running the query on prefect 3.0.4 with mentioned in the example in the official docs:
```from prefect import flow, task from prefect_snowflake import SnowflakeConnector @task def setup_table(block_name: str) -> None: with SnowflakeConnector.load(block_name) as connector: connector.execute( "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);" ) connector.execute_many( "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);", seq_of_parameters=[ {"name": "Ford", "address": "Highway 42"}, {"name": "Unknown", "address": "Space"}, {"name": "Me", "address": "Myway 88"}, ], ) @task def fetch_data(block_name: str) -> list: all_rows = [] with SnowflakeConnector.load(block_name) as connector: while True: # Repeated fetch* calls using the same operation will # skip re-executing and instead return the next set of results new_rows = connector.fetch_many("SELECT * FROM customers", size
s
@Marvin I'm setting up the prefect-snowflake library and so far it has been successful writing data to Snowflake but i have issues from reading the data. I have the most up to date prefect-snowflake package 0.28.0 and running the query on prefect 3.0.4 with mentioned in the example in the official docs:
m
It looks like there was an error while searching the Prefect 3.x documentation and GitHub issues. Let me correct that and try again. I'll first search the Prefect 3.x documentation with specific queries related to your issue. It looks like you're using the
prefect-snowflake
module with Prefect 3.0.4. The documentation excerpts mention using the
SnowflakeConnector
for executing SQL statements to read and write data, as well as handling connections appropriately using context managers. To resolve your issue with reading data: 1. Make sure you are using the
fetch
or
fetch_many
methods to retrieve data from Snowflake within the context manager for proper connection handling. 2. Verify that your code follows the setup similar to examples provided in the documentation: - Use
SnowflakeConnector.load("your-block-name")
to establish connections. - Ensure your SQL commands for reading data are correct. 3. Double-check that all blocks are correctly registered and that credentials are saved in a Prefect block, as mentioned in the documentation. Here's the link to the Prefect 3.x documentation on using the prefect-snowflake integration: https://docs-3.prefect.io/integrations/prefect-snowflake/index If the problem persists, consider posting your specific error messages or code snippets for more tailored assistance. Additionally, check the Prefect GitHub issues for ongoing discussions about similar issues you might be experiencing.
s
@Marvin im doing all these already: from prefect import flow, task from prefect_snowflake import SnowflakeConnector @task def setup_table(block_name: str) -> None: with SnowflakeConnector.load(block_name) as connector: connector.execute( "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);" ) connector.execute_many( "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);", seq_of_parameters=[ {"name": "Ford", "address": "Highway 42"}, {"name": "Unknown", "address": "Space"}, {"name": "Me", "address": "Myway 88"}, ], ) @task def fetch_data(block_name: str) -> list: all_rows = [] with SnowflakeConnector.load(block_name) as connector: while True: # Repeated fetch* calls using the same operation will # skip re-executing and instead return the next set of results new_rows = connector.fetch_many("SELECT * FROM customers", size=2) if len(new_rows) == 0: break all_rows.append(new_rows) return all_rows @flow def snowflake_flow(block_name: str) -> list: setup_table(block_name) all_rows = fetch_data(block_name) return all_rows if name=="__main__": block_name = 'testw' snowflake_flow()
m
Your message was too long by 22 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
im doing all these already: from prefect import flow, task
from prefect_snowflake import SnowflakeConnector @task def setup_table(block_name: str) -> None: with SnowflakeConnector.load(block_name) as connector: connector.execute( "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);" ) connector.execute_many( "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);", seq_of_parameters=[ {"name": "Ford", "address": "Highway 42"}, {"name": "Unknown", "address": "Space"}, {"name": "Me", "address": "Myway 88"}, ], ) @task def fetch_data(block_name: str) -> list: all_rows = [] with SnowflakeConnector.load(block_name) as connector: while True: # Repeated fetch* calls using the same operation will # skip re-executing and instead return the next set of results new_rows = connector.fetch_many("SELECT * FROM customers", size=2) if len(new_rows) == 0: break all_rows.append(new_rows) return all_rows @flow def snowflake_flow(block_name: str) -> list: setup_table(block_name) all_rows = fetch_data(block_name) return all_rows
s
@Marvin already doing what mentioned, it looks as if it applies the strip function on the connector object rather than the query string itself.
m
It seems that there was an issue with accessing GitHub to search for related issues. However, I can suggest some steps to try and resolve the issue you're facing: 1. Debugging: Log the query and connection objects to ensure they are properly initialized and are of the correct type. 2. Code Review: Double-check the part of your code where the query is executed. Make sure that you're not inadvertently calling methods like
strip()
on the wrong objects. 3. Latest Version: Ensure both Prefect and prefect-snowflake integrations are updated to their latest versions, as there may be bug fixes addressing this issue. If your issue is due to a bug, reporting it on Prefect's GitHub repository is a good approach. You can open a new issue with detailed information to get further assistance from the developer community. For now, you might want to manually test altering any
strip()
calls in your code to ensure they are applied to strings and not objects like connectors. If more persistent issues arise, trying the Prefect Slack community or their discourse forum might also help gather more insights.
k
Hey Sonny, I am getting the same issue:
command = command.strip(" \t\n\r") if command else None
s
hi @Keith Smith i have submitted an issue if you want to watch it!:) https://github.com/PrefectHQ/prefect/issues/15712
🙌 1
hey @Nate! Tagging you here for visibility on this issue. Do you know when it will be addressed? My team is currently blocked on this.