Felipe Saldana
03/04/2021, 7:40 PM
aurora_user_val = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True).run()
aurora_pass_val = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True).run()
aurora_host_val = EnvVarSecret("AURORA_HOST", raise_if_missing=True).run()
with Flow("test_vars") as flow:
Zanie
Zanie
from prefect import Flow, task
from prefect.tasks.secrets.env_var import EnvVarSecret
@task
def show_shell(shell):
    """Print the shell value this task receives.

    The argument can originate from a secret task or be a plain literal;
    this body only formats and prints whatever value it is given.
    """
    message = f"Shh, it's secret. This is my shell: {shell}"
    print(message)
# Build a flow that calls show_shell twice: once with an EnvVarSecret task
# and once with a plain string literal.
with Flow("env-secret-passing") as flow:
    # Only the task is created here; per the comment below, its value is
    # resolved when the flow runs.
    shell = EnvVarSecret("SHELL")
    # Pass the secret in, it'll be resolved to a value at runtime
    show_shell(shell)
    # Task doesn't care if it's a secret or a normal value passed in
    show_shell("my-fake-shell")
flow.run()
Felipe Saldana
03/04/2021, 8:01 PMFelipe Saldana
03/04/2021, 11:00 PM
class MyGenericTask(Task):
    def __init__(self, auroraUser, auroraPass, auroraHost, *args, **kwargs):
        """Keep the Aurora connection settings on the task instance.

        Args:
            auroraUser: Stored unchanged as ``self.auroraUser``.
            auroraPass: Stored unchanged as ``self.auroraPass``.
            auroraHost: Stored unchanged as ``self.auroraHost``.
            *args: Forwarded to the base class initializer.
            **kwargs: Forwarded to the base class initializer.

        NOTE(review): the values are stored exactly as passed; nothing here
        turns a task/secret object into its runtime value.
        """
        super().__init__(*args, **kwargs)
        self.auroraUser, self.auroraPass, self.auroraHost = (
            auroraUser,
            auroraPass,
            auroraHost,
        )
    def _do_work(self, views: list) -> None:
        i = 0
    def run(self, different_per_run):
        """Log the stored Aurora settings and the per-run value, then do the work.

        Args:
            different_per_run: Value supplied for this particular run; it is
                logged and then passed to ``_do_work``.
        """
        # NOTE(review): `logger` is not defined in this snippet; it is assumed
        # to be a module-level logger declared elsewhere — confirm.
        logger.info(f'user: {self.auroraUser}')
        # WARNING: this writes the password to the log in plain text. Fine for
        # a one-off debugging session, but remove it before real use.
        logger.info(f'pass: {self.auroraPass}')
        logger.info(f'host: {self.auroraHost}')
        logger.info(f'different_per_run: {different_per_run}')
        self._do_work(different_per_run)
# Flow definition from the thread: the three EnvVarSecret tasks are handed to
# MyGenericTask's constructor rather than to its run() call.
with Flow("test_vars") as flow:
    aurora_user_val = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True)
    aurora_pass_val = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True)
    aurora_host_val = EnvVarSecret("AURORA_HOST", raise_if_missing=True)
    # NOTE(review): the constructor only stores what it is given, so these are
    # presumably still EnvVarSecret task objects (not resolved strings) when
    # run() logs them — confirm. The other snippets in this thread pass the
    # secret as a run-time argument instead.
    refresh_views = MyGenericTask(
        auroraUser=aurora_user_val,
        auroraPass=aurora_pass_val,
        auroraHost=aurora_host_val,
        name="refresh_views"
    )
    refresh_views.bind(different_per_run="testing")
Felipe Saldana
03/05/2021, 12:05 AMFelipe Saldana
03/05/2021, 3:33 PMZanie
import os
from prefect import Flow, task, Task
from prefect.tasks.secrets.env_var import EnvVarSecret
@task
def show_shell(shell):
    """Print the given shell value, whether it came from a secret or a literal."""
    text = f"Shh, it's secret. This is my shell: {shell}"
    print(text)
