Hi, what’s the correct way to use EnvVarSecret within a Flow? Currently my code is below, but it tells me the task has not been added to the flow. If I don’t call it with `.run()`, it complains that it is not JSON serializable:
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    snowflake_password=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True).run()
    ....
    upload_results = SnowflakeQuery(
        user=SNOWFLAKE_USER,
        password=snowflake_password,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        role=SNOWFLAKE_ROLE,
        schema=SNOWFLAKE_SCHEMA).map(query=queries)
Kevin Kho
05/04/2022, 5:14 PM
If you do:
snow = SnowflakeQuery()
with Flow(...) as flow:
    snow(...)
the first is the `init` and the second is the `run`. The `init` is done during registration time and the `run` is done during flow execution time.
Even if you do
with Flow(...) as flow:
    snow = SnowflakeQuery()
    snow(...)
the `init` is done during build time.
You are getting an `EnvVarSecret`, which materializes at runtime. And it seems you are using `run()` to force it to apply during the `init` time?
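To make the init/run split above concrete, here is a minimal pure-Python sketch. It is a toy model, not Prefect's implementation: `ToyTask`, `ToyEnvSecret`, the `plan` list and the `TOY_PASSWORD` variable are all hypothetical names made up for illustration. It only shows that constructor arguments are fixed when the flow is built, while a secret is read when the flow actually runs.

```python
import os


class ToyEnvSecret:
    """Toy stand-in for a secret task (hypothetical, not Prefect's EnvVarSecret)."""

    def __init__(self, name):
        # "init": only remembers which variable to read later.
        self.name = name

    def run(self):
        # "run": the environment is read only when the flow executes.
        return os.environ[self.name]


class ToyTask:
    """Toy stand-in for a query task (hypothetical, not Prefect's SnowflakeQuery)."""

    def __init__(self, user=None):
        # "init": happens once, when the flow is built or registered.
        self.user = user

    def run(self, password=None):
        # "run": happens on every execution, with values resolved at that time.
        return f"connect as {self.user} with password set: {password is not None}"


# Build time: objects are constructed and wired together, nothing runs yet.
secret = ToyEnvSecret("TOY_PASSWORD")
task = ToyTask(user="svc_user")
plan = [(task, secret)]

# Run time: the environment variable may exist only here, not at build time.
os.environ["TOY_PASSWORD"] = "s3cret"
results = [t.run(password=s.run()) for t, s in plan]
print(results)
```

In this toy model, calling `secret.run()` during the build step would try to read the variable on the machine that builds the flow, which is the kind of early evaluation the message above is warning about.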
Kevin Kho
05/04/2022, 5:14 PM
You can also do:
with Flow(...) as flow:
    snow = SnowflakeQuery(init_stuff_here)(run_stuff_here)
Jonathan Mathews
05/04/2022, 5:26 PM
Thanks Kevin. So could I define everything else at init time and then pass in the password at run time?
Jonathan Mathews
05/04/2022, 5:27 PM
That way I can use EnvVarSecret
Jonathan Mathews
05/04/2022, 5:29 PM
I tried this but it’s still not picking up the password:
snowflake_query_task_init = SnowflakeQuery(
    user=SNOWFLAKE_USER,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    role=SNOWFLAKE_ROLE,
    schema=SNOWFLAKE_SCHEMA,)
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init(password=SNOWFLAKE_PASSWORD).map(query=queries)
Kevin Kho
05/04/2022, 5:31 PM
It would be
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init.map(password=unmapped(SNOWFLAKE_PASSWORD), query=queries)
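For readers unsure what `unmapped` is doing in the snippet above, here is a rough pure-Python sketch of the idea: one call per mapped element, with the unmapped value repeated unchanged for every call. `Unmapped`, `toy_map` and `run_query` are hypothetical helpers written for this illustration and do not reflect how Prefect implements mapping internally.

```python
class Unmapped:
    """Marks a value as a constant to repeat for every mapped call (toy version)."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, **kwargs):
    """Call fn once per position of the iterable kwargs, repeating Unmapped ones."""
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    names = list(mapped)
    # zip(*...) walks all mapped arguments in lockstep, one element per call.
    return [
        fn(**constant, **dict(zip(names, values)))
        for values in zip(*mapped.values())
    ]


def run_query(password, query):
    # Stand-in for the real query task: reports what each call received.
    return (query, len(password))


queries = ["SELECT 1", "SELECT 2"]
results = toy_map(run_query, password=Unmapped("s3cret"), query=queries)
print(results)
```

Each call gets the whole password plus a single query. Without the wrapper, this toy `toy_map` would iterate over the password string as well, so each call would receive one character instead of the full value.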