#!/usr/bin/env python3
"""Small MySQL access layer built on SQLAlchemy pooling, PyMySQL and MySQLdb."""

import sys
import traceback
from urllib.parse import quote

# SQLAlchemy is only used when pooling is enabled (SqlClass.disable_pool is
# False). Keeping the import optional lets the direct PyMySQL/MySQLdb mode work
# on installations without SQLAlchemy; pooled mode reports the missing package
# when connect() is called.
try:
    from sqlalchemy import create_engine
except ImportError:
    create_engine = None

# Process-wide SQLAlchemy engine, created lazily by the first pooled connect().
# NOTE: it is built from the settings of whichever SqlClass instance connects
# first; later instances reuse it regardless of their own settings.
engine = None


class SqlClass:
    """Interface used to talk to a MySQL server through SQLAlchemy or a raw driver.

    Attributes:
        cursors_connect (cursor class): Dict cursor class of the selected
            driver. Only set in pooled mode; stays None otherwise, in which
            case the driver connection's own default cursor class is used.
        disable_pool (bool): If True, connect directly with the driver and do
            not use the SQLAlchemy pool. If False, use the shared pool.
        pymysql_install (bool): True once PyMySQL has been installed as the
            classic ``MySQLdb`` module.
        pool_size (int): The size of the SQLAlchemy connection pool.
    """

    cursors_connect = None
    disable_pool = False
    pymysql_install = False
    pool_size = 15

    def __init__(self, connection):
        """Store the settings and open the connection immediately.

        Args:
            connection (dict): Connection settings. The keys read by this
                class are 'db_type', 'user', 'password', 'host' and 'db'.

        Attributes:
            error_connection (str): Text of the last connection/query error.
            connection (dict): The settings passed in ``connection``.
            conn: The open DB-API connection, or None when closed.
            connected (bool): True while ``conn`` holds an open connection.
            pool_recycle (int): Seconds after which pooled connections are
                recycled.
            last_query (str): The last SQL sentence sent through query().

        Raises:
            NameError: If the pooled engine cannot be created.
        """
        self.error_connection = ""
        self.connection = connection
        self.conn = None
        self.connected = False
        self.pool_recycle = 3600
        self.last_query = ''

        self.connect()

    @staticmethod
    def _install_pymysql(pymysql_module):
        """Register PyMySQL as the ``MySQLdb`` module once per process."""
        if not SqlClass.pymysql_install:
            # The function must be *called*; a bare attribute reference does
            # nothing.
            pymysql_module.install_as_MySQLdb()
            SqlClass.pymysql_install = True

    def _create_engine(self):
        """Build the pooled SQLAlchemy engine from this instance's settings."""
        if create_engine is None:
            raise ImportError(
                "sqlalchemy is required when SqlClass.disable_pool is False")

        settings = self.connection

        if settings['db_type'] == 'pymysql':
            import pymysql.cursors
            SqlClass._install_pymysql(pymysql)
            SqlClass.cursors_connect = pymysql.cursors.DictCursor
        else:
            import MySQLdb.cursors
            SqlClass.cursors_connect = MySQLdb.cursors.DictCursor

        # Credentials are percent-encoded so characters such as '@', '/', ':'
        # or '%' inside them cannot corrupt the URL.
        url = "mysql+%s://%s:%s@%s/%s?charset=utf8mb4" % (
            settings['db_type'],
            quote(str(settings['user']), safe=''),
            quote(str(settings['password']), safe=''),
            settings['host'],
            settings['db'],
        )

        return create_engine(
            url,
            pool_recycle=self.pool_recycle,
            echo_pool=True,
            pool_size=self.pool_size,
            pool_pre_ping=True,
        )

    def _connect_direct(self):
        """Open an unpooled connection with PyMySQL or MySQLdb."""
        settings = self.connection

        if settings['db_type'] == 'pymysql':
            import pymysql.cursors
            SqlClass._install_pymysql(pymysql)
            return pymysql.connect(
                host=settings['host'],
                user=settings['user'],
                passwd=settings['password'],
                db=settings['db'],
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
            )

        import MySQLdb.cursors
        return MySQLdb.connect(
            host=settings['host'],
            user=settings['user'],
            passwd=settings['password'],
            db=settings['db'],
            charset='utf8mb4',
            cursorclass=MySQLdb.cursors.DictCursor,
        )

    def connect(self):
        """Connect to the database, through the shared pool or directly.

        Raises:
            NameError: If the pooled engine cannot be created; the original
                exception is attached as ``__cause__``.
        """
        global engine

        if not SqlClass.disable_pool:
            if not engine:
                try:
                    engine = self._create_engine()
                except Exception as err:
                    self.error_connection = "Error in connection: %s %s" % (
                        type(err), err)
                    raise NameError(self.error_connection) from err
            self.conn = engine.raw_connection()
        else:
            self.conn = self._connect_direct()

        self.connected = True

    def query(self, sql_query, arguments=(), name_connection="default"):
        """Send a SQL sentence to the server and commit it.

        Args:
            sql_query (str): The SQL sentence to execute. Use the %s
                placeholder for data.
            arguments (sequence): The data substituted for the %s
                placeholders.
            name_connection (str): Name of the connection configuration;
                currently unused.

        Returns:
            The cursor on which the sentence was executed.

        Raises:
            NameError: If executing or committing fails; the driver exception
                is attached as ``__cause__``. The connection is closed and is
                re-opened by the next call.
        """
        # Re-open on demand after close() or after a failed query.
        if self.conn is None:
            self.connect()

        cursor = self.conn.cursor(SqlClass.cursors_connect)

        # Remember the sentence so the error message below can show it.
        self.last_query = sql_query

        try:
            cursor.execute(sql_query, arguments)
            self.conn.commit()
        except Exception as err:
            self.error_connection = "Error in query ||%s||Values: %s" % (
                self.last_query, str(arguments))
            try:
                self.close()
            except Exception:
                # Closing a broken connection can fail too; the query error
                # is the one worth reporting, and close() has already dropped
                # the handle.
                pass
            raise NameError(self.error_connection) from err

        return cursor

    def __del__(self):
        """Close the connection, if any, when the object is destroyed."""
        # getattr: the instance may be only partially initialised.
        conn = getattr(self, "conn", None)
        if conn:
            try:
                conn.close()
            except Exception:
                # Nothing useful can be done with a close error at this point.
                pass

    def close(self, name_connection="default"):
        """Close the current connection so that another one can be opened.

        Args:
            name_connection (str): Name of the connection configuration;
                currently unused.
        """
        # Drop the handle first so state stays consistent even if the
        # driver's close() raises.
        conn, self.conn = self.conn, None
        self.connected = False
        if conn:
            conn.close()