Added pymysql how dependency, not mysqldb

This commit is contained in:
Antonio de la Rosa 2016-10-11 21:47:12 +02:00
parent 144e3ba264
commit da55e6674b
3 changed files with 110 additions and 100 deletions

View file

@ -1,99 +0,0 @@
#!/usr/bin/env python3
import sys
import pymysql
pymysql.install_as_MySQLdb()
class SqlClass:
connection={}
connection_method=''
def __init__(self):
self.error_connection=""
self.connected=False
SqlClass.connection_method=self.connect_to_db_sql
def dummy_connect(self, connection, name_connection="default"):
pass
def connect_to_db(self, connection, name_connection="default"):
SqlClass.connection_method(connection, name_connection)
SqlClass.connection_method=self.dummy_connect
def connect_to_db_sql(self, connection, name_connection="default"):
try:
self.connection[name_connection] = pymysql.connect(connection['host'],
user=connection['user'],
passwd=connection['password'],
db=connection['db'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
#return False
raise NameError(self.error_connection)
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
#if fetch_type=="ASSOC":
#fetch_type=pymysql.cursors.DictCursor
with self.connection[name_connection].cursor(pymysql.cursors.DictCursor) as cursor:
try:
cursor.execute(sql_query, arguments)
self.connection[name_connection].commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
self.error_connection="Error in query ||"+sql_query+"||: %s %s" % (e, v)
#return False
raise NameError(self.error_connection)
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
"""
def __del__(self):
for key in self.connection:
self.close(self.connection)
"""
"""
def close(self, name_connection="default"):
if self.connection[name_connection]:
self.connection[name_connection].close()
self.connection[name_connection]=False
pass
"""

View file

@ -0,0 +1,109 @@
#!/usr/bin/env python3
import sys
import pymysql.cursors
pymysql.install_as_MySQLdb
import sqlalchemy.pool as pool
import traceback
class SqlClass:
mypool=None
def __init__(self):
self.max_overflow=45
self.pool_size=30
self.error_connection=""
self.connection={}
self.connected=False
self.connection_method=self.connect_to_db_sql
def dummy_connect(self, connection, name_connection="default"):
pass
def connect_to_db(self, connection, name_connection="default"):
self.connection_method(connection, name_connection)
self.connection_method=self.dummy_connect
def connect_to_db_sql(self, connection, name_connection="default"):
try:
def getconn():
return pymysql.connect(connection['host'],
user=connection['user'],
passwd=connection['password'],
db=connection['db'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
if SqlClass.mypool==None:
SqlClass.mypool=pool.QueuePool(getconn, max_overflow=self.max_overflow, pool_size=self.pool_size)
self.connection[name_connection]=SqlClass.mypool.connect()
self.connection[name_connection].ping(True)
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
#return False
raise NameError(self.error_connection)
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
#if fetch_type=="ASSOC":
#fetch_type=pymysql.cursors.DictCursor
#with self.connection[name_connection].cursor(pymysql.cursors.DictCursor) as cursor:
cursor=self.connection[name_connection].cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql_query, arguments)
self.connection[name_connection].commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
#, traceback.format_exc()
self.error_connection="Error in query ||%s||Values: %s" % (sql_query, str(arguments))
#return False
raise NameError(self.error_connection)
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
"""
def __del__(self):
self.close()
"""
def close(self, name_connection="default"):
if self.connection[name_connection]:
self.connection[name_connection].close()
#self.connection[name_connection]=False
pass

View file

@ -5,7 +5,7 @@ import re
import uuid import uuid
from importlib import import_module, reload from importlib import import_module, reload
from collections import OrderedDict from collections import OrderedDict
from paramecio.cromosoma.databases.mysqldb import SqlClass from paramecio.cromosoma.databases.pymysql import SqlClass
from paramecio.cromosoma.coreforms import BaseForm, HiddenForm from paramecio.cromosoma.coreforms import BaseForm, HiddenForm
import traceback import traceback