Charles Lariviere
01/07/2021, 4:20 PM~/.prefect/config.toml
, a Kubernetes agent, and a Flow configured with Docker
storage.
When I register the flow, it looks like my local dev credentials are packaged in the Docker image for that flow, and the flow runs deployed through Prefect Cloud, running on our Kubernetes agent, do not use the Secrets logged in Prefect Cloud — they instead use my local secrets. The only way I have found for that not to happen is to comment out or delete my local config before registering the flow.
Is that expected? If so, how does one ensure that devs to not accidentally package their local credentials when registering flows?SNOWFLAKE_ACCOUNT = Secret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = Secret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = Secret("SNOWFLAKE_PASSWORD")
run_query = SnowflakeQuery(
account=SNOWFLAKE_ACCOUNT.get(),
user=SNOWFLAKE_USER.get(),
password=SNOWFLAKE_PASSWORD.get(),
...
Dylan
PrefectSecret
https://docs.prefect.io/api/latest/tasks/secrets.html#secretbaseconfig.toml
and the Cloud-orchestrated run pulls from the secrets set in Prefect Cloud)Charles Lariviere
01/07/2021, 4:27 PMfrom prefect.tasks.secrets import PrefectSecret
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
run_query = SnowflakeQuery(
account=SNOWFLAKE_ACCOUNT.run(),
user=SNOWFLAKE_USER.run(),
password=SNOWFLAKE_PASSWORD.run(),
...
Dylan
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
run_query = SnowflakeQuery(
account=SNOWFLAKE_ACCOUNT,
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
...
Charles Lariviere
01/07/2021, 4:28 PMDylan
Charles Lariviere
01/07/2021, 4:33 PM.run()
here? I get the following error otherwise 🤔
Unexpected error: TypeError("argument of type 'PrefectSecret' is not iterable",)
Which is solved when I add .run()
to those variables (i.e. SNOWFLAKE_ACCOUNT.run()
)Dylan
Charles Lariviere
01/07/2021, 4:36 PMimport os
from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.snowflake import SnowflakeQuery
FLOW_NAME = os.path.splitext(os.path.basename(__file__))[0] # filename
PROJECT_NAME = "PROJECT_NAME"
REGISTRY_URL = PrefectSecret("ECR_REGISTRY_URL").run()
config = {
"executor": LocalExecutor(),
"run_config": KubernetesRun(),
"storage": Docker(
registry_url=REGISTRY_URL,
python_dependencies=["prefect[snowflake]==0.14.1"],
image_tag="latest"
)
}
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT").run()
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER").run()
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD").run()
query = """
SHOW DATABASES;
"""
show_databases = SnowflakeQuery(
account=SNOWFLAKE_ACCOUNT,
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
database="RAW",
schema="PUBLIC",
# role="TRANSFORMER",
# warehouse="TRANSFORMING",
query=query
)
with Flow(FLOW_NAME, **config) as flow:
show_databases()
if __name__ == '__main__':
# flow.register(project_name=PROJECT_NAME)
flow.run()
config.toml
looks like this:
[cloud]
use_local_secrets = false
[context.secrets]
ECR_REGISTRY_URL = ""
SNOWFLAKE_ACCOUNT = ""
SNOWFLAKE_USER = ""
SNOWFLAKE_PASSWORD = ""
and FWIW, toggling use_local_secrets
between true
or false
does not seem to have any effect on the flow above. Regardless of what the use_local_secrets
value is, as soon as my config.toml
has those secrets, they are used when running flow.run()
.Dylan
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT").run()
is setting the value permanently at flow build timeimport os
from prefect import Flow
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.snowflake import SnowflakeQuery
# FLOW_NAME = os.path.splitext(os.path.basename(__file__))[0] # filename
PROJECT_NAME = "PROJECT_NAME"
REGISTRY_URL = PrefectSecret("ECR_REGISTRY_URL").run()
config = {
"executor": LocalExecutor(),
"run_config": KubernetesRun(),
"storage": Docker(
registry_url=REGISTRY_URL,
python_dependencies=["prefect[snowflake]==0.14.1"],
image_tag="latest",
),
}
query = """
SHOW DATABASES;
"""
show_databases = SnowflakeQuery(
database="RAW",
schema="PUBLIC",
# role="TRANSFORMER",
# warehouse="TRANSFORMING",
query=query,
)
with Flow("FLOW_NAME", **config) as flow:
SNOWFLAKE_ACCOUNT = PrefectSecret("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = PrefectSecret("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = PrefectSecret("SNOWFLAKE_PASSWORD")
show_databases(
account=SNOWFLAKE_ACCOUNT,
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
)
if __name__ == "__main__":
# flow.register(project_name=PROJECT_NAME)
flow.run()
.run()
on the task, you’re executing that task immediatelyCharles Lariviere
01/07/2021, 4:58 PMDylan
Charles Lariviere
01/07/2021, 5:00 PMDylan
Charles Lariviere
01/07/2021, 5:05 PMTraceback (most recent call last):
File "flows/examples/snowflake-example.py", line 34, in <module>
query=query
File "/Users/charleslariviere/opt/anaconda3/envs/analytics/lib/python3.6/site-packages/prefect/core/task.py", line 157, in init
old_init(self, *args, **kwargs)
TypeError: __init__() missing 2 required positional arguments: 'account' and 'user'
SnowflakeQuery
task expects to have the credentials when initiated 🤔Dylan
Charles Lariviere
01/07/2021, 5:35 PMDylan
Charles Lariviere
01/07/2021, 5:35 PMDylan
Charles Lariviere
01/07/2021, 6:50 PMSnowflakeQuery
task work with Prefect Secrets as you described above, it should expect to receive credentials in the Task class’ run()
method instead of __init__()
. Looking at other examples of built-in Prefect tasks, that’s how most handle authentication. For example; JiraTask gets the username
and access_token
not during initialization but instead in the run()
methodrun()
method — meaning it also won’t work with Prefect Secrets? I’m honestly quite a bit confused with this. Are these simply artifacts of a deprecated way that Secrets previously worked?Dylan
Charles Lariviere
01/07/2021, 9:25 PMSnowflakeQuery
task in my project to adjust the authentication. However, running into issues including this custom module in my Docker storage.Zanie
flow.storage = Docker(
files={"path/to/my_module": "my_module"},
extra_dockerfile_commands=["RUN pip install -e my_module}"],
)
Dylan
Marvin
01/07/2021, 9:29 PMCharles Lariviere
01/07/2021, 9:33 PMfiles
option in the Docker()
storage and formatted my project as a package package, but still getting
/opt/prefect/healthcheck.py:147: UserWarning: Flow uses module which is not importable.
Are there any examples that I could go off of, apart from the docs on Docker()
storage which I’ve tried to follow?Zanie
pip install
your module like I show?Charles Lariviere
01/07/2021, 9:39 PMconfig = {
"executor": LocalExecutor(),
"run_config": KubernetesRun(),
"storage": Docker(
registry_url=REGISTRY_URL,
python_dependencies=["prefect[snowflake]==0.14.1"],
files={
# absolute path source -> destination in image
os.getcwd() + "/workflows/tasks/snowflake.py": "snowflake.py",
},
extra_dockerfile_commands=["RUN pip install -e snowflake.py}"],
image_tag="latest",
secrets=["SNOWFLAKE_ACCOUNT"]
)
}
Getting the following error:
ERROR: snowflake.py is not a valid editable requirement. It should either be a path to a local project or a VCS URL (beginning with svn+, git+, hg+, or bzr+).
I attached how my project is structured 🤔Zanie
workflows
module (since that has the setup.py
) and then pip install -e workflows
Charles Lariviere
01/07/2021, 9:43 PM