class ShowShell(Task):
    """Task that looks up a secret by name when it runs and prints it."""

    def __init__(self, secret_name: str = "SHELL", **kwargs):
        """Remember which secret to read; remaining kwargs go to ``Task``."""
        self.secret_name = secret_name
        super().__init__(**kwargs)

    def run(self):
        """Resolve the configured secret and print it."""
        # EnvVarSecret is run directly inside this task. As the original
        # author notes, os.environ.get(self.secret_name) would do just as well.
        secret_task = EnvVarSecret(self.secret_name)
        shell = secret_task.run()
        print(f"Shh, it's secret. This is my shell: {shell}")
# Initialize some subclass style tasks with configuration
# Each instance is given the name of the secret it will look up inside run().
show_shell_task = ShowShell("SHELL")
show_fake_shell_task = ShowShell("FAKE_SHELL")
# The flow mixes both styles: the function task gets its value as an argument,
# while the subclass tasks take no arguments and fetch their own secret.
with Flow("env-secret-passing") as flow:
    shell = EnvVarSecret("SHELL")
    # Pass the secret in, it'll be resolved to a value at runtime
    show_shell(shell)
    # Task doesn't care if it's a secret or a normal value passed in
    show_shell("my-fake-shell")
    # Run the subclass style tasks in our flow
    show_shell_task()
    show_fake_shell_task()
# Set the FAKE_SHELL env var before runtime
# ShowShell("FAKE_SHELL") reads this name inside run(), so it only has to
# exist by the time the flow is executed, not when the flow is built.
os.environ["FAKE_SHELL"] = "my-fake-shell"
flow.run()
Zanie
.run()Felipe Saldana
03/05/2021, 4:14 PM
    def run(self):
        # You can use the EnvVarSecret here
        shell = EnvVarSecret(self.secret_name).run()
        # but really there's no reason to not just pull it from the env 
        # shell = os.environ.get(self.secret_name)
        print("Shh, it's secret. This is my shell: {}".format(shell))
Zanie
EnvVarSecret@taskos.environ.get(...)Felipe Saldana
03/05/2021, 4:22 PMAdam
03/09/2021, 7:02 PMEnvVarSecretPostgresFetchos.environ.getEnvVarSecret
import psycopg2 as pg
from prefect import Task
from prefect.tasks.secrets import EnvVarSecret
from prefect.utilities.tasks import defaults_from_attrs
from psycopg2.extras import NamedTupleCursor
from sable_batch.utils.sql import read_sql
# Task that runs one SQL statement against Postgres through psycopg2 and
# returns all fetched rows. The statement comes either from a literal query
# string or from a SQL file loaded with the project helper read_sql.
class PostgresQuery(Task):
    # query: SQL text to execute (optional).
    # sql_file: path handed to read_sql when no query text is given (optional).
    # Remaining keyword arguments are forwarded to the Task base class.
    def __init__(self, query: str = None, sql_file: str = None, **kwargs):
        self.query = query
        self.sql_file = sql_file
        super().__init__(**kwargs)
    # Executes the configured statement and returns cursor.fetchall().
    # Raises ValueError when neither a query nor a sql_file was configured.
    def run(
        self,
    ):
        if not self.query and not self.sql_file:
            raise ValueError("A query string or path must be provided")
        # Only a file was given: load it and keep the text on the instance,
        # so later runs of the same task object reuse it.
        if not self.query and self.sql_file:
            self.query = read_sql(self.sql_file)
        # Credentials are fetched inside run() by running EnvVarSecret
        # directly, so they are looked up at execution time.
        pg_user = EnvVarSecret("POSTGRES_USER").run()
        pg_password = EnvVarSecret("POSTGRES_PASSWORD").run()
        pg_host = EnvVarSecret("POSTGRES_HOST").run()
        # NOTE(review): dbname "xxx" looks like a redacted placeholder — confirm.
        conn = pg.connect(
            dbname="xxx",
            user=pg_user,
            password=pg_password,
            host=pg_host,
            port=5432,
        )
        try:
            # NOTE(review): per psycopg2's documented behaviour, `with conn`
            # scopes a transaction (commit/rollback) and does not close the
            # connection, which is why close() is called in `finally` — confirm
            # against the installed psycopg2 version.
            with conn, conn.cursor(cursor_factory=NamedTupleCursor) as cursor:
                cursor.execute(query=self.query)
                records = cursor.fetchall()
                return records
        finally:
            # NOTE(review): the trailing `Zanie` on the next line is
            # chat-export residue fused onto the code, not part of the
            # statement.
            conn.close()Zanie
os.environAdam
03/09/2021, 7:27 PM