Chris Gunderson
12/29/2022, 3:57 PM
Nate
12/29/2022, 4:05 PM
`None.cursor()`, so it seems that however you're generating your connection object is where the issue lies.
If you could share your code for generating your connection object, it might make it clearer how to resolve this.
Chris Gunderson
12/29/2022, 4:08 PM
class MySQLController(object):
"""
Python class for connecting to MySQL servers and querying, updating, or deleting using MySQL
"""
# Class-level defaults for the private (name-mangled) attributes; the methods
# of this class overwrite them per instance.
# Not referenced by any method shown in this class.
__instance = None
# Cursor created from the connection in __open().
__session = None
# Connection object created in __open() and closed in __close().
__connection = None
# Secret reference passed to __init__(); __auth() reads its `.value`.
__secret_name = None
# Connection keyword arguments assembled by __auth().
__config = None
# Not referenced by any method shown in this class.
__logger = None
def __init__(self, secret_name):
    """Store the secret reference and build the connection configuration.

    Args:
        secret_name: Object identifying the secret that holds the database
            credentials. ``__auth`` reads its ``.value`` attribute, so this
            is presumably a Prefect ``String`` block -- TODO confirm.

    Raises:
        ValueError: If ``secret_name`` is falsy.
    """
    # NOTE: this installs a process-wide filter that hides every UserWarning,
    # not only the ones triggered by this controller.
    warnings.simplefilter(action='ignore', category=UserWarning)
    if not secret_name:
        # ValueError is still an Exception, so callers that catch the
        # previously raised bare Exception keep working.
        raise ValueError(
            '\n Please select a Secret Name from Prefect Block!')
    self.__secret_name = secret_name
    self.__auth()
# End def __init__
def __auth(self):
    """Load the database secret and turn it into connection keyword arguments.

    Reads the secret named by ``self.__secret_name.value`` through the
    ``scheduler-user`` AWS credentials block and parses it as JSON. It then
    renames ``username`` to ``user`` and ``dbname`` to ``database``, casts
    ``port`` to ``int``, drops ``engine`` (and the misspelt ``engin``), and
    enables ``autocommit`` and ``local_infile``. The resulting dict is stored
    on the instance, where ``__open`` passes it to ``Connect``.
    """
    aws_credentials_block = AwsCredentials.load("scheduler-user")
    aws_secret = read_secret(self.__secret_name.value, aws_credentials_block)
    auth = json.loads(aws_secret)

    # Rename the secret's keys to the keyword names used when connecting.
    for secret_key, connect_key in (('username', 'user'), ('dbname', 'database')):
        if secret_key in auth:
            auth[connect_key] = auth.pop(secret_key)

    if 'port' in auth:
        auth['port'] = int(auth['port'])

    # The engine name is not a connection argument; 'engin' is removed too,
    # as the original code did.
    for unwanted_key in ('engine', 'engin'):
        auth.pop(unwanted_key, None)

    # Real booleans instead of the string 'true': a non-empty string only
    # works by truthiness and would stay "on" even if it read 'false'.
    # NOTE(review): assumes Connect is PyMySQL's, which documents both
    # options as booleans -- confirm against the file's imports.
    auth['autocommit'] = True
    auth['local_infile'] = True
    self.__config = auth
# End def __auth
def __open(self):
    """Open (or revive) the connection and create a cursor on it.

    If a connection object already exists it is pinged with
    ``reconnect=True``; otherwise a new one is created from
    ``self.__config``. In both cases a fresh cursor is stored in
    ``self.__session``.

    Raises:
        MySQLError: If connecting, pinging, or creating the cursor fails.
    """
    try:
        if self.__connection:
            self.__connection.ping(reconnect=True)
        else:
            self.__connection = Connect(**self.__config)
        self.__session = self.__connection.cursor()
    except MySQLError as e:
        # Format the whole exception: not every MySQLError carries an
        # (int code, message) pair, so "%d"/args[1] could itself raise here.
        print(f"Error: {e}")
        # Re-raise instead of swallowing. Otherwise self.__connection stays
        # None and the failure only surfaces later, when the connection is
        # used, as an unrelated-looking error about None.
        raise
# End def __open
def __close(self):
    """Close the connection if one is currently open.

    Closing is best-effort: a ``MySQLError`` raised while closing is
    reported on stdout and not propagated.
    """
    try:
        if self.__connection and self.__connection.open:
            self.__connection.close()
        else:
            # NOTE(review): the original indentation was lost, so it is
            # unclear whether this message belonged to "no connection
            # object" or "connection not open"; it is printed for both.
            print('No connection to close.')
    except MySQLError as e:
        # Format the whole exception: not every MySQLError carries an
        # (int code, message) pair, so "%d"/args[1] could itself raise here.
        print(f"Error: {e}")
def query(self, select="*", db=None, table=None, where=None, *args, **kwargs):
    """Run a SELECT against ``db.table`` and return the result as a DataFrame.

    Args:
        select: Columns to fetch, either a comma-separated string or a
            sequence of column expressions. Defaults to ``"*"``.
        db: Schema name used to qualify ``table``.
        table: Table name.
        where: A single SQL condition, or a list/tuple of conditions that
            are AND-ed together. Falsy means no extra conditions.
        *args: Accepted for backward compatibility; unused.
        **kwargs: Passed through to ``pandas.read_sql_query``.

    Returns:
        The ``pandas.DataFrame`` produced by ``pandas.read_sql_query``.
    """
    # SECURITY: select, db, table and where are interpolated into the SQL
    # text verbatim. Only pass trusted values; user-supplied data belongs in
    # pandas' `params=` keyword (forwarded through **kwargs).
    if isinstance(select, str):
        columns = [column.strip(" ") for column in select.split(",")]
    else:
        columns = list(select)

    if not where:
        constraints = []
    elif isinstance(where, (list, tuple)):
        constraints = list(where)
    else:
        constraints = [where]

    # "WHERE 1=1" lets every constraint be appended uniformly as "AND ...".
    sql = "\n".join([
        "SELECT " + ", ".join(columns),
        f"FROM {db}.{table}",
        "WHERE 1=1",
        *(f"AND {constraint}" for constraint in constraints),
    ])

    self.__open()
    try:
        return pd.read_sql_query(sql, self.__connection, **kwargs)
    finally:
        # Always release the connection, even when the query itself fails.
        self.__close()
class sraController(MySQLController):
    """MySQLController configured for the sra database."""

    def __init__(self):
        """Create a controller configured for the sra database.

        Takes no arguments: the secret reference is always loaded from the
        ``sra-database`` ``String`` block and handed to
        ``MySQLController.__init__``.

        NOTE(review): the previous docstring described a ``secret_name``
        argument accepting 'DEV_****' or 'PROD_****'. No such parameter
        exists on this constructor -- confirm whether one was intended.
        """
        # A local is enough: inside this subclass `self.__secret_name` would
        # be mangled to `_sraController__secret_name`, which nothing shown
        # here reads; the parent stores its own copy.
        secret_name = String.load("sra-database")
        super().__init__(secret_name)
Nate
12/29/2022, 4:34 PM
`MySQLController.__open`, but I'm not sure what that `Connect` object that you're using there is actually doing, so I wouldn't be able to say anything for sure.
I would suggest that it may make sense to use the functionality already built into the sqlalchemy collection, as you could use the `DatabaseCredentials` block and a MySQL driver to make queries / generate connection objects in a potentially simpler manner.
Chris Gunderson
12/29/2022, 4:35 PM