Jonathan Mathews

05/04/2022, 5:07 PM
Hi, what’s the correct way to use EnvVarSecret within a Flow? My current code is below, but it tells me the task has not been added to the flow. If I don’t call it with .run(), it complains that it is not JSON serializable:
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    snowflake_password=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True).run()
....
    upload_results = SnowflakeQuery(
        user=SNOWFLAKE_USER,
        password=snowflake_password,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        role=SNOWFLAKE_ROLE,
        schema=SNOWFLAKE_SCHEMA).map(query=queries)

Kevin Kho

05/04/2022, 5:14 PM
If you do:
snow = SnowflakeQuery()
with Flow(...) as flow:
    snow(...)
The first is the init and the second is the run. The init is done at registration time and the run is done at flow execution time. Even if you do:
with Flow(...) as flow:
    snow = SnowflakeQuery()
    snow(...)
the init is still done at build time. You are getting an EnvVarSecret, which materializes at runtime, and it seems you are using run() to force it to apply during the init time?
You can also do:
with Flow(...) as flow:
    snow = SnowflakeQuery(init_stuff_here)(run_stuff_here)
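
To make the init/run split concrete, here is a minimal toy sketch in plain Python. It does not use Prefect at all: ToyTask, ToyEnvSecret, and the TOY_SNOWFLAKE_PASSWORD variable are hypothetical stand-ins invented for illustration, and the real classes do considerably more. It only shows why a value that exists solely at run time has to be passed to the run step rather than to the constructor.

```python
import os


class ToyTask:
    """Toy stand-in for a task with separate build-time and run-time phases."""

    def __init__(self, user=None, password=None):
        # Build time: values given here are stored on the object immediately.
        self.user = user
        self.password = password

    def run(self, password=None):
        # Run time: a value given here overrides the one stored at build time.
        password = password if password is not None else self.password
        if password is None:
            raise ValueError("no password available at run time")
        return f"{self.user}:{password}"


class ToyEnvSecret:
    """Toy stand-in for a secret that is only looked up when run() is called."""

    def __init__(self, name):
        self.name = name  # only the variable name is known at build time

    def run(self):
        return os.environ[self.name]  # the actual lookup happens at run time


# Build time: configure everything that is already known.
query_task = ToyTask(user="analyst")
secret = ToyEnvSecret("TOY_SNOWFLAKE_PASSWORD")

# Run time: the environment variable is assumed to exist only now.
os.environ["TOY_SNOWFLAKE_PASSWORD"] = "s3cret"
result = query_task.run(password=secret.run())
print(result)
```

In this toy model, reading the secret inside the constructor call would fail or capture a stale value, because the environment variable is not guaranteed to exist when the object is built.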

Jonathan Mathews

05/04/2022, 5:26 PM
Thanks Kevin. So could I define everything else at init time and then pass in the password at run time?
That way I can use EnvVarSecret
I tried this but it’s still not picking up the password:
snowflake_query_task_init = SnowflakeQuery(
        user=SNOWFLAKE_USER,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        role=SNOWFLAKE_ROLE,
        schema=SNOWFLAKE_SCHEMA,)

with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init(password=SNOWFLAKE_PASSWORD).map(query=queries)

Kevin Kho

05/04/2022, 5:31 PM
It would be
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow:
    SNOWFLAKE_PASSWORD=EnvVarSecret(name='SNOWFLAKE_PASSWORD', raise_if_missing=True)
    upload_results = snowflake_query_task_init.map(password=unmapped(SNOWFLAKE_PASSWORD), query=queries)
Where unmapped is imported from prefect (from prefect import unmapped).
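
As a rough illustration of what unmapped does in a map call, here is a toy sketch in plain Python. It is not Prefect's implementation: Unmapped, toy_map, and run_query are hypothetical names made up for this example. The idea it models is that every argument to a map call is iterated element by element unless it is explicitly marked as a constant to be reused for each element.

```python
class Unmapped:
    """Toy marker: wrap a value that should be passed whole to every call."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, **kwargs):
    """Call fn once per element of the mapped arguments, reusing constants."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    results = []
    # zip walks all mapped arguments in lockstep, one element per call.
    for values in zip(*mapped.values()):
        call_kwargs = dict(zip(mapped.keys(), values))
        results.append(fn(**call_kwargs, **constant))
    return results


def run_query(password, query):
    # Hypothetical task body; reports only the password length.
    return f"{query} (password length {len(password)})"


queries = ["SELECT 1", "SELECT 2"]

# The password is marked as a constant, so each query gets the full string.
results = toy_map(run_query, password=Unmapped("s3cret"), query=queries)

# Without the marker, the toy mapper iterates over the password too.
per_char = toy_map(run_query, password="s3cret", query=queries)

print(results)
print(per_char)
```

In this toy version, forgetting the marker silently hands each call a single character of the password, which is why the constant argument has to be wrapped.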

Jonathan Mathews

05/04/2022, 8:24 PM
Excellent. Works, thanks a lot!