
Charles Lariviere

01/07/2021, 4:20 PM
Hey folks 👋 I’m a bit confused with how Secrets are behaving with this flow/config. I have production Secrets set up in Prefect Cloud, local dev Secrets in ~/.prefect/config.toml, a Kubernetes agent, and a Flow configured with Docker storage. When I register the flow, it looks like my local dev credentials are packaged in the Docker image for that flow, and the flow runs deployed through Prefect Cloud, running on our Kubernetes agent, do not use the Secrets stored in Prefect Cloud; they use my local secrets instead. The only way I have found to prevent that is to comment out or delete my local config before registering the flow. Is that expected? If so, how does one ensure that devs do not accidentally package their local credentials when registering flows?
It may also be with the way I’m using Secrets in the flow? Let me know if that isn’t the right way to do so:
SNOWFLAKE_ACCOUNT = Secret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = Secret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = Secret("SNOWFLAKE_PASSWORD")

run_query = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT.get(),
    user=SNOWFLAKE_USER.get(),
    password=SNOWFLAKE_PASSWORD.get(),
    ...

Dylan

01/07/2021, 4:23 PM
Hi @Charles Lariviere! Looks like you found a place where we should update our docs just a bit. I believe what you’re looking for is a PrefectSecret: https://docs.prefect.io/api/latest/tasks/secrets.html#secretbase
This should behave in the way you expect (i.e. the local run pulls from your config.toml and the Cloud-orchestrated run pulls from the secrets set in Prefect Cloud).
Also, you don’t need to call the get method on your secrets in the example above

Charles Lariviere

01/07/2021, 4:27 PM
Oh wow, thanks! I would not have figured this out otherwise 😅 So to confirm, I should update it like so?
from prefect.tasks.secrets import PrefectSecret

SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")

run_query = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT.run(),
    user=SNOWFLAKE_USER.run(),
    password=SNOWFLAKE_PASSWORD.run(),
    ...

Dylan

01/07/2021, 4:28 PM
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
run_query = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT,
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    ...

Charles Lariviere

01/07/2021, 4:28 PM
Ah, beautiful! Thanks 🙏

Dylan

01/07/2021, 4:29 PM
Anytime! Let us know if you have any other questions 😄

Charles Lariviere

01/07/2021, 4:33 PM
Oops sorry, it looks like I indeed need to call .run() here? I get the following error otherwise 🤔
Unexpected error: TypeError("argument of type 'PrefectSecret' is not iterable",)
Which is solved when I add .run() to those variables (i.e. SNOWFLAKE_ACCOUNT.run()).
It may be something else in my setup that’s not right then?

Dylan

01/07/2021, 4:34 PM
Hmmm
Can you share your flow code & storage config?

Charles Lariviere

01/07/2021, 4:36 PM
Absolutely, this is a toy example as I try to learn Prefect and get us onboarded:
import os
from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.snowflake import SnowflakeQuery


FLOW_NAME = os.path.splitext(os.path.basename(__file__))[0]     # filename
PROJECT_NAME = "PROJECT_NAME"
REGISTRY_URL = PrefectSecret("ECR_REGISTRY_URL").run()

config = {
    "executor": LocalExecutor(),
    "run_config": KubernetesRun(),
    "storage": Docker(
        registry_url=REGISTRY_URL,
        python_dependencies=["prefect[snowflake]==0.14.1"],
        image_tag="latest"
    )
}

SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT").run()
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER").run()
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD").run()


query = """
    SHOW DATABASES;
"""

show_databases = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT,
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    database="RAW",
    schema="PUBLIC",
    # role="TRANSFORMER",
    # warehouse="TRANSFORMING",
    query=query
)


with Flow(FLOW_NAME, **config) as flow:
    show_databases()


if __name__ == '__main__':
    # flow.register(project_name=PROJECT_NAME)
    flow.run()
My config.toml looks like this:
[cloud]
use_local_secrets = false

[context.secrets]
ECR_REGISTRY_URL = ""
SNOWFLAKE_ACCOUNT = ""
SNOWFLAKE_USER = ""
SNOWFLAKE_PASSWORD = ""
And FWIW, toggling use_local_secrets between true and false does not seem to have any effect on the flow above. Regardless of what the use_local_secrets value is, as soon as my config.toml has those secrets, they are used when running flow.run().

Dylan

01/07/2021, 4:51 PM
So you’re running into an issue with the difference between flow build time and flow execution time.
I’m making some quick changes to this flow; I’ll explain in just a moment.
But basically, calling SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT").run() sets the value permanently at flow build time, which is why your local secrets are being used even when the run is orchestrated by Cloud.
Task execution inside the Flow context block is deferred.
import os
from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.snowflake import SnowflakeQuery

# FLOW_NAME = os.path.splitext(os.path.basename(__file__))[0]  # filename
PROJECT_NAME = "PROJECT_NAME"
REGISTRY_URL = PrefectSecret("ECR_REGISTRY_URL").run()
config = {
    "executor": LocalExecutor(),
    "run_config": KubernetesRun(),
    "storage": Docker(
        registry_url=REGISTRY_URL,
        python_dependencies=["prefect[snowflake]==0.14.1"],
        image_tag="latest",
    ),
}

query = """
    SHOW DATABASES;
"""

show_databases = SnowflakeQuery(
    database="RAW",
    schema="PUBLIC",
    # role="TRANSFORMER",
    # warehouse="TRANSFORMING",
    query=query,
)

with Flow("FLOW_NAME", **config) as flow:
    SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
    SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
    SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
    show_databases(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
    )

if __name__ == "__main__":
    # flow.register(project_name=PROJECT_NAME)
    flow.run()
So I am having a little trouble with the snowflake extra at the moment, but basically, in order for the task execution to be deferred to runtime, you’ll want to make sure you’re passing the secrets inside the flow context block, like so.
When you call .run() on the task, you’re executing that task immediately.
Does that make sense?
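
The build-time versus run-time distinction described here can be illustrated with a minimal sketch in plain Python that does not use Prefect at all. SECRET_STORE, lookup, and LazySecret are hypothetical names invented for this illustration; the dictionary stands in for whichever secret source happens to be active (the local config.toml while the flow is being built, Prefect Cloud while it is running).

```python
# Hypothetical stand-in for the active secret source. This is NOT a Prefect API.
SECRET_STORE = {"SNOWFLAKE_USER": "local-dev-user"}


def lookup(name):
    """Resolve a secret from whatever store is active right now."""
    return SECRET_STORE[name]


class LazySecret:
    """Stores only the secret's name; the value is resolved when run() is called."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return lookup(self.name)


# "Build time": the flow file is imported on the developer's machine.
eager_user = lookup("SNOWFLAKE_USER")     # the value is frozen here
lazy_user = LazySecret("SNOWFLAKE_USER")  # only the name is stored

# "Run time": the same objects are used where a different store is active.
SECRET_STORE["SNOWFLAKE_USER"] = "cloud-service-user"

print(eager_user)       # still the value captured at build time
print(lazy_user.run())  # the value from the store that is active now
```

Calling .run() at module level corresponds to the eager lookup in this sketch, which would explain why the local value ended up baked into the registered flow.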

Charles Lariviere

01/07/2021, 4:58 PM
Ah interesting! I was having a hard time finding the “best practice” way to handle secrets for something like that. Then the updated flow you shared is the “right” way to handle passing something like database credentials for a task?

Dylan

01/07/2021, 4:59 PM
Yes 👍

Charles Lariviere

01/07/2021, 5:00 PM
Awesome, thanks @Dylan! 🙏 If I may, a quick example like the above would be absolutely amazing in the docs — as a newcomer, I was having a hard time figuring this one out.
Appreciate the help with this 🙏

Dylan

01/07/2021, 5:00 PM
Anytime!
Definitely could use a doc update, I’ll open an issue 👍

Charles Lariviere

01/07/2021, 5:05 PM
Blah, I’m so sorry but it still doesn’t work 😬 I’m getting the following when trying to run or register 🤔
Traceback (most recent call last):
  File "flows/examples/snowflake-example.py", line 34, in <module>
    query=query
  File "/Users/charleslariviere/opt/anaconda3/envs/analytics/lib/python3.6/site-packages/prefect/core/task.py", line 157, in init
    old_init(self, *args, **kwargs)
TypeError: __init__() missing 2 required positional arguments: 'account' and 'user'
So it sounds like the SnowflakeQuery task expects to have the credentials when it is instantiated 🤔

Dylan

01/07/2021, 5:33 PM
Ah yes
hmm
I can’t remember if you can define and instantiate a task inside the Flow context block
Are those credentials different between your local environment and production?
Or are you communicating with the same snowflake db?

Charles Lariviere

01/07/2021, 5:35 PM
It’s the same Snowflake db, but we’re using different credentials per dev and per tool (i.e. Prefect Cloud)

Dylan

01/07/2021, 5:35 PM
Gotcha

Charles Lariviere

01/07/2021, 5:35 PM
I tried instantiating the task in the Flow block and it won’t work either
So I take it there’s no good way to use Prefect’s Secrets here? It sounds like such a typical use case though

Dylan

01/07/2021, 5:36 PM
So you have a couple of options here
You can set the secrets in a different way (with environment variables for example) and set different ones in your storage object
You can also write your own Task that can handle deferred creation of the Snowflake client to use secrets in this way
I tend to do this even if we have tasks in the task library since I usually want to customize how something works anyway
I’ll open an issue for an improvement to this particular task
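
As a rough sketch of the environment-variable option, the helper below reads the three credentials from the environment at the moment it is called rather than when the module is imported. The function name snowflake_credentials_from_env and its optional environ argument are hypothetical, and reusing the secret names as variable names is an assumption; how the variables get set in the image or on the agent is left open.

```python
import os


def snowflake_credentials_from_env(environ=None):
    """Read credentials when called, not when the module is imported."""
    env = os.environ if environ is None else environ
    names = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")
    # Fail loudly if anything is unset or empty instead of connecting with blanks.
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "account": env["SNOWFLAKE_ACCOUNT"],
        "user": env["SNOWFLAKE_USER"],
        "password": env["SNOWFLAKE_PASSWORD"],
    }
```

Calling this helper from inside a task's run method, rather than at the top of the flow file, keeps the lookup at execution time.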

Charles Lariviere

01/07/2021, 6:50 PM
Ahhh I see, so essentially to make the SnowflakeQuery task work with Prefect Secrets as you described above, it should expect to receive credentials in the Task class’s run() method instead of __init__(). Looking at other examples of built-in Prefect tasks, that’s how most handle authentication. For example, JiraTask gets the username and access_token not during initialization but instead in the run() method.
But then the PostgresExecute task also expects credentials/secrets to be passed during initialization and not in the run() method, meaning it also won’t work with Prefect Secrets? I’m honestly quite a bit confused with this. Are these simply artifacts of a deprecated way that Secrets previously worked?
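
The pattern described here, credentials passed to run() rather than __init__(), might look roughly like the sketch below. DeferredSnowflakeQuery and its connect argument are hypothetical names, not part of Prefect's task library. In a real flow the class would subclass prefect.Task; the base class is left out so the sketch runs without Prefect installed, and the Snowflake connector is imported only when run() is called.

```python
class DeferredSnowflakeQuery:
    """Sketch of a task that takes credentials in run() instead of __init__()."""

    def __init__(self, query, database=None, schema=None, connect=None):
        # Only non-secret settings are stored when the task is defined.
        self.query = query
        self.database = database
        self.schema = schema
        # `connect` is a hypothetical injection point used for testing.
        self._connect = connect

    def run(self, account, user, password):
        connect = self._connect
        if connect is None:
            # Imported lazily; requires snowflake-connector-python.
            import snowflake.connector
            connect = snowflake.connector.connect
        conn = connect(
            account=account,
            user=user,
            password=password,
            database=self.database,
            schema=self.schema,
        )
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(self.query)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
```

With a Task subclass shaped like this, the PrefectSecret tasks could be passed as arguments when the task is called inside the Flow context block, so their values would be resolved only when the flow runs.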

Dylan

01/07/2021, 9:25 PM
Hey @Charles Lariviere, Tasks in the Task library are often written by contributors outside of Prefect. I’m not sure if we have a centralized scheme for authentication across them
However, we definitely should

Charles Lariviere

01/07/2021, 9:25 PM
Ahh that makes a ton of sense, thanks! 👍
I went on to rewrite the SnowflakeQuery task in my project to adjust the authentication. However, I’m running into issues including this custom module in my Docker storage.

Zanie

01/07/2021, 9:27 PM
To use your custom module in your docker storage you’ll need to add the files and pip install it. e.g.
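flow.storage = Docker(
    # copy the local package directory into the image
    files={"path/to/my_module": "my_module"},
    # install the copied package so the flow can import it
    extra_dockerfile_commands=["RUN pip install -e my_module"],
)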

Dylan

01/07/2021, 9:29 PM
@Marvin open “Some Tasks Incompatible with Secrets”

Charles Lariviere

01/07/2021, 9:33 PM
Thanks @Zanie! I tried leveraging the files option in the Docker() storage and formatted my project as a package, but I’m still getting
/opt/prefect/healthcheck.py:147: UserWarning: Flow uses module which is not importable.
Are there any examples that I could go off of, apart from the docs on Docker() storage, which I’ve tried to follow?

Zanie

01/07/2021, 9:34 PM
Sorry, I edited my comment. Did you pip install your module like I show?
Alternatively you can add an environment variable so it’s on the PYTHONPATH

Charles Lariviere

01/07/2021, 9:39 PM
Ah interesting! I tried the following:
config = {
    "executor": LocalExecutor(),
    "run_config": KubernetesRun(),
    "storage": Docker(
        registry_url=REGISTRY_URL,
        python_dependencies=["prefect[snowflake]==0.14.1"],
        files={
            # absolute path source -> destination in image
            os.getcwd() + "/workflows/tasks/snowflake.py": "snowflake.py",
        },
        extra_dockerfile_commands=["RUN pip install -e snowflake.py}"],
        image_tag="latest",
        secrets=["SNOWFLAKE_ACCOUNT"]
    )
}
Getting the following error:
ERROR: snowflake.py is not a valid editable requirement. It should either be a path to a local project or a VCS URL (beginning with svn+, git+, hg+, or bzr+).
I attached how my project is structured 🤔

Zanie

01/07/2021, 9:40 PM
So here you’d copy in your entire workflows module (since that has the setup.py) and then pip install -e workflows
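
A small helper can keep the two Docker storage arguments from this advice in sync. This is a sketch only: package_storage_kwargs is a hypothetical function, and both the /workflows destination path and the assumption that the local workflows directory contains setup.py are illustrative rather than taken from the Prefect docs.

```python
import os


def package_storage_kwargs(package_dir, dest="/workflows"):
    """Build the `files` and `extra_dockerfile_commands` arguments together.

    package_dir: local directory that contains setup.py (assumed layout).
    dest: where the directory is copied inside the image (an assumption).
    """
    src = os.path.abspath(package_dir)  # the examples in this thread use absolute source paths
    return {
        "files": {src: dest},
        "extra_dockerfile_commands": ["RUN pip install -e " + dest],
    }


kwargs = package_storage_kwargs("workflows")
print(kwargs["extra_dockerfile_commands"])

# Intended use, assuming Prefect 0.14.x is installed:
#   from prefect.storage import Docker
#   storage = Docker(registry_url=REGISTRY_URL, **package_storage_kwargs("workflows"))
```

Copying the whole package directory, rather than a single .py file, gives pip a project with a setup.py to install, which is what the editable-requirement error above was complaining about.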

Charles Lariviere

01/07/2021, 9:43 PM
Aaaaaah I see! I got confused since the docs referred to specific files, but that makes sense and seems to work as my Docker image is building now 🙌
Success! 🙌 Y’all rock! Thanks @Dylan and @Zanie 🙏