# prefect-community
c
Hey folks 👋 I’m a bit confused with how Secrets are behaving with this flow/config. I have production Secrets set up in Prefect Cloud, local dev Secrets in ~/.prefect/config.toml, a Kubernetes agent, and a Flow configured with Docker storage. When I register the flow, it looks like my local dev credentials are packaged in the Docker image for that flow, and the flow runs deployed through Prefect Cloud, running on our Kubernetes agent, do not use the Secrets stored in Prefect Cloud; they use my local secrets instead. The only way I have found to prevent that is to comment out or delete my local config before registering the flow. Is that expected? If so, how does one ensure that devs do not accidentally package their local credentials when registering flows?
It may also be with the way I’m using Secrets in the flow? Let me know if that isn’t the right way to do so:
SNOWFLAKE_ACCOUNT = Secret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = Secret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = Secret("SNOWFLAKE_PASSWORD")

run_query = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT.get(),
    user=SNOWFLAKE_USER.get(),
    password=SNOWFLAKE_PASSWORD.get(),
    ...
d
Hi @Charles Lariviere! Looks like you found a place where we should update our docs just a bit. I believe what you’re looking for is a PrefectSecret: https://docs.prefect.io/api/latest/tasks/secrets.html#secretbase
This should behave in the way you expect (i.e. the local run pulls from your config.toml and the Cloud-orchestrated run pulls from the secrets set in Prefect Cloud).
Also, you don’t need to call the get method on your secrets in the example above.
c
Oh wow, thanks! I would not have figured this out otherwise 😅 So to confirm, I should update it like so?
from prefect.tasks.secrets import PrefectSecret

SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")

run_query = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT.run(),
    user=SNOWFLAKE_USER.run(),
    password=SNOWFLAKE_PASSWORD.run(),
    ...
d
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
run_query = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT,
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    ...
c
Ah, beautiful! Thanks 🙏
d
Anytime! Let us know if you have any other questions 😄
c
Oops sorry, it looks like I do need to call .run() here? I get the following error otherwise 🤔
Unexpected error: TypeError("argument of type 'PrefectSecret' is not iterable",)
Which is solved when I add .run() to those variables (i.e. SNOWFLAKE_ACCOUNT.run()).
It may be something else in my setup that’s not right then?
d
Hmmm
Can you share your flow code & storage config?
c
Absolutely, this is a toy example as I try to learn Prefect and get us onboarded:
import os
from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.snowflake import SnowflakeQuery


FLOW_NAME = os.path.splitext(os.path.basename(__file__))[0]     # filename
PROJECT_NAME = "PROJECT_NAME"
REGISTRY_URL = PrefectSecret("ECR_REGISTRY_URL").run()

config = {
    "executor": LocalExecutor(),
    "run_config": KubernetesRun(),
    "storage": Docker(
        registry_url=REGISTRY_URL,
        python_dependencies=["prefect[snowflake]==0.14.1"],
        image_tag="latest"
    )
}

SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT").run()
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER").run()
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD").run()


query = """
    SHOW DATABASES;
"""

show_databases = SnowflakeQuery(
    account=SNOWFLAKE_ACCOUNT,
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    database="RAW",
    schema="PUBLIC",
    # role="TRANSFORMER",
    # warehouse="TRANSFORMING",
    query=query
)


with Flow(FLOW_NAME, **config) as flow:
    show_databases()


if __name__ == '__main__':
    # flow.register(project_name=PROJECT_NAME)
    flow.run()
My config.toml looks like this:
[cloud]
use_local_secrets = false

[context.secrets]
ECR_REGISTRY_URL = ""
SNOWFLAKE_ACCOUNT = ""
SNOWFLAKE_USER = ""
SNOWFLAKE_PASSWORD = ""
and FWIW, toggling use_local_secrets between true and false does not seem to have any effect on the flow above. Regardless of the use_local_secrets value, as soon as my config.toml has those secrets, they are used when running flow.run().
d
So you’re running into an issue with the difference between flow build time and flow execution time
I’m making some quick changes to this flow I’ll explain in just a moment
but basically, calling SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT").run() is setting the value permanently at flow build time, which is why your local secrets are being used even when the run is in Cloud
Task execution inside the Flow context block is deferred
import os
from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.snowflake import SnowflakeQuery

# FLOW_NAME = os.path.splitext(os.path.basename(__file__))[0]  # filename
PROJECT_NAME = "PROJECT_NAME"
REGISTRY_URL = PrefectSecret("ECR_REGISTRY_URL").run()
config = {
    "executor": LocalExecutor(),
    "run_config": KubernetesRun(),
    "storage": Docker(
        registry_url=REGISTRY_URL,
        python_dependencies=["prefect[snowflake]==0.14.1"],
        image_tag="latest",
    ),
}

query = """
    SHOW DATABASES;
"""

show_databases = SnowflakeQuery(
    database="RAW",
    schema="PUBLIC",
    # role="TRANSFORMER",
    # warehouse="TRANSFORMING",
    query=query,
)

with Flow("FLOW_NAME", **config) as flow:
    SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
    SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
    SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
    show_databases(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
    )

if __name__ == "__main__":
    # flow.register(project_name=PROJECT_NAME)
    flow.run()
So I am having a little trouble with the snowflake extra at the moment, but basically, in order for the task execution to be deferred to runtime, you’ll want to make sure you’re passing the secrets inside the flow context block, as in the flow above.
When you call .run() on the task, you’re executing that task immediately.
Does that make sense?
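To make the build-time vs. run-time difference concrete, here is a minimal plain-Python sketch. It deliberately does not use Prefect: SECRETS, LazySecret, build_eager and build_deferred are hypothetical names made up for illustration, and the dict stands in for whatever secret source is visible at a given moment (the local config.toml while building, Prefect Cloud while running).

```python
# Hypothetical stand-in for the secret source visible at a given moment.
SECRETS = {"SNOWFLAKE_USER": "local-dev-user"}


class LazySecret:
    """Toy stand-in for a secret task: the lookup happens only in run()."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return SECRETS[self.name]


def build_eager():
    # Calling .run() while the flow is being defined resolves the secret
    # right away, so the resolved value is what gets carried along.
    user = LazySecret("SNOWFLAKE_USER").run()
    return lambda: user


def build_deferred():
    # Keeping the unresolved object defers the lookup until execution.
    secret = LazySecret("SNOWFLAKE_USER")
    return lambda: secret.run()


eager_flow = build_eager()
deferred_flow = build_deferred()

# Simulate moving from the dev machine to the production environment.
SECRETS["SNOWFLAKE_USER"] = "prod-service-user"

print(eager_flow())     # value captured when the flow was built
print(deferred_flow())  # value visible when the flow is executed
```

The eager version keeps returning the dev value after the environment changes, which mirrors the local credentials ending up in the Docker image.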
c
Ah interesting! I was having a hard time finding the “best practice” way to handle secrets for something like that. Then the updated flow you shared is the “right” way to handle passing something like database credentials for a task?
d
Yes 👍
c
Awesome, thanks @Dylan! 🙏 If I may, a quick example like the above would be absolutely amazing in the docs — as a newcomer, I was having a hard time figuring this one out.
Appreciate the help with this 🙏
d
Anytime!
Definitely could use a doc update, I’ll open an issue 👍
c
Blah, I’m so sorry but it still doesn’t work 😬 I’m getting the following when trying to run or register 🤔
Traceback (most recent call last):
  File "flows/examples/snowflake-example.py", line 34, in <module>
    query=query
  File "/Users/charleslariviere/opt/anaconda3/envs/analytics/lib/python3.6/site-packages/prefect/core/task.py", line 157, in init
    old_init(self, *args, **kwargs)
TypeError: __init__() missing 2 required positional arguments: 'account' and 'user'
So it sounds like the SnowflakeQuery task expects to have the credentials when it is instantiated 🤔
d
Ah yes
hmm
I can’t remember if you can define and instantiate a task inside the Flow context block
Are those credentials different between your local environment and production?
Or are you communicating with the same snowflake db?
c
It’s the same Snowflake db, but we’re using different credentials per dev and per tool (i.e. Prefect Cloud)
d
Gotcha
c
I tried instantiating the task in the Flow block and it won’t work either
So I take it there’s no good way to use Prefect’s Secrets here? It sounds like such a typical use case though
d
So you have a couple of options here
You can set the secrets in a different way (with environment variables for example) and set different ones in your storage object
You can also write your own Task that can handle deferred creation of the Snowflake client to use secrets in this way
I tend to do this even if we have tasks in the task library since I usually want to customize how something works anyway
I’ll open an issue for an improvement to this particular task
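A rough sketch of the second option, written in plain Python so it runs without Prefect or the Snowflake connector installed. DeferredQuery, FakeConnection and fake_connect are hypothetical names; in a real flow the class would presumably subclass prefect.Task and the injected connect would be the real connector's connect function, which goes through a cursor rather than a direct execute call (both are assumptions here). The only point is the shape: static settings go to __init__, credentials go to run(), and the client is created inside run().

```python
class DeferredQuery:
    """Hypothetical task-like class; not part of Prefect's task library."""

    def __init__(self, query, database=None, schema=None, connect=None):
        # Only non-secret, static settings are stored at build time.
        self.query = query
        self.database = database
        self.schema = schema
        self.connect = connect  # injected client factory (assumption)

    def run(self, account, user, password):
        # Credentials arrive here, so the client is created at run time.
        conn = self.connect(
            account=account,
            user=user,
            password=password,
            database=self.database,
            schema=self.schema,
        )
        try:
            return conn.execute(self.query)
        finally:
            conn.close()


class FakeConnection:
    """Stand-in connection that records what it was opened with."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.closed = False

    def execute(self, query):
        # Echo back the user and query instead of talking to a database.
        return (self.kwargs["user"], query)

    def close(self):
        self.closed = True


def fake_connect(**kwargs):
    return FakeConnection(**kwargs)


show_databases = DeferredQuery(
    query="SHOW DATABASES;", database="RAW", schema="PUBLIC", connect=fake_connect
)
result = show_databases.run(account="acct", user="prod-user", password="pw")
print(result)
```

Because nothing secret is passed to __init__, the object can be created at module level and the credentials supplied later from inside the flow context block.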
c
Ahhh I see, so essentially to make the SnowflakeQuery task work with Prefect Secrets as you described above, it should expect to receive credentials in the Task class’s run() method instead of __init__(). Looking at other examples of built-in Prefect tasks, that’s how most handle authentication. For example, JiraTask gets the username and access_token not during initialization but in the run() method.
But then the PostgresExecute task also expects credentials/secrets to be passed during initialization and not in the run() method, meaning it also won’t work with Prefect Secrets? I’m honestly quite a bit confused by this. Are these simply artifacts of a deprecated way that Secrets previously worked?
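One way to check which style a given task class uses is to look at which arguments its __init__ and run() require. The sketch below uses only the standard library; required_args is a hypothetical helper, and InitCredsTask and RunCredsTask are toy classes shaped like the two styles described above, not the real library tasks.

```python
import inspect


def required_args(func):
    """Names of parameters (other than self) that have no default value."""
    named_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return [
        p.name
        for p in inspect.signature(func).parameters.values()
        if p.name != "self"
        and p.default is inspect.Parameter.empty
        and p.kind in named_kinds
    ]


class InitCredsTask:
    """Toy class shaped like a task that wants credentials up front."""

    def __init__(self, account, user, password=None, query=None):
        pass

    def run(self):
        pass


class RunCredsTask:
    """Toy class shaped like a task that takes credentials when it runs."""

    def __init__(self, query=None):
        pass

    def run(self, username, access_token):
        pass


print(required_args(InitCredsTask.__init__))
print(required_args(RunCredsTask.__init__))
print(required_args(RunCredsTask.run))
```

A task whose __init__ has required credential arguments cannot receive them from inside the flow context block, which matches the TypeError about 'account' and 'user' earlier in the thread.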
d
Hey @Charles Lariviere, Tasks in the Task library are often written by contributors outside of Prefect. I’m not sure if we have a centralized scheme for authentication across them
However, we definitely should
c
Ahh that makes a ton of sense, thanks! 👍
I went on to rewrite the SnowflakeQuery task in my project to adjust the authentication. However, I’m running into issues including this custom module in my Docker storage.
z
To use your custom module in your docker storage you’ll need to add the files and pip install it. e.g.
flow.storage = Docker(
    files={"path/to/my_module": "my_module"},
    extra_dockerfile_commands=["RUN pip install -e my_module"],
)
d
@Marvin open “Some Tasks Incompatible with Secrets”
c
Thanks @Zanie! I tried leveraging the files option in the Docker() storage and formatted my project as a package, but I’m still getting
/opt/prefect/healthcheck.py:147: UserWarning: Flow uses module which is not importable.
Are there any examples that I could go off of, apart from the docs on Docker() storage which I’ve tried to follow?
z
Sorry, I edited my comment. Did you pip install your module like I show?
Alternatively you can add an environment variable so it’s on the PYTHONPATH
c
Ah interesting! I tried the following:
config = {
    "executor": LocalExecutor(),
    "run_config": KubernetesRun(),
    "storage": Docker(
        registry_url=REGISTRY_URL,
        python_dependencies=["prefect[snowflake]==0.14.1"],
        files={
            # absolute path source -> destination in image
            os.getcwd() + "/workflows/tasks/snowflake.py": "snowflake.py",
        },
        extra_dockerfile_commands=["RUN pip install -e snowflake.py}"],
        image_tag="latest",
        secrets=["SNOWFLAKE_ACCOUNT"]
    )
}
Getting the following error:
ERROR: snowflake.py is not a valid editable requirement. It should either be a path to a local project or a VCS URL (beginning with svn+, git+, hg+, or bzr+).
I attached how my project is structured 🤔
z
So here you’d copy in your entire workflows module (since that has the setup.py) and then pip install -e workflows
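Put together, the storage arguments for that approach could be assembled roughly like this. It is only a sketch: editable_package_args is a hypothetical helper, the directory name workflows comes from this thread, and it assumes the directory contains a setup.py and that the Docker storage files option accepts a directory with an absolute source path.

```python
import os


def editable_package_args(package_dir, dest=None):
    """Build keyword arguments that copy a package and pip install it.

    Hypothetical helper; it only assembles plain Python values and does
    not import or call Prefect.
    """
    # The thread maps an absolute source path to a destination in the image.
    src = os.path.abspath(package_dir)
    dest = dest or os.path.basename(src)
    return {
        "files": {src: dest},
        "extra_dockerfile_commands": [f"RUN pip install -e {dest}"],
    }


args = editable_package_args("workflows")
print(args["extra_dockerfile_commands"])
# Intended use (not executed here): Docker(registry_url=REGISTRY_URL, **args)
```

Copying the whole package directory rather than a single .py file is what gives pip install -e a project with a setup.py to install.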
c
Aaaaaah I see! I got confused since the docs referred to specific files, but that makes sense and seems to work as my Docker image is building now 🙌
success! 🙌 Yall rock! Thanks @Dylan and @Zanie 🙏