Hey All, Can prefect’s secret retrieval tasks be d...
# ask-community
m
Hey All, Can prefect’s secret retrieval tasks be done in a flow definition? I have a flow during which I’d want to retrieve a secret from Prefect Cloud and pass the value to a resident function (not decorated with task). I keep getting a
Task<name>
instance instead.
k
Hey @Mehdi Nazari, can you show how you’re using it?
You might be able to do
Secret("...").get()
m
def get_engine(server: str, database: str, user: str) -> sqlalchemy.engine:
    secret = PrefectSecret(user)
    #
    logger.info(f'Creating engine for: {server}, {database}, {user}, {secret}')
    _engine = sqlalchemy.create_engine('mssql+pyodbc://%s:%s@%s/%s?driver=%s' % (
        user, parse.quote_plus(secret), server, database, 'ODBC+Driver+17+for+SQL+Server'))
    #
    return _engine
This is my intention @Kevin Kho, then in flow definition:
    _eng = get_engine(server='server',
                      database='db',
                      user='user')
    load_to_mssql_table(df=data, engine=_eng, table_name='prefect_test')
k
If you’re using
PrefectSecret
, I think you can do
PrefectSecret(user).run()
because the secret is just a special task and you need to call the run method to use a task inside another task (or function)
m
There is a catch tho! When registering the flow, an error pops up saying that Secret retrieval must only happen at execution time, which makes sense from a security standpoint.
ValueError: Secrets should only be retrieved during a Flow run, not while building a Flow.
j
If you did work around ^, I think you may also have challenges passing
_eng
to
load_to_mssql_table
since the
sqlalchemy.Engine
cannot be pickled (which will happen when the flow is serialized, depending on the storage, and also when run)
👍 2
m
Thanks @Jacob Hayes, have you been able to test that?
k
Jacob is right in that you can’t return an engine from a task. Is
get_engine
a task? Task inputs and outputs have to be serializable by cloudpickle.
👍 1
j
Not fully, I've just seen similar before and confirmed the engine can't be pickled. Kevin confirmed 🙂
[nav] In [1]: from sqlalchemy import create_engine

[ins] In [2]: import pickle

[nav] In [3]: engine = create_engine("sqlite:///memory")

[ins] In [4]: pickle.dumps(engine)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-3d64b05c2285> in <module>
----> 1 pickle.dumps(engine)

TypeError: cannot pickle '_thread._local' object
one quick/hacky way to get around that first secret error is to set
prefect.context.config.cloud.use_local_secrets=False
but I think for both these reasons, it's preferred to not access these at flow definition (in non-tasks) and rather defer it somehow. I think the way we've worked around it before was by wrapping the "normal" functions (eg: in a
@decorator
) w/ one that creates the engine. the wrapping function task takes the secret w/ the user/pass/etc, creates the engine, then passes it on to the underlying function
👍 1
m
Sounds like an alternative. Do you happen to have an example I can take a look at?
k
So the recommended approach is to create the engines/connections inside the task instead of passing it. With regards to the Secret, you can just take it inside the task that needs it and create the connection there, like what Jacob is suggesting (from my understanding). In pseudo code:
def create_engine(creds):
    engine = connection(creds)
    return engine

@task
def query(..., creds):
    engine = create_engine(creds)
    result = query_with_engine(engine)
    return result

with Flow("...") as flow:
    creds = PrefectSecret("user")
    result = query(..., creds)
In this case the engine creation is deferred to when you have the Secret during runtime.
🙏 1
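A minimal runnable sketch of the same idea, using only the standard library. It assumes `sqlite3` as a stand-in for the SQLAlchemy engine and a plain dict as a stand-in for the value a secret would resolve to; `make_connection` and `query` are hypothetical names, not Prefect or SQLAlchemy APIs. Only the picklable credentials cross the function boundary, while the connection is created and closed inside the function that uses it.

```python
import pickle
import sqlite3


def make_connection(creds: dict) -> sqlite3.Connection:
    # Hypothetical helper: builds the connection from plain, picklable credentials.
    return sqlite3.connect(creds["database"])


def query(sql: str, creds: dict) -> list:
    # The connection is created here, at call time, and is never returned.
    conn = make_connection(creds)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()


# Stand-in for the value a secret would resolve to at run time (assumption).
creds = {"database": ":memory:"}
pickle.dumps(creds)  # plain credentials serialize without error
rows = query("SELECT 1 + 1", creds)
```

In a flow, `query` would be the decorated task and `creds` the secret's output, so nothing unpicklable has to travel between tasks.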
m
Thank you @Kevin Kho, this is the basic approach. I’d be more interested in what @Jacob Hayes suggested: having a decorator that essentially wraps the task, creates the engine, and calls the task with the engine. This way we wouldn’t have to repeat the engine creation code everywhere a database engine is needed! Did I get it right @Jacob Hayes?
k
Ah I missed that. So I have code here that wraps task to make your own decorator. Create the database connection inside the
try
statement I think and then pass it to the
func
import traceback
from functools import partial, wraps

from prefect import task


def custom_task(func=None, **task_init_kwargs):
    # Allow both @custom_task and @custom_task(name=...) usage.
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x
👍 1
🙏 1
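A rough sketch of the decorator idea discussed above, written with the standard library only so it runs on its own. It assumes `sqlite3` as a stand-in for the SQLAlchemy engine and a dict of credentials as a stand-in for the secret value; `with_connection`, `creds_arg`, and `count_rows` are hypothetical names. The wrapper takes the credentials, opens the connection, passes it to the underlying function, and closes it afterwards. Wrapping the result in Prefect's `task` is not shown and has not been tested here.

```python
import sqlite3
from functools import partial


def with_connection(func=None, *, creds_arg="creds"):
    # Supports both @with_connection and @with_connection(creds_arg="...").
    if func is None:
        return partial(with_connection, creds_arg=creds_arg)

    def wrapper(**kwargs):
        # Swap the credentials keyword for a live connection at call time.
        creds = kwargs.pop(creds_arg)
        conn = sqlite3.connect(creds["database"])
        try:
            return func(conn=conn, **kwargs)
        finally:
            conn.close()

    wrapper.__name__ = func.__name__
    return wrapper


@with_connection
def count_rows(conn, table):
    # Demo only: the table name is interpolated directly into the SQL.
    conn.execute(f"CREATE TABLE {table} (x INTEGER)")
    conn.executemany(f"INSERT INTO {table} VALUES (?)", [(1,), (2,), (3,)])
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


n = count_rows(creds={"database": ":memory:"}, table="prefect_test")
```

The connection code lives in one place, and each decorated function only declares a `conn` parameter.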