Aaron Pickering
01/18/2022, 11:45 AM
"Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n NameError("name \'err\' is not defined")')"
The task itself is straightforward. It looks like this:
snowsql_obj = SnowflakeQueriesFromFile(
    account=SNOWFLAKE_ACCOUNT,
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PWD,
    file_path="../../sql/amplitude_raw.sql",
)
snowsql_obj.run()
Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.snowflake import SnowflakeQueriesFromFile
from prefect.tasks.secrets import PrefectSecret

snowflake_query = SnowflakeQueriesFromFile(
    database="DEV", schema="jaffle_shop", autocommit=True, file_path="sample_query.sql"
)


@task(log_stdout=True)
def print_data(x):
    print(x)  # prints a list of tuples


with Flow("snowflake_tasks_example") as flow:
    account = PrefectSecret("SNOWFLAKE_ACCOUNT_ID")
    user = PrefectSecret("SNOWFLAKE_USER")
    pwd = PrefectSecret("SNOWFLAKE_PASS")
    role = PrefectSecret("SNOWFLAKE_ROLE")
    warehouse = Parameter("SNOWFLAKE_WAREHOUSE", default="COMPUTE_WH")
    data = snowflake_query(
        account=account, user=user, password=pwd, role=role, warehouse=warehouse,
    )
    print_data(data)

if __name__ == "__main__":
    flow.run()
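One thing to watch with a relative file_path such as "../../sql/amplitude_raw.sql" or "sample_query.sql": assuming the task opens the path as given, it is resolved against the working directory of whatever process runs the flow, which may differ from the directory of the flow script. This is not necessarily related to the unpickling error above. A minimal sketch of anchoring the path to the script location instead, using only the standard library; `resolve_sql_path` is a hypothetical helper, not part of Prefect:

```python
from pathlib import Path


def resolve_sql_path(relative_path: str, base_file: str) -> str:
    """Return an absolute path for relative_path, anchored at base_file's directory.

    Hypothetical helper for illustration; not a Prefect API.
    """
    # Directory that contains the reference file (typically the flow script).
    base_dir = Path(base_file).resolve().parent
    # Join and normalise, so ".." segments are collapsed.
    return str((base_dir / relative_path).resolve())


if __name__ == "__main__":
    # Resolve relative to this script rather than the current working directory.
    print(resolve_sql_path("../../sql/amplitude_raw.sql", __file__))
```

The result could then be passed as file_path, e.g. file_path=resolve_sql_path("../../sql/amplitude_raw.sql", __file__), so the same file is found regardless of where the flow is started from.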
Aaron Pickering
01/18/2022, 12:05 PM
Anna Geller
Aaron Pickering
01/18/2022, 12:53 PM