Sonny Nguyen
10/24/2024, 9:22 AM
Marvin
10/24/2024, 9:22 AM
Sonny Nguyen
10/24/2024, 9:22 AM
Marvin
10/24/2024, 9:23 AM
…`prefect-snowflake` integration. Here's a basic example to help you get started:
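1. Install Required Libraries: Make sure `prefect-snowflake`, `snowflake-connector-python` (with its `pandas` extra, which also installs the `pyarrow` dependency that `write_pandas` needs), and `pandas` are installed.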
2. Define the Flow: Create a Prefect flow that uses the `SnowflakeConnector` block to connect to Snowflake and the `write_pandas` function to load your DataFrame into a Snowflake table.
Here's a sample code snippet:
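```python
import pandas as pd
from prefect import flow, get_run_logger
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas


@flow
def snowflake_write_pandas_flow():
    logger = get_run_logger()
    # Load the saved SnowflakeConnector block ("my-block" is a placeholder name)
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        # write_pandas does not create the table by default, so create it first
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # Example DataFrame to write; uppercase column names match the
        # unquoted (and therefore uppercased) column names in the DDL
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])

        # Write the DataFrame to Snowflake
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_,  # the block field is `schema_`, not `schema`
        )
        if not success:
            raise RuntimeError(f"write_pandas failed for table {table_name}")
        logger.info("Wrote %d rows in %d chunk(s) to %s", num_rows, num_chunks, table_name)


if __name__ == "__main__":
    snowflake_write_pandas_flow()
```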
3. Set Up Snowflake Credentials: Store your Snowflake credentials in a `SnowflakeCredentials` block and reference it from a `SnowflakeConnector` block, which also holds the database, warehouse, and schema the flow uses. Replace the placeholder `my-block` with the name of the `SnowflakeConnector` block you saved.
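As a rough sketch of step 3, the blocks could be created and saved from Python as shown here. It assumes `prefect-snowflake` is installed, a Prefect API (server or Cloud) is configured, and password authentication is used; the `SNOWFLAKE_*` environment variable names and the `save_snowflake_connector_block` helper are hypothetical. Blocks can also be created in the Prefect UI instead.

```python
import os


def save_snowflake_connector_block(block_name: str = "my-block") -> None:
    """Create and save a SnowflakeConnector block that a flow can later load.

    Hypothetical helper: assumes prefect-snowflake is installed, a Prefect API
    is configured, and the (hypothetical) SNOWFLAKE_* environment variables are set.
    """
    # Imported lazily so this module can be imported without prefect-snowflake
    from prefect_snowflake.credentials import SnowflakeCredentials
    from prefect_snowflake.database import SnowflakeConnector

    credentials = SnowflakeCredentials(
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
    )
    connector = SnowflakeConnector(
        credentials=credentials,
        database=os.environ["SNOWFLAKE_DATABASE"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        schema=os.environ["SNOWFLAKE_SCHEMA"],  # stored on the block as `schema_`
    )
    # overwrite=True replaces an existing block with the same name
    connector.save(block_name, overwrite=True)
```

Calling `save_snowflake_connector_block()` once, for example from a setup script, should make `SnowflakeConnector.load("my-block")` in the flow work. Reading the secrets from environment variables keeps them out of source control.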
You can find more details and examples in the Prefect 3.x Snowflake documentation.
Feel free to ask if you have any more questions or need further assistance!
Sonny Nguyen
10/24/2024, 9:25 AM
Marvin
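10/24/2024, 9:26 AM
…`write_pandas` method to upload a DataFrame to Snowflake, the target table must already exist by default: `write_pandas` does not create it for you unless you pass `auto_create_table=True`, an option available in recent versions of `snowflake-connector-python`. Otherwise, you need to define the Data Definition Language (DDL) and create the table if it doesn't exist.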
The example I provided earlier includes these lines, which create the table if it does not already exist:
```python
table_name = "TABLE_NAME"
ddl = "NAME STRING, NUMBER INT"
statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
with connection.cursor() as cursor:
    cursor.execute(statement)
```
Adjust the column definitions to match your DataFrame's structure: replace the `NAME STRING, NUMBER INT` DDL with your DataFrame's actual column names and Snowflake data types.
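If you'd rather derive the column definitions from the DataFrame than write them by hand, a minimal sketch could map pandas dtypes to Snowflake types. The `ddl_from_dataframe` helper and its type mapping are hypothetical and deliberately incomplete; check the generated types against your data before creating the table.

```python
import pandas as pd

# Hypothetical mapping from numpy/pandas dtype "kind" codes to Snowflake types.
# Not exhaustive: unknown kinds fall back to STRING.
_KIND_TO_SNOWFLAKE = {
    "i": "INT",            # signed integers
    "u": "INT",            # unsigned integers
    "f": "FLOAT",          # floating point
    "b": "BOOLEAN",        # booleans
    "M": "TIMESTAMP_NTZ",  # datetimes (assumes timezone-naive values)
    "O": "STRING",         # object columns, typically Python strings
}


def ddl_from_dataframe(df: pd.DataFrame) -> str:
    """Build a column-definition string such as 'NAME STRING, NUMBER INT'."""
    columns = []
    for name, dtype in df.dtypes.items():
        sf_type = _KIND_TO_SNOWFLAKE.get(dtype.kind, "STRING")
        columns.append(f"{name} {sf_type}")
    return ", ".join(columns)
```

For the example DataFrame this yields `NAME STRING, NUMBER INT`, the same DDL used above. Keep the DataFrame's column names uppercase: unquoted identifiers in the DDL are stored in uppercase, while `write_pandas` quotes column names by default, so lowercase or mixed-case names would not match the table's columns.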
Once the table exists, you can upload your DataFrame. If you'd rather not handle this manually every time, write a small helper function in your flow that creates the table if needed before calling `write_pandas`.
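One way to package that helper is to wrap the `CREATE TABLE IF NOT EXISTS` statement from the earlier example. Because `IF NOT EXISTS` already performs the existence check, no separate lookup is needed. The `ensure_table` name and the identifier check are assumptions for illustration; the helper only uses the standard DB-API `cursor()`, `execute()`, and `close()` calls, so it should work with the connection returned by `SnowflakeConnector.get_connection()`.

```python
import re

# Hypothetical check for simple unquoted identifiers, optionally qualified as
# DATABASE.SCHEMA.TABLE; anything else (e.g. quoted names) is rejected.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){0,2}")


def ensure_table(connection, table_name: str, ddl: str) -> None:
    """Create `table_name` with the column definitions in `ddl` if it is missing.

    `connection` is any DB-API 2.0 connection; `ddl` is treated as trusted input.
    """
    if not _IDENTIFIER.fullmatch(table_name):
        raise ValueError(f"unexpected table name: {table_name!r}")
    cursor = connection.cursor()
    try:
        # IF NOT EXISTS makes this a no-op when the table is already there
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({ddl})")
    finally:
        cursor.close()
```

Inside the flow, you would call `ensure_table(connection, table_name, ddl)` right before `write_pandas`. The table name is validated because it is interpolated directly into the SQL string, which cannot be parameterized for identifiers.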
If you have any further questions or would like more detailed guidance, feel free to ask!