Jonathan Mathews
05/04/2022, 5:07 PM
[...] .run() [...] it complains that it is not JSON serializable:
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config()) as flow:
    snowflake_password = EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True).run()
    ....
    upload_results = SnowflakeQuery(
        user=SNOWFLAKE_USER,
        password=snowflake_password,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        role=SNOWFLAKE_ROLE,
        schema=SNOWFLAKE_SCHEMA).map(query=queries)
Kevin Kho
snow = SnowflakeQuery()
with Flow(...) as flow:
    snow(...)
The first call, SnowflakeQuery(), is the init and the second, snow(...), is the run. The init is done during registration time and the run is done during flow execution time.
Even if you do
with Flow(...) as flow:
    snow = SnowflakeQuery()
    snow(...)
the init is done during build time.
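The init/run split described above can be sketched with plain Python. This is a toy model under stated assumptions: ToyFlow and ToyQuery are hypothetical stand-ins written for illustration and are not Prefect classes. It only shows that the constructor executes while the flow is being built, whereas run() executes later.

```python
# Toy stand-ins for illustration only; these are NOT Prefect's classes.
events = []


class ToyFlow:
    """Collects deferred calls while the `with` block is building the flow."""

    current = None

    def __init__(self, name):
        self.name = name
        self.deferred = []

    def __enter__(self):
        ToyFlow.current = self
        return self

    def __exit__(self, *exc):
        ToyFlow.current = None
        return False

    def run(self):
        events.append("flow run starts")
        return [task.run(**kwargs) for task, kwargs in self.deferred]


class ToyQuery:
    def __init__(self, account):
        # Executes immediately, while the flow is being built.
        events.append("init")
        self.account = account

    def __call__(self, **kwargs):
        # Inside the `with` block this only records the call for later.
        ToyFlow.current.deferred.append((self, kwargs))
        return self

    def run(self, query):
        # Executes only when the flow itself runs.
        events.append("run")
        return f"{self.account}: {query}"


with ToyFlow("demo") as flow:
    snow = ToyQuery(account="acct")  # init happens here, at build time
    snow(query="SELECT 1")           # run is only scheduled here

events.append("flow built")
results = flow.run()
print(events)
```

In this sketch "init" is recorded before "flow built", and "run" only after the flow starts running, which mirrors the ordering described in the message.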
You are getting an EnvVarSecret, which materializes at runtime, and it seems you are using run() to force it to apply during the init time?
Kevin Kho
with Flow(...) as flow:
    snow = SnowflakeQuery(init_stuff_here)(run_stuff_here)
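A value that only materializes at runtime belongs among the run arguments rather than the init arguments. The following is a minimal sketch of that idea, assuming hypothetical ToyEnvSecret, ToyQuery, and execute helpers and a made-up TOY_SNOWFLAKE_PASSWORD variable; none of these are Prefect APIs.

```python
import os


class ToyEnvSecret:
    """Hypothetical placeholder that reads the environment only when run."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return os.environ[self.name]


class ToyQuery:
    def __init__(self, user):
        # Init arguments: static configuration known at build time.
        self.user = user

    def run(self, password, query):
        # Run arguments: values that may only exist at execution time.
        return f"{self.user}:{password} -> {query}"


def execute(task, **kwargs):
    """Resolve any placeholder arguments, then call the task's run()."""
    resolved = {
        key: value.run() if isinstance(value, ToyEnvSecret) else value
        for key, value in kwargs.items()
    }
    return task.run(**resolved)


# Build time: the placeholder is created, but nothing is read yet.
secret = ToyEnvSecret("TOY_SNOWFLAKE_PASSWORD")
task = ToyQuery(user="toy_user")

# Execution time: the environment variable now exists and is resolved.
os.environ["TOY_SNOWFLAKE_PASSWORD"] = "s3cret"
result = execute(task, password=secret, query="SELECT 1")
print(result)
```

Because the placeholder is handed to run rather than init, nothing has to call .run() on the secret by hand while the flow is being built.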
Jonathan Mathews
05/04/2022, 5:26 PM
Jonathan Mathews
05/04/2022, 5:27 PM
Jonathan Mathews
05/04/2022, 5:29 PM
snowflake_query_task_init = SnowflakeQuery(
    user=SNOWFLAKE_USER,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    role=SNOWFLAKE_ROLE,
    schema=SNOWFLAKE_SCHEMA)
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config()) as flow:
    SNOWFLAKE_PASSWORD = EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init(password=SNOWFLAKE_PASSWORD).map(query=queries)
Kevin Kho
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config()) as flow:
    SNOWFLAKE_PASSWORD = EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init.map(password=unmapped(SNOWFLAKE_PASSWORD), query=queries)
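The effect of wrapping one argument in unmapped while mapping over another can be illustrated with a toy version of map. This is only a sketch: ToyUnmapped, toy_map, and run_query are hypothetical names, and the code does not reproduce how Prefect implements mapping.

```python
class ToyUnmapped:
    """Hypothetical marker: pass this value unchanged to every mapped call."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, **kwargs):
    """Call fn once per element of the mapped arguments.

    Arguments wrapped in ToyUnmapped are repeated as-is for every call;
    all other arguments are iterated over element by element.
    """
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, ToyUnmapped)}
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, ToyUnmapped)}

    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("mapped arguments must all have the same length")
    count = lengths.pop()

    return [
        fn(**constant, **{k: v[i] for k, v in mapped.items()})
        for i in range(count)
    ]


def run_query(password, query):
    # Stand-in for the real query task; it just echoes its inputs.
    return (password, query)


queries = ["SELECT 1", "SELECT 2"]
results = toy_map(run_query, password=ToyUnmapped("pw"), query=queries)
print(results)
```

Each query gets its own call, and the same password is supplied to all of them, which is the behaviour the unmapped(...) wrapper is used for in the snippet above.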
Kevin Kho
Jonathan Mathews
05/04/2022, 8:24 PM