# ask-community
Hi guys! I am facing an issue with prefect. I have defined some utility functions to be used in prefect flows:
from typing import Any

from prefect import flow, task
from prefect_sqlalchemy import ConnectionComponents, SqlAlchemyConnector, SyncDriver


@task(log_prints=True)
def create_connection(secret, db_name):
    ''' Create SQL DB connection '''

    db = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=SyncDriver.MYSQL_MYSQLCONNECTOR,
            host=secret['host'],
            username=secret['username'],
            password=secret['password'],
            port=secret['port'],
            database=db_name,
        )
    )

    engine = db.get_engine()
    conn = engine.raw_connection()
    cursor = conn.cursor()

    return db, conn, cursor

@task(log_prints=True)
def fetch_sql(cursor, query, value_list=()) -> Any:
    ''' Function to execute SQL queries '''

    cursor.execute(query, value_list)

    try:
        if cursor.rowcount > 0:
            res = cursor.fetchall()
        else:
            res = None
        return res
    except Exception:
        # Any error raised while fetching is swallowed and reported as "no rows"
        return None

@task
def get_id(cursor, job_name):

    # Pass job_name as a bound parameter instead of formatting it into the SQL string
    rst = fetch_sql(cursor, "SELECT id from table where job_name = %s;", (job_name,))

    return rst[0][0]
The code where I am using it:
@flow()
def main_flow():
    secret = get_secret()
    db, con, cursor = create_connection(secret, db_name)
    job_id = get_id(cursor, "SAMPLE_JOB_001")
This flow works only up to create_connection; after that it throws an InterfaceError:
Crash detected! Execution was interrupted by an unexpected exception: InterfaceError: No result set to fetch from
The code above works only when I call the tasks with .fn(). Utility function change:
@task
def get_id(cursor, job_name):

    rst = fetch_sql.fn(cursor, "SELECT id from table where job_name = %s;", (job_name,))

    return rst[0][0]
Flow code change:
@flow()
def main_flow():
    secret = get_secret()
    db, con, cursor = create_connection(secret, db_name)
    job_id = get_id.fn(cursor, "SAMPLE_JOB_001")
This works and doesn't give any errors, but we can't see get_id or fetch_sql in the logs or in the graph. Should I change the design, or make any other changes, so that the tasks also run as tasks and we can see them in the logs and the flow graph? Please advise.
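One possible redesign, as a rough sketch only: instead of passing a live cursor from one task to another, each unit of work opens its own connection, runs its query, and returns plain rows. The sketch below uses the standard-library sqlite3 module as a stand-in for MySQL so that it runs on its own; DB_PATH, the jobs table, seed_demo_db and fetch_rows are hypothetical names, not part of the original code. In a real flow, get_id would carry the @task decorator and fetch_rows would stay a plain helper called inside it. Whether this removes the InterfaceError in the setup above is an assumption that has not been tested, and fetch_rows would not appear as its own node in the graph.

```python
import os
import sqlite3
import tempfile
from contextlib import closing

# Hypothetical throwaway database file; stands in for the MySQL database.
DB_PATH = os.path.join(tempfile.mkdtemp(), "demo.db")


def seed_demo_db():
    """Create a small demo table so the example is runnable on its own."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, job_name TEXT)")
        conn.execute(
            "INSERT INTO jobs (id, job_name) VALUES (?, ?)",
            (42, "SAMPLE_JOB_001"),
        )
        conn.commit()


def fetch_rows(query, params=()):
    """Plain helper: open a connection, run one query, return a list of rows.

    The connection and cursor are created and closed here, so neither one
    ever leaves this function.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(query, params)
            return cursor.fetchall()


# In a Prefect flow this function would be decorated with @task (assumption).
def get_id(job_name):
    """Return the id for job_name, or None when no row matches."""
    # sqlite3 uses "?" placeholders; MySQL Connector/Python uses "%s".
    rows = fetch_rows("SELECT id FROM jobs WHERE job_name = ?;", (job_name,))
    return rows[0][0] if rows else None


seed_demo_db()
print(get_id("SAMPLE_JOB_001"))  # prints 42
print(get_id("MISSING_JOB"))     # prints None
```

The trade-off is one connection per call rather than one shared cursor; for a handful of lookups per flow run that is usually acceptable, but it is a design choice to weigh against the original shared-connection approach.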