Srujan Jabbireddy
05/21/2024, 2:00 PM@flow(name="Push Records to Database", log_prints=True)
def github_pipeline():
 
    try:
        # connector = SnowflakeConnector.load(SNOWFLAKE_BLOCK_NAME)
        with SnowflakeConnector.load(SNOWFLAKE_BLOCK_NAME) as conn:
            
            # Execute the query
            # cursor = conn.cursor()
            print('creating cursor')
            conn.execute("TRUNCATE TABLE GITHUB_DATA;")
            print('execute the query')
            # Commit the transaction
            # connection.commit()
            # conn.close()Marvin
05/21/2024, 2:00 PMpython
from prefect import flow
@flow
def always_fails_flow():
    raise ValueError("This flow immediately fails")
always_fails_flow()Failedtrypython
from prefect import flow
from prefect.engine import signals
@flow(name="Push Records to Database", log_prints=True)
def github_pipeline():
 
    try:
        with SnowflakeConnector.load(SNOWFLAKE_BLOCK_NAME) as conn:
            print('creating cursor')
            conn.execute("TRUNCATE TABLE GITHUB_DATA;")
            print('execute the query')
    except Exception as e:
        print('An error occurred:', e)
        # You can either directly raise the original exception or use Prefect signals.
        # raise e  # Re-raises the original exception, marking the flow as failed.
        raise signals.FAIL('Task failed due to an error: ' + str(e))  # Uses a Prefect signal to fail the task.Srujan Jabbireddy
05/21/2024, 2:03 PMMarvin
05/21/2024, 2:04 PMexecutepython
from prefect import flow, task
from snowflake.connector import ProgrammingError
from your_package import SnowflakeConnector  # Replace with your actual import
@task
def execute_sql(sql: str):
    try:
        with SnowflakeConnector.load("your_snowflake_connection_block_name") as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql)
                # If the execution is successful, you can get the number of affected rows if necessary
                affected_rows = cursor.rowcount
                print(f"The SQL statement was executed successfully, {affected_rows} rows affected.")
    except ProgrammingError as e:
        # This example error handling simply prints and re-raises the error, 
        # but you could handle it in a way that's appropriate for your use case.
        print(f"An error occurred during SQL statement execution: {e}")
        raise
@flow(name="Run SQL in Snowflake")
def run_sql_flow():
    sql_statement = "TRUNCATE TABLE GITHUB_DATA;"
    execute_sql(sql_statement)
run_sql_flow()execute_sqlexecuteProgrammingErrorexceptSnowflakeConnectorSrujan Jabbireddy
05/21/2024, 2:10 PMMarvin
05/21/2024, 2:10 PMImportError("cannot import name 'signals' from 'prefect.engine'signalsprefect.engineprefect.engine.signalsFAILSKIPprefect.exceptionsPAUSEFAILpython
from prefect import flow
from prefect.exceptions import FAIL
@flow
def failing_flow():
    # ... some code ...
    if error_condition:
        raise FAIL("An error occurred, stopping the flow")Srujan Jabbireddy
05/21/2024, 2:22 PMMarvin
05/21/2024, 2:23 PM'SnowflakeConnector' object has no attribute 'cursor'cursorSnowflakeConnectorSnowflakeConnectorSnowflakeConnectorsnowflake-connector-pythonsnowflake-connector-pythonpython
import snowflake.connector
# Connect to Snowflake
conn = snowflake.connector.connect(
    user='<user>',
    password='<password>',
    account='<account>'
)
# Create a cursor object
cursor = conn.cursor()SnowflakeConnectorSnowflakeConnectorprefect-snowflake