Felipe Saldana
03/04/2021, 7:40 PM
aurora_user_val = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True).run()
aurora_pass_val = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True).run()
aurora_host_val = EnvVarSecret("AURORA_HOST", raise_if_missing=True).run()
with Flow("test_vars") as flow:
Zanie
from prefect import Flow, task
from prefect.tasks.secrets.env_var import EnvVarSecret
@task
def show_shell(shell):
    """Print a message containing the given shell value.

    Args:
        shell: The value to display. In the accompanying flow it is either
            the result of an ``EnvVarSecret`` task or a plain string.
    """
    print("Shh, it's secret. This is my shell: {}".format(shell))
# Build the flow: tasks called inside the context are registered, not executed.
with Flow("env-secret-passing") as flow:
    shell = EnvVarSecret("SHELL")
    # Pass the secret in, it'll be resolved to a value at runtime
    show_shell(shell)
    # Task doesn't care if it's a secret or a normal value passed in
    show_shell("my-fake-shell")

# Execute the flow only after its definition is complete.
flow.run()
Felipe Saldana
03/04/2021, 8:01 PM
class MyGenericTask(Task):
def __init__(self, auroraUser, auroraPass, auroraHost, *args, **kwargs):
super().__init__(*args, **kwargs)
self.auroraUser = auroraUser
self.auroraPass = auroraPass
self.auroraHost = auroraHost
def _do_work(self, views: list) -> None:
i = 0
def run(self, different_per_run):
<http://logger.info|logger.info>(f'user: {self.auroraUser}')
<http://logger.info|logger.info>(f'pass: {self.auroraPass}')
<http://logger.info|logger.info>(f'host: {self.auroraHost}')
<http://logger.info|logger.info>(f'different_per_run: {different_per_run}')
self._do_work(different_per_run)
with Flow("test_vars") as flow:
    # NOTE(review): these are EnvVarSecret task objects, and they are handed
    # to the task constructor below rather than passed as run-time inputs.
    aurora_user_val = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True)
    aurora_pass_val = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True)
    aurora_host_val = EnvVarSecret("AURORA_HOST", raise_if_missing=True)
    refresh_views = MyGenericTask(
        auroraUser=aurora_user_val,
        auroraPass=aurora_pass_val,
        auroraHost=aurora_host_val,
        name="refresh_views",
    )
    # Only different_per_run is supplied as a run-time argument.
    refresh_views.bind(different_per_run="testing")
Zanie
import os
from prefect import Flow, task, Task
from prefect.tasks.secrets.env_var import EnvVarSecret
@task
def show_shell(shell):
    """Print a message containing the given shell value.

    Args:
        shell: The value to display. In the accompanying flow it is either
            the result of an ``EnvVarSecret`` task or a plain string.
    """
    print("Shh, it's secret. This is my shell: {}".format(shell))
class ShowShell(Task):
    """Subclass-style task that looks up a secret by name at run time and prints it."""

    def __init__(self, secret_name: str = "SHELL", **kwargs):
        """Remember which secret to read; the value itself is not fetched here.

        Args:
            secret_name: Name handed to ``EnvVarSecret`` when the task runs.
            **kwargs: Forwarded to the parent ``Task`` initializer.
        """
        self.secret_name = secret_name
        super().__init__(**kwargs)

    def run(self):
        """Resolve the configured secret and print it."""
        # You can use the EnvVarSecret here
        shell = EnvVarSecret(self.secret_name).run()
        # but really there's no reason to not just pull it from the env
        # shell = os.environ.get(self.secret_name)
        print("Shh, it's secret. This is my shell: {}".format(shell))
# Initialize some subclass style tasks with configuration
show_shell_task = ShowShell("SHELL")
show_fake_shell_task = ShowShell("FAKE_SHELL")

with Flow("env-secret-passing") as flow:
    shell = EnvVarSecret("SHELL")
    # Pass the secret in, it'll be resolved to a value at runtime
    show_shell(shell)
    # Task doesn't care if it's a secret or a normal value passed in
    show_shell("my-fake-shell")
    # Run the subclass style tasks in our flow
    show_shell_task()
    show_fake_shell_task()

# Set the FAKE_SHELL env var before runtime
os.environ["FAKE_SHELL"] = "my-fake-shell"
flow.run()
[Avoid calling] `.run()` on a task while defining your flow. Task runs are meant to be deferred, and if you run them beforehand it will be confusing when they contain the wrong values. Best practice for subclass-style tasks using secrets is to pass the name of the secret to the task's init, then retrieve the secret at runtime.
Felipe Saldana
03/05/2021, 4:14 PM
def run(self):
# You can use the EnvVarSecret here
shell = EnvVarSecret(self.secret_name).run()
# but really there's no reason to not just pull it from the env
# shell = os.environ.get(self.secret_name)
print("Shh, it's secret. This is my shell: {}".format(shell))
Zanie
[The purpose of] `EnvVarSecret` is to automatically convert an environment value to a secret, as in the non-subclass `@task` way that I showed first. If you're not using it like that, it makes more sense to just use `os.environ.get(...)`, in my opinion.
Felipe Saldana
03/05/2021, 4:22 PM
Adam
03/09/2021, 7:02 PM
[…] `EnvVarSecret` all the credentials and pass them in to `PostgresFetch` (we also prefer the `NamedTupleCursor`), so we created the task below. Should we rather use `os.environ.get` instead of using `EnvVarSecret`, as we’re doing below:
import psycopg2 as pg
from prefect import Task
from prefect.tasks.secrets import EnvVarSecret
from prefect.utilities.tasks import defaults_from_attrs
from psycopg2.extras import NamedTupleCursor
from sable_batch.utils.sql import read_sql
class PostgresQuery(Task):
    """Task that runs a SQL query against Postgres and returns the fetched rows.

    The query comes either from a literal string or from a SQL file; the
    connection credentials are read via ``EnvVarSecret`` when the task runs.
    """

    def __init__(self, query: str = None, sql_file: str = None, **kwargs):
        """Configure the query source.

        Args:
            query: SQL text to execute.
            sql_file: Path handed to ``read_sql`` when ``query`` is not set.
            **kwargs: Forwarded to the parent ``Task`` initializer.
        """
        self.query = query
        self.sql_file = sql_file
        super().__init__(**kwargs)

    def run(
        self,
    ):
        """Execute the configured query and return all resulting records.

        Returns:
            The result of ``cursor.fetchall()`` on a ``NamedTupleCursor``.

        Raises:
            ValueError: If neither ``query`` nor ``sql_file`` was provided.
        """
        if not self.query and not self.sql_file:
            raise ValueError("A query string or path must be provided")

        if not self.query and self.sql_file:
            # The loaded SQL is kept on the instance, so later runs reuse it.
            self.query = read_sql(self.sql_file)

        # Credentials are resolved here, at run time, not at flow definition.
        pg_user = EnvVarSecret("POSTGRES_USER").run()
        pg_password = EnvVarSecret("POSTGRES_PASSWORD").run()
        pg_host = EnvVarSecret("POSTGRES_HOST").run()

        conn = pg.connect(
            dbname="xxx",
            user=pg_user,
            password=pg_password,
            host=pg_host,
            port=5432,
        )

        try:
            # psycopg2's connection context manager ends the transaction but
            # leaves the connection open, hence the explicit close below.
            with conn, conn.cursor(cursor_factory=NamedTupleCursor) as cursor:
                cursor.execute(query=self.query)
                records = cursor.fetchall()
                return records
        finally:
            conn.close()
Zanie
[I'd use] `os.environ` instead of using a Prefect utility that's designed to be used differently.
Adam
03/09/2021, 7:27 PM