Jason Prado
06/03/2021, 7:33 PMPostgresFetch
.
db_host = PrefectSecret('db_host')
fetch = PostgresFetch(
host=db_host,
...,
)
I can’t do this because UserWarning: A Task was passed as an argument to PostgresFetch, you likely want to first initialize PostgresFetch with any static (non-Task)...
. Does this mean I need to either put the DB host in my code or get it from another method like an environment variable? Why is host
part of the static args to PostgresFetch
if I’m likely to extract it from a Secret?Kevin Kho
run
method.Jason Prado
06/03/2021, 7:36 PMKevin Kho
#You can modify the task from the task library like this:
class PostgresExecute(Task):
"""
Task for executing a query against a Postgres database.
Args:
- **kwargs (dict, optional): additional keyword arguments to pass to the
Task constructor
"""
def __init__(
self,
**kwargs
):
super().__init__(**kwargs)
def run(
self,
db_name: str = None,
user: str = None,
host: str = None,
port: int = None,
query: str = None,
data: tuple = None,
commit: bool = False,
password: str = None,
):
"""
Task run method. Executes a query against Postgres database.
Args:
- query (str, optional): query to execute against database
- data (tuple, optional): values to use in query, must be specified using
placeholder is query string
- commit (bool, optional): set to True to commit transaction, defaults to false
- password (str): password used to authenticate; should be provided from a `Secret` task
Returns:
- None
Raises:
- ValueError: if query parameter is None or a blank string
- DatabaseError: if exception occurs when executing the query
"""
if not query:
raise ValueError("A query string must be provided")
# connect to database, open cursor
# allow psycopg2 to pass through any exceptions raised
conn = pg.connect(
dbname=db_name,
user=user,
password=password,
host=host,
port=port,
)
# try to execute query
# context manager automatically rolls back failed transactions
try:
with conn, conn.cursor() as cursor:
executed = cursor.execute(query=query, vars=data)
if commit:
conn.commit()
else:
conn.rollback()
return executed
# ensure connection is closed
finally:
conn.close()
Kevin Kho
Jason Prado
06/03/2021, 7:38 PMKevin Kho
class PostgresCustom(PostgresFetch):
def run(self, **kwargs):
self.hostname = Secret('hostname').get()
# repeat for other secrets
super(PostgresCustom, self).run()
Kevin Kho