Jonathan Mathews
05/04/2022, 5:07 PM.run()
it complains that it is not JSON serializable:
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
snowflake_password=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True).run()
....
upload_results = SnowflakeQuery(
user=SNOWFLAKE_USER,
password=snowflake_password,
account=SNOWFLAKE_ACCOUNT,
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
role=SNOWFLAKE_ROLE,
schema=SNOWFLAKE_SCHEMA).map(query=queries)
Kevin Kho
05/04/2022, 5:14 PMsnow = SnowflakeQuery()
with Flow(...) as flow:
snow(...)
the first is the init
and the second is the run
. The init is done during registration time and the run
is done during flow execution time.
Even if you do
with Flow(...) as flow:
snow = SnowflakeQuery()
snow(...)
the init
is done during build time.
You are getting an EnvVarSecret
which materializes at runtime. And it seems you are using run()
to force it to apply during the init
time?with Flow(...) as flow:
snow = SnowflakeQuery(init_stuff_here)(run_stuff_here)
Jonathan Mathews
05/04/2022, 5:26 PMsnowflake_query_task_init = SnowflakeQuery(
user=SNOWFLAKE_USER,
account=SNOWFLAKE_ACCOUNT,
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
role=SNOWFLAKE_ROLE,
schema=SNOWFLAKE_SCHEMA,)
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
upload_results = snowflake_query_task_init(password=SNOWFLAKE_PASSWORD).map(query=queries)
Kevin Kho
05/04/2022, 5:31 PMwith Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
upload_results = snowflake_query_task_init.map(password=unmapped(SNOWFLAKE_PASSWORD),query=queries)
Jonathan Mathews
05/04/2022, 8:24 PM