Jonathan Mathews
05/04/2022, 5:07 PM
.run() it complains that it is not JSON serializable:
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    snowflake_password=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True).run()
    ....
    upload_results = SnowflakeQuery(
        user=SNOWFLAKE_USER,
        password=snowflake_password,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        role=SNOWFLAKE_ROLE,
        schema=SNOWFLAKE_SCHEMA).map(query=queries)
Kevin Kho
snow = SnowflakeQuery()
with Flow(...) as flow:
    snow(...)
The first is the init and the second is the run. The init happens at registration time, and the run happens at flow execution time.
Even if you do
with Flow(...) as flow:
    snow = SnowflakeQuery()
    snow(...)
the init is done during build time.
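A minimal sketch of the init/run split described above, written as a toy model in plain Python rather than with Prefect itself. `ToySecret`, `ToyQuery`, `execute`, and the `TOY_SNOWFLAKE_PASSWORD` variable are all hypothetical names made up for illustration; the only point is that the constructor runs at build time with plain values, while the secret is resolved and handed to `run()` at execution time.

```python
import os


class ToySecret:
    """Hypothetical stand-in for a secret task: stores only a name until run()."""

    def __init__(self, name):
        self.name = name  # a plain string, safe to keep at build time

    def run(self):
        # The actual value is read only when the flow executes.
        return os.environ[self.name]


class ToyQuery:
    """Hypothetical stand-in for a task with init-time and run-time arguments."""

    def __init__(self, user=None, password=None):
        # Build/registration time: only plain values belong here.
        self.user = user
        self.password = password

    def run(self, password=None, query=None):
        # Execution time: a run-time argument overrides the init default.
        password = password if password is not None else self.password
        return f"{self.user} runs {query!r} with a {len(password)}-char password"


def execute(task, secret, query):
    """Toy executor: resolve the secret first, then pass its value to run()."""
    return task.run(password=secret.run(), query=query)


# Build time: no secret value exists yet, only its name.
secret = ToySecret("TOY_SNOWFLAKE_PASSWORD")
task = ToyQuery(user="toy_user")

# Execution time: the environment now holds the value.
os.environ["TOY_SNOWFLAKE_PASSWORD"] = "hunter2"
result = execute(task, secret, "SELECT 1")
print(result)
```

In this toy, nothing secret is ever stored on the `ToyQuery` instance, which mirrors the advice to pass the password at run time instead of in the init.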
You are getting an EnvVarSecret, which materializes at runtime. And it seems you are using run() to force it to resolve at init time?
Kevin Kho
with Flow(...) as flow:
    snow = SnowflakeQuery(init_stuff_here)(run_stuff_here)
Jonathan Mathews
05/04/2022, 5:26 PM
Jonathan Mathews
05/04/2022, 5:27 PM
Jonathan Mathews
05/04/2022, 5:29 PM
snowflake_query_task_init = SnowflakeQuery(
    user=SNOWFLAKE_USER,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    role=SNOWFLAKE_ROLE,
    schema=SNOWFLAKE_SCHEMA,)
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init(password=SNOWFLAKE_PASSWORD).map(query=queries)
Kevin Kho
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init.map(password=unmapped(SNOWFLAKE_PASSWORD), query=queries)
Kevin Kho
Jonathan Mathews
05/04/2022, 8:24 PM
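For reference, a rough sketch of why the password is wrapped in `unmapped` in the last snippet. This is a toy model in plain Python, not Prefect's implementation: `Unmapped`, `toy_map`, and `run_query` are hypothetical names. It assumes a map that iterates over every keyword argument unless the value is explicitly marked as constant.

```python
class Unmapped:
    """Hypothetical marker: wraps a value that should not be iterated over."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, **kwargs):
    """Call fn once per element of the mapped arguments.

    Values wrapped in Unmapped are passed whole to every call.
    """
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    results = []
    for values in zip(*mapped.values()):
        call_kwargs = dict(zip(mapped.keys(), values))
        results.append(fn(**constant, **call_kwargs))
    return results


def run_query(password, query):
    # Stand-in for the task's run(); it just echoes its inputs.
    return (password, query)


queries = ["SELECT 1", "SELECT 2"]

# The password is held constant across all mapped calls.
with_marker = toy_map(run_query, password=Unmapped("pw123"), query=queries)

# Without the marker, the toy map iterates over the password as well.
without_marker = toy_map(run_query, password="pw123", query=queries)

print(with_marker)
print(without_marker)
```

In this toy, leaving out the marker pairs each query with a single character of the password, which is the kind of surprise the `unmapped(...)` wrapper is there to avoid.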