
Santiago Gonzalez

11/12/2021, 8:44 PM
Hey. How can I create a custom implementation of `PrefectSecret` that accepts a `Parameter` as an argument? Is that possible?

Anna Geller

11/12/2021, 8:47 PM
Could you describe your use case a bit more, so we can understand how you would combine a Parameter with a Secret? Is it that you get the secret name as a parameter?

Kevin Kho

11/12/2021, 8:47 PM
I was gonna link you to the other thread but I realized it was you also lol. I think you can try
with Flow(...) as flow:
    x = Parameter("x")
    name = create_secret_name(x)
    s1 = PrefectSecret(name)
upvote 1
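The snippet above leaves `create_secret_name` undefined. A minimal sketch of what it could look like follows, written as a plain function so it runs without Prefect. The prefix rule and the default base name are assumptions borrowed from later messages in this thread, not something Kevin specified; in a Prefect 1.x flow the function would additionally be decorated with `@task` so that it receives the resolved `Parameter` value at run time.

```python
def create_secret_name(env: str, base_name: str = "ID_MAPPING_DB_URL") -> str:
    """Return the secret name to look up for the given environment.

    Hypothetical helper: the name and the prefix rule are illustrative only.
    """
    # Assumption: only "dev" secrets carry a prefix, e.g. DEV_ID_MAPPING_DB_URL.
    if env == "dev":
        return f"{env.upper()}_{base_name}"
    return base_name


dev_name = create_secret_name("dev")
prod_name = create_secret_name("prod")
print(dev_name)
print(prod_name)
```

Keeping the naming rule in one small function makes it easy to unit-test separately from the flow.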

Santiago Gonzalez

11/12/2021, 8:49 PM
class IDMappingDBSecret(PrefectSecret):
    environment: str

    def __init__(self, name=None, env=None, **kwargs):
        self.environment = env
        super().__init__(name, **kwargs)

    def run(self, name: str = None):
        logger = prefect.context.get("logger")
        name: str = name or self.secret_name
        if self.environment == "dev":
            name = f'dev_{name}'
        logger.info(f'Secret {name} is going to be retrieved')
        return super().run(name)
That is my implementation of `PrefectSecret`. The idea is to change the name of the secret depending on the value of a parameter called `env`.
with Flow('flow_sample', schedule=None) as flow:
    env = Parameter('env', required=True)
    env_secret = IDMappingDBSecret(name='STAGE_ID_MAPPING_DB_URL', env=env)
    print_result()
This is a sample flow I wrote to test this behavior.

Kevin Kho

11/12/2021, 8:52 PM
I think this might be easier:
class IDMappingDBSecret(PrefectSecret):
    environment: str

    def __init__(self, name=None, env=None, **kwargs):
        self.environment = env
        new_name = self.environment + name
        logger = prefect.context.get("logger")
        logger.info(f'Secret {new_name} is going to be retrieved')
        super().__init__(new_name, **kwargs)

Anna Geller

11/12/2021, 8:54 PM
alternatively, you could get the parameter value from context perhaps?
prefect.context["parameters"]

Santiago Gonzalez

11/12/2021, 8:55 PM
Oh, that could work. thanks, I will test it
I am getting this
"'GraphQLResult' object has no attribute 'name'"
with this code:
class IDMappingDBSecret(PrefectSecret):
    environment: str

    def __init__(self, name=None, **kwargs):
        self.environment = prefect.config.get("environment")
        super().__init__(name=name, **kwargs)

    def run(self, name: str = None):
        logger = prefect.context.get("logger")
        logger.info(f"Environment is {self.environment}")
        name: str = name or self.secret_name
        if self.environment == "dev":
            name = f'{self.environment.upper()}_{name}'
        logger.info(f'Secret {name} is going to be retrieved')
        return super().run(name=name)

@task
def print_result():
    logger = prefect.context.get("logger")
    logger.info('Result is 6')


with Flow('flow_sample', schedule=None) as flow:
    env_secret = IDMappingDBSecret(name='STAGE_ID_MAPPING_DB_URL')
    print_result()
What am I doing wrong?
I've decided to try `prefect.config.get("environment")` instead of creating a new parameter. Is `environment` associated with the label provided in the UI (the one that specifies which agent is going to pick up the flow)?

Kevin Kho

11/12/2021, 10:07 PM
I think just do more in the init and it will be easier. No need to define the run. This worked on cloud for me:
from prefect.tasks.secrets import PrefectSecret
import prefect
from prefect import Flow, task

class IDMappingDBSecret(PrefectSecret):

    def __init__(self, name=None, environment="dev", **kwargs):
        self.environment = environment
        if self.environment == "stage":
            _name = f'{self.environment.upper()}_{name}'
        else:
            _name = name
        super().__init__(name=_name, **kwargs)

@task
def print_result(secret):
    logger = prefect.context.get("logger")
    logger.info(secret)
    return

with Flow('flow_sample', schedule=None) as flow:
    env_secret = IDMappingDBSecret('ID_MAPPING_DB_URL', environment="stage")
    print_result(env_secret)

flow.run()
upvote 1

Anna Geller

11/13/2021, 12:15 PM
@Santiago Gonzalez I think you can combine what Kevin suggested with init and your environment from config like so:
from prefect.tasks.secrets import PrefectSecret
import prefect
from prefect import Flow, task


class EnvSecret(PrefectSecret):
    def __init__(self, name=None, **kwargs):
        self.environment = prefect.config.get("environment")
        if self.environment in ["stage", "dev"]:
            _name = f"{self.environment.upper()}_{name}"
        else:
            _name = name
        super().__init__(name=_name, **kwargs)


env_secret = EnvSecret("DB_NAME")


@task
def print_result(secret):
    logger = prefect.context.get("logger")
    logger.info(secret)


with Flow("flow_sample") as flow:
    secret = env_secret(task_args={"name": "DB_NAME"})
    print_result(secret)

if __name__ == "__main__":
    flow.run()
Running with value “prod”:
[2021-11-13 13:11:35+0100] INFO - prefect.TaskRunner | Task 'DB_NAME': Starting task run...
[2021-11-13 13:11:36+0100] INFO - prefect.TaskRunner | Task 'DB_NAME': Finished task run for task with final state: 'Success'
[2021-11-13 13:11:36+0100] INFO - prefect.TaskRunner | Task 'print_result': Starting task run...
[2021-11-13 13:11:36+0100] INFO - prefect.print_result | PROD_DB
Running with value “dev” in config.toml:
[2021-11-13 13:11:47+0100] INFO - prefect.TaskRunner | Task 'DEV_DB_NAME': Starting task run...
[2021-11-13 13:11:47+0100] INFO - prefect.TaskRunner | Task 'DEV_DB_NAME': Finished task run for task with final state: 'Success'
[2021-11-13 13:11:47+0100] INFO - prefect.TaskRunner | Task 'print_result': Starting task run...
[2021-11-13 13:11:47+0100] INFO - prefect.print_result | DEV_DB
Is that what you intended to do?

Santiago Gonzalez

11/13/2021, 12:24 PM
Yes, it is very similar. Is the label configured on the agent the same as `environment`?
Thanks @Anna Geller

Anna Geller

11/13/2021, 12:27 PM
no, you would need to specifically set it in your config.toml. This would be a custom variable you set.
👍 1

Santiago Gonzalez

11/18/2021, 2:18 PM
Hey @Anna Geller, I noticed something about this command:
os.environ.get('environment')
When I execute it inside a task, it returns the value configured on the agent. However, when I call the same command inside the `PrefectSecret` subclass, it has no value. What could be a way to get the value inside the `PrefectSecret` subclass? Here is a piece of the flow code:
class IDMappingDBSecret(PrefectSecret):
    environment: str

    def __init__(self, name=None, **kwargs):
        env = os.environ.get('environment')
        if env in ["dev"]:
            _name = f"{env.upper()}_{name}"
        else:
            _name = name
        super().__init__(name=_name, **kwargs)

@task
def print_result(secret):
    env = os.environ.get('environment')
    logger = prefect.context.get("logger")
    logger.info(f"Environment {env}")
    logger.info(f'Result is {secret}')

with Flow('flow_sample', schedule=None) as flow:
    secret = IDMappingDBSecret('DB_URL')
    print_result(secret)
Thanks in advance

Kevin Kho

11/18/2021, 3:02 PM
Looking at this again, I think pickle-based storage might serialize the env variable. I guess you need to read `os.environ` in the `run` method. Not sure if that would work though. I think the easier approach is to use script-based storage so that the Flow is not serialized. See this and this
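The difference between reading the variable in `__init__` and reading it in `run` can be illustrated without Prefect. The two classes below are hypothetical stand-ins, not Prefect APIs, and the sketch only assumes that the variable is unset where the flow object is built and set where the task later executes. It does not model pickling itself.

```python
import os


class NameResolvedAtInit:
    """Stand-in for a task that reads the environment when it is constructed."""

    def __init__(self, name: str):
        env = os.environ.get("environment")
        # The decision is frozen here, at flow-build time.
        self.secret_name = f"{env.upper()}_{name}" if env == "dev" else name

    def run(self) -> str:
        return self.secret_name


class NameResolvedAtRun:
    """Stand-in for a task that reads the environment when it is executed."""

    def __init__(self, name: str):
        self.secret_name = name

    def run(self) -> str:
        env = os.environ.get("environment")
        # The decision is made here, on the machine that runs the task.
        return f"{env.upper()}_{self.secret_name}" if env == "dev" else self.secret_name


# Build step (assumed): the variable is not set while the flow is defined.
os.environ.pop("environment", None)
at_init = NameResolvedAtInit("DB_URL")
at_run = NameResolvedAtRun("DB_URL")

# Execution step (assumed): the agent provides the variable only at run time.
os.environ["environment"] = "dev"
init_result = at_init.run()
run_result = at_run.run()
print(init_result)  # DB_URL
print(run_result)   # DEV_DB_URL
```

Only the second class sees the value supplied at execution time, which is why moving the lookup into `run` is worth trying.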

Anna Geller

11/18/2021, 3:06 PM
@Santiago Gonzalez there must have been some mix-up here - we were talking about retrieving this value from config.toml rather than from env variable? Can you have a look at the previous posts?
self.environment = prefect.config.get("environment")

Santiago Gonzalez

11/18/2021, 3:09 PM
I tried with the code that you provided and it didn’t work out. I will try in the run method then, or with the other solution.

Anna Geller

11/18/2021, 3:16 PM
I’m 100% sure that the method provided here will work with a local agent, as long as you set the variable in the config.toml: https://prefect-community.slack.com/archives/CL09KU1K7/p1636805732056300?thread_ts=1636749897.042500&cid=CL09KU1K7 But feel free to experiment and see what works best for you

Santiago Gonzalez

11/18/2021, 4:55 PM
Oh. Ok, the thing is that I am setting the variable in the EC2 Agent like this:
screen -dmS prefect-screen python3.7 -m prefect agent docker start --token {TOKEN}  --label dev --env environment=dev

Anna Geller

11/18/2021, 4:58 PM
Sure, try this, and make sure that this environment variable also exists in the environment from which you register your flow; then this could work, too. By the way, `--token` is deprecated in favour of API keys, so it should be `--key YOUR_KEY` instead of `--token`.

Santiago Gonzalez

11/18/2021, 5:00 PM
thanks, I will try then.
This is what my config.toml looks like:
environment = "dev"
[cloud]
   [cloud.agent]
   name = "agent"
   labels = ["dev"]

   # Set to `DEBUG` for verbose logging
   level = "INFO"
With this piece of code:
class IDMappingDBSecret(PrefectSecret):
    environment: str

    def __init__(self, name=None, **kwargs):
        env = prefect.config.get("environment")
        if env in ["dev"]:
            _name = f"{env.upper()}_{name}"
        else:
            _name = name
        super().__init__(name=_name, **kwargs)

@task
def print_result(secret):
    logger = prefect.context.get("logger")
    logger.info(f'Result is {secret}')

with Flow('flow_sample', schedule=None) as flow:
    secret = IDMappingDBSecret('DB_URL')
    print_result(secret)
And yet, it is not working as expected. Is something wrong there?

Kevin Kho

11/18/2021, 6:34 PM
What is the issue you run into?

Santiago Gonzalez

11/18/2021, 8:07 PM
The value is not being retrieved using `prefect.config.get('environment')`.

Kevin Kho

11/18/2021, 8:09 PM
Will try this in a bit

Anna Geller

11/18/2021, 8:14 PM
can you try this?
from prefect.tasks.secrets import PrefectSecret
import prefect
from prefect import Flow, task


class IDMappingDBSecret(PrefectSecret):
    def __init__(self, name=None, **kwargs):
        env = prefect.config.get("environment")
        if env in ["dev"]:
            _name = f"{env.upper()}_{name}"
        else:
            _name = name
        super().__init__(name=_name, **kwargs)


env_secret = IDMappingDBSecret("DB_NAME")

@task
def print_result(secret):
    logger = prefect.context.get("logger")
    logger.info('Result is %s', secret)


with Flow('flow_sample') as flow:
    secret = env_secret(task_args={"name": "DB_NAME"})
    print_result(secret)

if __name__ == "__main__":
    flow.run()
@Santiago Gonzalez to give you some explanation: initializing the task and then calling it within the Flow block with a custom task name passed via `task_args` ensures that your task name remains the same in the flow structure, regardless of whether your secret name is DEV_DB or PROD_DB. As long as `environment = "dev"` is set in the config.toml, it should work with the DEV_DB secret. You would need to set it in the environment from which you start the agent.

Santiago Gonzalez

11/19/2021, 6:29 PM
@Anna Geller this is the thing: I am running the flow with a Docker agent on EC2 (not a LocalAgent), and in this context the environment variable is not being picked up. I've been trying to get this working by setting the variable in config.toml (and also as an argument to `agent docker start`), but it is not working: I still get `None` when I call `prefect.config.get('environment')`.
I figured it out. I got it working using `os.environ['environment']` in the `run` method. Thanks!