Mehdi Nazari
08/02/2021, 5:12 PM
Task<name> instance instead.
Kevin Kho
Secret("...").get()
Mehdi Nazari
08/02/2021, 5:14 PM
def get_engine(server: str, database: str, user: str) -> sqlalchemy.engine:
    secret = PrefectSecret(user)
    #
    logger.info(f'Creating engine for: {server}, {database}, {user}, {secret}')
    _engine = sqlalchemy.create_engine('mssql+pyodbc://%s:%s@%s/%s?driver=%s' % (
        user, parse.quote_plus(secret), server, database, 'ODBC+Driver+17+for+SQL+Server'))
    #
    return _engine
Mehdi Nazari
08/02/2021, 5:15 PM
_eng = get_engine(server='server',
                  database='db',
                  user='user')
load_to_mssql_table(df=data, engine=_eng, table_name='prefect_test')
Kevin Kho
PrefectSecret, I think you can do PrefectSecret(user).run() because the secret is just a special task and you need to call the run method to use a task inside another task (or function)
Mehdi Nazari
08/02/2021, 5:20 PM
ValueError: Secrets should only be retrieved during a Flow run, not while building a Flow.
Jacob Hayes
08/02/2021, 5:21 PM
_eng to load_to_mssql_table since the sqlalchemy.Engine cannot be pickled (which will happen when the flow is serialized, depending on the storage, and also when run)
Mehdi Nazari
08/02/2021, 5:23 PM
Kevin Kho
get_engine a task? Task inputs and outputs have to be serializable by cloudpickle.
Jacob Hayes
08/02/2021, 5:32 PM
[nav] In [1]: from sqlalchemy import create_engine
[ins] In [2]: import pickle
[nav] In [3]: engine = create_engine("sqlite:///memory")
[ins] In [4]: pickle.dumps(engine)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-4-3d64b05c2285> in <module>
----> 1 pickle.dumps(engine)
TypeError: cannot pickle '_thread._local' object
one quick/hacky way to get around that first secret error is to set prefect.context.config.cloud.use_local_secrets=False
but I think for both these reasons, it's preferred to not access these at flow definition (in non-tasks) and rather defer it somehow. I think the way we've worked around it before was by wrapping the "normal" functions (eg: in a @decorator) w/ one that creates the engine. the wrapping function task takes the secret w/ the user/pass/etc, creates the engine, then passes it on to the underlying function
Mehdi Nazari
08/02/2021, 5:37 PM
Kevin Kho
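from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret

# Plain helper (not a task): builds the engine from credentials already resolved
def create_engine(creds):
    engine = connection(creds)
    return engine

@task
def query(..., creds):
    # The engine is created inside the task, so it never has to be pickled
    engine = create_engine(creds)
    result = query_with_engine(engine)
    return result

with Flow("...") as flow:
    # PrefectSecret is a task; its value is only resolved during the flow run
    creds = PrefectSecret("user")
    result = query(..., creds)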
In this case the engine creation is deferred to when you have the Secret during runtime.
Mehdi Nazari
08/02/2021, 6:35 PM
Kevin Kho
try statement I think and then pass it to the func
from functools import partial, wraps
import traceback
from prefect import task

def custom_task(func=None, **task_init_kwargs):
    # Used as @custom_task(...) with keyword arguments: return a decorator
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x
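
Editor's sketch of the wrapping idea from earlier in the thread: pass plain credentials across the serialization boundary and create the connection only inside the wrapper, at call time. This is not Prefect or SQLAlchemy code. To stay runnable with the standard library alone it uses a `sqlite3` connection as a stand-in for the engine, and the names `with_connection`, `count_rows`, and `can_pickle` are hypothetical, chosen only for illustration.

```python
import pickle
import sqlite3
from functools import wraps


def with_connection(func):
    """Hypothetical decorator: open the connection at call time from plain creds."""
    @wraps(func)
    def wrapper(creds, *args, **kwargs):
        # The connection exists only for the duration of this call,
        # so it never needs to be pickled.
        conn = sqlite3.connect(creds["database"])
        try:
            return func(conn, *args, **kwargs)
        finally:
            conn.close()
    return wrapper


@with_connection
def count_rows(conn, table_name):
    # Stand-in for real work done with the connection.
    conn.execute(f"CREATE TABLE {table_name} (id INTEGER)")
    conn.executemany(f"INSERT INTO {table_name} VALUES (?)", [(1,), (2,), (3,)])
    return conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


creds = {"database": ":memory:"}

# Plain credentials round-trip through pickle without trouble.
restored_creds = pickle.loads(pickle.dumps(creds))

# A live connection does not, which mirrors the engine TypeError above.
live_conn = sqlite3.connect(":memory:")
conn_is_picklable = can_pickle(live_conn)
live_conn.close()

row_count = count_rows(restored_creds, "prefect_test")
```

In a flow, the wrapped function would be the task body and the credentials would come from the secret task's output at run time, as in the `query(..., creds)` example above.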