Abhishek Mitra
02/06/2024, 9:06 AM
@task(log_prints=True)
def create_connection(secret, db_name):
    ''' Create SQL DB connection '''
    db = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=SyncDriver.MYSQL_MYSQLCONNECTOR,
            host=secret['host'],
            username=secret['username'],
            password=secret['password'],
            port=secret['port'],
            database=db_name
        )
    )
    engine = db.get_engine()
    conn = engine.raw_connection()
    cursor = conn.cursor()
    return db, conn, cursor
@task(log_prints=True)
def fetch_sql(cursor, query, value_list=()) -> any:
    ''' Function to execute SQL queries '''
    cursor.execute(query, value_list)
    try:
        if cursor.rowcount > 0:
            res = cursor.fetchall()
        else:
            res = None
        return res
    except Exception as e:
        return None
@task
def get_id(cursor, job_name):
    rst = fetch_sql(cursor, f"SELECT id from table where job_name = {job_name};")
    return rst[0][0]
The code where I am using it:
@flow()
def main_flow():
    secret = get_secret()
    db, con, cursor = create_connection(secret, db_name)
    job_id = get_id(cursor, "SAMPLE_JOB_001")
This flow works only up to create_connection; after that it throws an InterfaceError:
Crash detected! Execution was interrupted by an unexpected exception: InterfaceError: No result set to fetch from
The above code works only when I call the tasks with .fn().
Utility function change:
@task
def get_id(cursor, job_name):
    rst = fetch_sql.fn(cursor, f"SELECT id from table where job_name = {job_name};")
    return rst[0][0]
Flow code change:
@flow()
def main_flow():
    secret = get_secret()
    db, con, cursor = create_connection(secret, db_name)
    job_id = get_id.fn(cursor, "SAMPLE_JOB_001")
This works and doesn't raise any errors, but get_id and fetch_sql no longer appear in the logs or in the flow graph. Should I change the design, or make other changes, so that the tasks actually run as tasks and show up in the logs and flow graph? Please advise.
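
For reference, one possible redesign might look like the sketch below. It is not tested against Prefect or MySQL: the standard-library sqlite3 module stands in for the MySQL connection so the snippet runs on its own, and the `jobs` table, the `db_path` argument, and the `make_demo_db` helper are hypothetical. The idea is that the function that needs a cursor opens it itself, only plain values (strings, ints, tuples) cross function boundaries, the job name is bound as a query parameter instead of being formatted into the SQL string, and `fetchall()` is only called when the cursor reports a result set. In the real flow, `get_id` would presumably keep its `@task` decorator and build its connection from the secret.

```python
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import Optional


def fetch_sql(conn, query: str, values: tuple = ()) -> Optional[list]:
    """Execute a query and return its rows, or None if there are none."""
    with closing(conn.cursor()) as cursor:
        cursor.execute(query, values)
        # DB-API (PEP 249): description is None when the statement produced
        # no result set, so fetchall() is only called when rows can exist.
        if cursor.description is None:
            return None
        rows = cursor.fetchall()
    return rows or None


def get_id(db_path: str, job_name: str) -> Optional[int]:
    """Open the connection here so that no cursor is passed between functions."""
    with closing(sqlite3.connect(db_path)) as conn:
        # Bind the value as a parameter. sqlite3 uses "?" as the placeholder;
        # mysql-connector uses "%s" instead.
        rows = fetch_sql(conn, "SELECT id FROM jobs WHERE job_name = ?;", (job_name,))
    return rows[0][0] if rows else None


def make_demo_db() -> str:
    """Hypothetical helper: create a throwaway database file with one job row."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    with closing(sqlite3.connect(path)) as conn:
        conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, job_name TEXT)")
        conn.execute(
            "INSERT INTO jobs (id, job_name) VALUES (?, ?)", (7, "SAMPLE_JOB_001")
        )
        conn.commit()
    return path


if __name__ == "__main__":
    demo_path = make_demo_db()
    print(get_id(demo_path, "SAMPLE_JOB_001"))  # prints 7
    os.remove(demo_path)
```

With this layout, `get_id` takes and returns only simple values, so it could be called from the flow as a normal task (without `.fn()`), while `fetch_sql` stays an ordinary helper function inside it.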