paramecio2fm/paramecio2/libraries/db/webmodel.py
2022-04-19 01:41:22 +02:00

1433 lines
44 KiB
Python

#!/usr/bin/env python3
import sys
import re
import uuid
from importlib import import_module, reload
from collections import OrderedDict
from paramecio2.libraries.db.sqlalchemy import SqlClass
from paramecio2.libraries.db.coreforms import BaseForm, HiddenForm
import copy
import traceback
class PhangoField:
"""Base class for fields used in WebModel classes
PhangoField is a class with all elements and variables that you can imagine for a database mysql field in a table. You have many types similar to mysql field types.
"""
def __init__(self, name, size=255, required=False):
"""
Args:
name (str): The name of the field
size (int): The size of sql field.
required (bool): If the field is required or not.
Attributes:
name (str): The name of the field
label (str): A label or generic name for use in text labels used for representate the field
required (bool): If the field is required or not.
size (int): The size of sql field.
protected (bool): If the field can be updated or not in WebModel update method.
quote_close (str): In old versions was used for get more protection for sql sentences
error (bool): If error in query, set to True.
txt_error (str): Variable where the basic text error is saved
model (str): The model where this component or field live
indexed (bool): Property used for set this field how indexed in the database table.
unique (bool): Property used for set this field how unique value in the database table.
foreignkey (bool): Simple property for make more easy identify foreignkeyfields.
default_value (str): Property that define the default value for this field
update (bool): Property that define if this field is in an update operation or insert operation
check_blank (bool): Property used for check if this value cannot change if is in blank and is filled
name_form(BaseForm): Define the form, when is created forms with create_forms you can change the properties of this class
escape (bool): Property that define if make escape in show_formatted. This property control the html transformation of <>" characters in html entities.If false, convert.
file_related (bool): File related: if the field have a file related, delete the file
extra_parameters (list): Extra parameters for the form related with this field
t (PTemplate): Template manager for the form if needed
error_default (str): Error text by default
show_formatted_value (bool): Show this value formatted
help (str): Value used for help strings in tooltips in forms
"""
# The name of the field in database table
self.name=name
# The label for the Field
self.label=name.replace('_', ' ').title()
# If field is required, self.required is True
self.required=required
# The size of field in database
self.size=size
# Protected, if this value != False, cannot use it in insert or update.
self.protected=False
# $quote_open is used if you need a more flexible sql sentence,
# @warning USE THIS FUNCTION IF YOU KNOW WHAT YOU ARE DOING
self.quot_open='\''
# $quote_close is used if you need a more flexible sql sentence,
# @warning USE THIS PROPERTY IF YOU KNOW WHAT YOU ARE DOING
self.quot_close='\''
# Variable where the basic text error is saved
self.error=None
self.txt_error=""
# The model where this component or field live
self.model=None
# Property used for set this field how indexed in the database table.
self.indexed=False
# Property used for set this field how unique value in the database table.
self.unique=False
# Simple property for make more easy identify foreignkeyfields.
self.foreignkey=False
# Property that define the default value for this field
self.default_value=""
# Property that define if this field is in an update operation or insert operation
self.update=False
# Property used for check if this value cannot change if is in blank and is filled
self.check_blank=False
# Define the form, when is created forms with create_forms you can change the properties of this class
self.name_form=BaseForm
# Property that define if make escape in show_formatted. This property control the html transformation of <>" characters in html entities.If false, convert.
self.escape=False
# File related: if the field have a file related, delete the file
self.file_related=False
# Extra parameters for the form
self.extra_parameters=[]
# Template manager for the form if needed
self.t=None
# Error by default
self.error_default='Error: field required'
# Show this value formatted
self.show_formatted_value=False
# Value used for help strings in tooltips in forms
self.help=''
def get_type_sql(self):
"""This method is used for describe the new field in a sql language format."""
return 'VARCHAR('+str(self.size)+') NOT NULL DEFAULT "'+self.default_value+'"'
def show_formatted(self, value):
"""Method for format the value to show in html or others text outputs"""
return value
def check(self, value):
"""Method for check if value is valid for this type field"""
self.error=False
self.txt_error=''
value=str(value).strip()
#Minimal escape for prevent basic js injection.
if self.escape==False:
value=value.replace('<', '&lt;')
value=value.replace('>', '&gt;')
value=value.replace('"', '&quot;')
#value=WebModel.escape_sql(value)
if value=="":
self.txt_error=self.error_default
self.error=True
return value
def set_relationships(self):
pass
def create_form(self):
"""Create a BaseForm object for use in forms functions and methods"""
#self.name, self.default_value,
final_parameters=copy.copy(self.extra_parameters)
final_parameters.insert(0, self.name)
final_parameters.insert(1, self.default_value)
form=self.name_form(*final_parameters)
form.default_value=self.default_value
form.required=self.required
form.label=self.label
form.field=self
form.help=self.help
return form
def change_form(self, new_form, parameters):
"""Change the base form of the field and its parameters"""
self.name_form=new_form
self.extra_parameters=parameters
def post_register(self):
pass
class PrimaryKeyField(PhangoField):
"""Primary key field based in PhangoField.
This field is used for create a typical id field in a mariadb/mysql table
"""
def __init__(self, name, size=11, required=False):
super(PrimaryKeyField, self).__init__(name, size, required)
self.protected=True
self.name_form=HiddenForm
self.required=False
self.error_default="The value is zero"
def check(self, value):
self.error=None
self.txt_error=self.error_default
if value=='':
value='0'
try:
value=str(int(value))
except:
value=0
if value==0:
self.txt_error=self.error_default
self.error=True
return value
def get_type_sql(self):
return 'INT NOT NULL PRIMARY KEY AUTO_INCREMENT'
# The most important class for the framework
#
# Webmodel is a class for create objects that represent models. This models are a mirage of SQL tables. You can create fields, add indexes, foreign keys, and more.
#
#
class WebModel:
"""The most important class for the framework
Webmodel is a class for create objects that represent models. This models are a mirage of SQL tables. You can create fields, add indexes, foreign keys, and more.
Attributes:
arr_sql_index (dict):
"""
__slots__=('sqlclass', 'fields', 'forms')
#Globals class variables for internal tasks
arr_sql_index={}
arr_sql_set_index={}
arr_sql_unique={}
arr_sql_set_unique={}
last_query=""
connection_pool=[]
first_primary_key=PrimaryKeyField('id')
#A dictionary for add models here
model=OrderedDict()
connections={'default': {'host': 'localhost', 'user': 'user', 'password': '', 'db': 'default', 'charset': 'utf8', 'set_connection': False, 'db_type': 'pymysql'} }
connection_id="default"
webmodel=True
global_cached=False
#sqlclass=SqlClass()
#make_connection=sqlclass.connect_to_db
@staticmethod
def connection():
return SqlClass(WebModel.connections['default'])
# Init the class
def __init__(self, sqlclass=None, name_field_id="id"):
self.cached=WebModel.global_cached
self.cached_runquery=WebModel.global_cached
self.type_cache='file'
#The name of the table
self.name=type(self).__name__.lower()
self.label=self.name
self.label_general=self.name
self.name_field_id=name_field_id
#Fields of the table, inserted with register method
self.fields=OrderedDict()
# Errors of fields of the table, for safe thread reasons.
self.fields_error=OrderedDict()
#The tables related with foreignkeyfield to this table
self.related=[]
#A dictionary where forms of this model are saved
self.forms=OrderedDict()
self.cache_method=''
# A dictionary with the errors in fields.
self.fields_errors={}
self.errors={}
self.num_errors=0
self.query_error=""
self.values_query=[]
self.conditions=["WHERE 1=1", []]
self.order_by="ORDER BY `"+self.name+"`.`id` ASC"
self.limit=""
self.related_models_deleted=[]
self.required_save={}
#Create id field
primary_key=WebModel.first_primary_key
primary_key.name=self.name_field_id
self.register(primary_key)
#self.register(PrimaryKeyField(self.name_field_id))
#self.model[name]=self
self.yes_reset_conditions=True
#self.create_fields()
self.updated=False
self.valid_fields=[]
self.last_id=0
self.distinct=''
# A simple dictionary where post values are saved for use of fields classes
self.post={}
# A simple dictionary that save the fields that have files related. If i delete the row in database i need delete the files related
self.files_delete={}
self.sqlclass=sqlclass
self.fields_to_clean=[]
self.create_fields()
# property for use show_formatted property if needed
self.show_formatted=False
# property for def if the model have enctype
self.enctype=False
self.dummy=0
# A method for add the connection
def conn(self, sqlclass):
self.sqlclass=sqlclass
# Reset conditions
self.yes_reset_conditions=True
self.conditions=["WHERE 1=1", []]
self.order_by="ORDER BY `"+self.name+"`.`id` ASC"
self.limit=""
# A method for change the name of table
def change_name(self, name):
self.name=name
self.order_by="ORDER BY `"+self.name+"`.`id` ASC"
# A method where create the new fields of this model
def create_fields(self):
#print([i for i in dir(self.__class__) if i[:1] != '_'])
#print(dir(self))
pass
# A method for register the fields
def register(self, field_model, required=False):
#self.fields_required[field_model]=field_model.required
self.fields[field_model.name]=field_model
self.fields[field_model.name].model=self
if required:
self.fields[field_model.name].required=required
self.fields[field_model.name].post_register()
#self.files_delete[field_model.name]=field_model.file_related
# A method for create the id field.
def create_id_field(self, field_name="id"):
pass
# A method for connect to database
def connect_to_db(self):
#if WebModel.make_connection(self.connections[self.connection_id])==False:
#raise NameError(sqlclass.error_connection)
#self.connection_pool.append(True)
#if self.sqlclass.connect_to_db(self.connections[self.connection_id])==False:
# raise NameError(sqlclass.error_connection)
#WebModel.make_connection=sqlclass.dummy_connect
pass
def dummy_connect(self, connection):
return True
# Method for make queries
def query(self, str_query, args=[], connection_id='default'):
self.connect_to_db()
return self.sqlclass.query(str_query, args, connection_id)
# Method for clean fields
def clean_fields(self):
clean=self.fields_to_clean
for field in self.fields_to_clean:
del self.fields[field]
# Insert method, for insert a row in database.using a dictionary
# External agent define if the update is in code or from external source, how a form.
def insert(self, dict_values, external_agent=True):
self.clean_fields()
# Connect to db
self.post=dict_values
#self.connect_to_db()
self.query_error=''
#self.fields[self.name_field_id].required=False
try:
arr_return=self.check_all_fields(dict_values, external_agent)
if arr_return:
fields, values, update_values=arr_return
else:
return False
except:
self.query_error='Cannot insert the new row '+sys.exc_info()[0]
#print(sys.exc_info()[0])
return False
c=len(values)
arr_str=['%s' for x in range(c)]
sql="insert into `"+self.name+"` (`"+"`, `".join(fields)+"`) VALUES ("+", ".join(arr_str)+")"
cursor=self.query(sql, values, self.connection_id)
if cursor.rowcount>0:
self.last_id=cursor.lastrowid
cursor.close()
# Delete cache for this table.
return True
else:
self.query_error='Cannot insert the new row'
cursor.close()
return False
# Update method. For update one or many rows.
def update(self, dict_values, external_agent=True):
self.clean_fields()
self.post=dict_values
# Connect to db
#self.fields[self.name_field_id].required=False
if self.name_field_id in dict_values:
del dict_values[self.name_field_id]
#self.connect_to_db()
self.query_error=''
#try:
self.updated=True
try:
arr_return=self.check_all_fields(dict_values, external_agent, True, 'update')
if arr_return:
fields, values, update_values=arr_return
else:
return False
except:
self.query_error+="\n"+traceback.format_exc()
#print(traceback.format_exc())
return False
sql="update `"+self.name+"` SET "+", ".join(update_values)+" "+self.conditions[0]
cursor=self.query(sql, values+self.conditions[1], self.connection_id)
if self.yes_reset_conditions:
self.reset_conditions()
cursor.close()
return True
"""
if cursor.rowcount>0:
if self.yes_reset_conditions:
self.reset_conditions()
return True
else:
self.query_error='Cannot update the row'
return False
"""
"""
except:
#self.query_error=sqlclass.error_connection
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in query: %s %s" % (e, v)
return False
"""
def reset_conditions(self):
self.conditions=["WHERE 1=1", []]
self.limit=''
# A method for select fields from a table in db. Support for foreignkeys.
#Type assoc can be assoc for return dictionaries
def select(self, arr_select=[], raw_query=False):
self.clean_fields()
# Connect to db
#self.connect_to_db()
conditions=self.conditions
final_fields=[]
extra_fields=[]
self.query_error=''
#First table selecction
tables_to_select=['`'+self.name+'`']
keys=list(self.fields.keys())
if len(arr_select)==0:
arr_select=keys
# Array intersect for obtain the valid fields
#fields = list(set(keys) & set(arr_select))
fields=[ select for select in arr_select if select in keys ]
"""
for select in arr_select:
if select in keys:
fields.append(select)
"""
#Creating the fields
arr_repeat_field={}
new_fields=OrderedDict()
for field in fields:
#Check if foreignkeyfield
if type(self.fields[field]).__name__=="ForeignKeyField" and raw_query==False:
if self.fields[field].table_name in arr_repeat_field:
arr_repeat_field[self.fields[field].table_name]+=1
else:
arr_repeat_field[self.fields[field].table_name]=0
table_name=self.fields[field].table_name+'` as `'+self.fields[field].table_name+str(arr_repeat_field[self.fields[field].table_name])
final_table_name=self.fields[field].table_name+str(arr_repeat_field[self.fields[field].table_name])
# The name with its alias of this related table model
tables_to_select.append('`'+table_name+'`')
# Add field from related table
# as "+table_name+"_"+self.fields[field].named_field
extra_fields.append("`"+final_table_name+"`.`"+self.fields[field].named_field+"` as "+field)
# Add a condition to sql query for join the two tables.
conditions[0]+=" AND `"+final_table_name+"`.`"+self.fields[field].identifier_field+"`=`"+self.name+"`.`"+field+"`"
# Add extra fields from related table from select_fields ForeignKeyField class member
select_fields=self.fields[field].select_fields
for extra_field in select_fields:
self.fields[field+'_'+extra_field]=self.fields[field].related_model.fields[extra_field]
self.fields_to_clean.append(field+'_'+extra_field)
# Check if extra_field is ForeignKeyField, if yes, call this function recursively.
extra_fields.append("`"+final_table_name+"`.`"+extra_field+"` as `"+field+"_"+extra_field+"`")
else:
# Add normal field to sql query
final_fields.append("`"+self.name+"`.`"+field+"`")
#if len(new_fields)>0:
#self.fields.update(new_fields)
extra_sql_field=""
if len(extra_fields)>0:
extra_sql_field=", "+", ".join(extra_fields)
if len(final_fields)==0:
self.query_error="Error: without fields to search"
return False
sql= ("select "+" "+self.distinct+", ".join(final_fields)+extra_sql_field+" from "+", ".join(tables_to_select)+' '+conditions[0]+' '+self.order_by+' '+self.limit).strip()
self.last_query=sql
if self.yes_reset_conditions:
self.reset_conditions()
cursor=self.query(sql, conditions[1], self.connection_id)
if not cursor:
self.query_error=self.sqlclass.error_connection
return False
else:
return cursor
# Show results in a dictionary
def fetch(self, cursor):
return cursor.fetchone()
def insert_id(self):
return self.last_id
def element_exists(self, id):
self.conditions=['WHERE `'+self.name_field_id+'`=%s', [id]]
count=self.select_count(self.name_field_id)
if self.yes_reset_conditions:
self.reset_conditions()
if count>0:
return True
return False
def select_a_field(self, field):
pass
def select_a_row(self, id, fields_selected=[], raw_query=0):
self.conditions=['WHERE `'+self.name+'`.`'+self.name_field_id+'`=%s', [id]]
self.limit="limit 1"
with self.select(fields_selected, raw_query) as cursor:
self.reset_conditions()
row=cursor.fetchone()
if row==None:
row=False
else:
if self.show_formatted:
#for k, col in row.items():
row[k]={k:self.fields[k].show_formatted(col) for k,col in row.items()}
return row
def select_a_row_where(self, fields_selected=[], raw_query=0, begin=0):
self.limit="limit "+str(begin)+", 1"
with self.select(fields_selected, raw_query) as cursor:
row=cursor.fetchone()
if row==None:
row=False
else:
if self.show_formatted:
for k, col in row.items():
row[k]=self.fields[k].show_formatted(col)
return row
def select_to_array(self, fields_selected=[], raw_query=0):
if len(fields_selected)==0:
fields_selected=list(self.fields.keys())
if (self.name_field_id not in fields_selected):
fields_selected.append(self.name_field_id)
def del_row_id(row):
try:
index_id=row.index(self.name_field_id)
del row[index_id]
except:
pass
else:
def del_row_id(row):
pass
results=[] #OrderedDict()
with self.select(fields_selected, raw_query) as cursor:
for row in cursor:
if self.show_formatted and row:
for k, col in row.items():
if self.fields[k].show_formatted_value:
row[k]=self.fields[k].show_formatted(col)
results.append(row)
del_row_id(results)
return results
def select_to_dict(self, fields_selected=[], raw_query=0, integer=True):
if integer:
def check_index(index):
return index
else:
def check_index(index):
return str(index)
if len(fields_selected)==0:
fields_selected=list(self.fields.keys())
if (self.name_field_id not in fields_selected):
fields_selected.append(self.name_field_id)
def del_row_id(row):
try:
index_id=row.index(self.name_field_id)
del row[index_id]
except:
pass
else:
def del_row_id(row):
pass
results=OrderedDict()
with self.select(fields_selected, raw_query) as cursor:
for row in cursor:
if self.show_formatted and row:
for k, col in row.items():
row[k]=self.fields[k].show_formatted(col)
results[check_index(row[self.name_field_id])]=row
del_row_id(results)
return results
# A method por count num rows affected for sql conditions
def select_count(self, field_to_count='id', raw_query=True):
# Connect to db
self.connect_to_db()
conditions=self.conditions
#First table selecction
tables_to_select=['`'+self.name+'`']
fields=list(self.fields.keys())
#Creating the fields
for field in fields:
#Check if foreignkeyfield
if type(self.fields[field]).__name__=="ForeignKeyField" and raw_query==False:
table_name=self.fields[field].table_name
tables_to_select.append('`'+table_name+'`')
# Add a condition to sql query for join the two tables.
conditions[0]+=" AND `"+table_name+"`.`"+self.fields[field].identifier_field+"`=`"+self.name+"`.`"+field+"`"
sql= "select count(`"+field_to_count+"`) from "+", ".join(tables_to_select)+' '+conditions[0]
count=0
with self.query(sql, conditions[1], self.connection_id) as cursor:
count=list(cursor.fetchone().values())[0]
if self.yes_reset_conditions:
self.reset_conditions()
return count
#+' ORDER BY '+self.order_by+' '+self.limit).strip()
# A method for delete rows using sql conditions
def delete(self):
#self.connect_to_db()
#Need delete rows from other related tables save in self.related_models_deleted
sql=("delete from `"+self.name+"` "+self.conditions[0]+' '+self.order_by+' '+self.limit).strip()
result=self.query(sql, self.conditions[1], self.connection_id)
if self.yes_reset_conditions:
self.reset_conditions()
if result.rowcount>0:
result.close()
return True
else:
result.close()
return False
def set_conditions(self, sql_text, values:list) -> object:
self.conditions=[sql_text, values]
return self
@staticmethod
def check_in_list(in_list):
for x in range(0, len(in_list)):
try:
in_list[x]=str(int(in_list[x]))
except:
in_list[x]='0'
return '('+', '.join(in_list)+')'
def check_in_list_str(self, field, in_list):
for x in range(0, len(in_list)):
in_list[x]=str(self.fields[field].check(in_list[x]))
return '("'+'", "'.join(in_list)+'")'
def set_order(self, order:dict) -> object:
arr_order=[]
arr_order.append('ASC')
arr_order.append('DESC')
final_order=[]
for o,v in order.items():
if o in self.fields:
final_order.append(o+' '+arr_order[v])
self.order_by='order by '+", ".join(final_order)
return self
def set_limit(self, limit: tuple) -> None:
limit[0]=int(limit[0])
sql_limit=str(limit[0])
if len(limit)>1:
sql_limit+=', '+str(limit[1])
self.limit='limit '+sql_limit
return self
# Method for create sql tables
def create_table(self):
#self.connect_to_db()
self.arr_sql_index[self.name]={}
self.arr_sql_set_index[self.name]={}
self.arr_sql_unique[self.name]={}
self.arr_sql_set_unique[self.name]={}
#foreach($this->components as $field => $data)
table_fields=[]
#Create id field
#Not neccesary
#table_fields.append('`'+self.name_field_id+"` INT NOT NULL PRIMARY KEY AUTO_INCREMENT")
fields=self.fields
for field, data in fields.items():
table_fields.append('`'+field+'` '+data.get_type_sql())
#Check if indexed
if fields[field].indexed==True:
self.arr_sql_index[self.name][field]='CREATE INDEX `index_'+self.name+'_'+field+'` ON '+self.name+'(`'+field+'`);'
self.arr_sql_set_index[self.name][field]=""
#Check if unique
if fields[field].unique==True:
self.arr_sql_unique[self.name][field]='ALTER TABLE `'+self.name+'` ADD UNIQUE (`'+field+'`)'
self.arr_sql_set_unique[self.name][field]=""
if type(fields[field]).__name__=="ForeignKeyField":
self.arr_sql_index[self.name][field]='CREATE INDEX `index_'+self.name+'_'+field+'` ON '+self.name+'(`'+field+'`);'
table_related=fields[field].table_name
id_table_related=fields[field].table_id
self.arr_sql_set_index[self.name][field]='ALTER TABLE `'+self.name+'` ADD CONSTRAINT `'+field+'_'+self.name+'IDX` FOREIGN KEY ( `'+field+'` ) REFERENCES `'+table_related+'` (`'+id_table_related+'`) ON DELETE CASCADE ON UPDATE CASCADE;'
return "create table `"+self.name+"` (\n"+",\n".join(table_fields)+"\n) DEFAULT CHARSET=utf8;";
def update_table(self, fields_to_add, fields_to_modify, fields_to_add_index, fields_to_add_constraint, fields_to_add_unique, fields_to_delete_index, fields_to_delete_unique, fields_to_delete_constraint, fields_to_delete):
# First delete fields
for field in fields_to_delete_index:
print("---Deleting index from "+field+" in "+self.name)
self.query('DROP INDEX `index_'+self.name+'_'+field+'` ON '+self.name, [], self.connection_id)
for field in fields_to_delete_unique:
print("---Deleting unique from "+field+" in "+self.name)
self.query('DROP INDEX `'+field+'` ON '+self.name, [], self.connection_id)
for field in fields_to_delete_constraint:
print("---Deleting foreignkey from "+field+" in "+self.name)
self.query('ALTER TABLE `'+self.name+'` DROP FOREIGN KEY '+field+'_'+self.name+'IDX', [], self.connection_id)
for field in fields_to_delete:
print("---Deleting "+field+" from "+self.name)
self.query('ALTER TABLE `'+self.name+'` DROP `'+field+'`', [], self.connection_id)
#Deleting indexes and constraints.
#Obtain new fields
for field in fields_to_modify:
print("---Updating "+field+" in "+self.name)
self.query('ALTER TABLE `'+self.name+'` MODIFY `'+field+'` '+self.fields[field].get_type_sql(), [], self.connection_id)
for field in fields_to_add:
print("---Adding "+field+" in "+self.name)
self.query('ALTER TABLE `'+self.name+'` ADD `'+field+'` '+self.fields[field].get_type_sql(), [], self.connection_id)
for field in fields_to_add_index:
print("---Adding index to "+field+" in "+self.name)
self.query('CREATE INDEX `index_'+self.name+'_'+field+'` ON '+self.name+' (`'+field+'`);', [], self.connection_id)
for field in fields_to_add_constraint:
print("---Adding foreign key to "+field+" in "+self.name)
table_related=self.fields[field].table_name
id_table_related=self.fields[field].table_id
self.query('ALTER TABLE `'+self.name+'` ADD CONSTRAINT `'+field+'_'+self.name+'IDX` FOREIGN KEY ( `'+field+'` ) REFERENCES `'+table_related+'` (`'+id_table_related+'`) ON DELETE CASCADE ON UPDATE CASCADE;', [], self.connection_id)
for field in fields_to_add_unique:
print("---Adding unique to "+field+" in "+self.name)
self.query('ALTER TABLE `'+self.name+'` ADD UNIQUE (`'+field+'`)', [], self.connection_id)
# Method for drop sql tables and related
def drop(self):
return self.query('DROP TABLE '+self.name, [], self.connection_id)
#Return an array with all fields
def all_fields():
pass
#Check of all fields in table.
def check_all_fields(self, dict_values, external_agent, yes_update=False, errors_set="insert"):
"""Method for check all fields of a model for insert or update a row in table db.
Args:
dict_values (dict): The dict of values to check
external_agent (bool): If True, the query is considered manipulated by external agent and the checks are stricts, if not, checks are not stricts
yes_update (bool): If True, the check need be done for update sql sentence, if False, the check is done for insert sql sentence
errors_set (str): If insert value, the errors are set for insert sql statement, if update value, then the errors are set for update sql statement.
"""
fields=[]
values=[]
update_values=[]
self.errors[errors_set]=[]
self.num_errors=0
#A dictionary that define if update property is added
updated_field={}
updated_field['insert']=0
updated_field['update']=1
error=False
if yes_update==True:
f_update=lambda field, value: "`"+field+"`=%s"
else:
f_update=lambda field, value: ""
# I can optimize this later
for k, v in self.fields.items():
#List where the errors are saved
self.fields_errors[k]=[]
if k in dict_values:
# If fields is protected, but external_agent =0, then insert
# If fields is not protected always insert if not error checking
value=dict_values[k]
# Cleaning the error
self.fields[k].error=False
if (self.fields[k].protected==None or self.fields[k].protected==False or external_agent==False):
if k in self.valid_fields:
self.fields[k].update=updated_field[errors_set]
value=self.fields[k].check(value)
if self.fields[k].check_blank==False or self.updated==False:
# If error checking, value=False
if self.fields[k].error==True and self.fields[k].required==True:
#Error, need this fields.
self.num_errors+=1
if self.fields[k].txt_error=='':
self.fields_errors[k].append("Error: "+v.label+" field required")
else:
self.fields_errors[k].append(self.fields[k].txt_error)
error=True
else:
fields.append(k)
#final_value=self.fields[k].quot_open+value+self.fields[k].quot_close
#final_value=self.fields[k].quot_open+value+self.fields[k].quot_close
values.append(value)
update_values.append(f_update(k, value))
else:
self.num_errors+=1
self.fields_errors[k].append("Error: "+self.fields[k].label+" is not in valid fields")
self.fields[k].error=True
self.fields[k].txt_error="Error: "+self.fields[k].label+" is not in valid fields"
error=True
else:
self.num_errors+=1
self.fields_errors[k].append("Error: "+self.fields[k].label+" is protected field")
self.fields[k].error=True
self.fields[k].txt_error="Error: "+self.fields[k].label+" is protected field"
error=True
elif v.required==True:
self.num_errors+=1
self.fields_errors[k].append("Error: "+v.label+" field required")
error=True
elif v.required==False and k!=self.name_field_id and not yes_update:
fields.append(k)
values.append(self.fields[k].default_value)
update_values.append(f_update(k, self.fields[k].default_value))
if len(fields)==0:
self.num_errors+=1
self.errors[errors_set].append("Error: no elements to insert in table")
error=True
if error==True:
self.num_errors+=1
self.errors[errors_set].append("Error: error checking the values of the table")
return False
return (fields, values, update_values)
#Reset the require field in fields
def reset_require(self):
for k, v in self.fields.items():
self.required_save[k]=self.fields[k].required
self.fields[k].required=False
#Reload the require field in fields
def reload_require(self):
for k,r in self.fields.items():
self.fields[k].required=r
#Choose all fields to updated
def set_valid_fields(self, fields={}):
if len(fields)==0:
fields=self.fields.keys()
self.valid_fields=fields
#Create a form based in table.
def create_forms(self, arr_fields=[]):
self.forms=OrderedDict()
if len(arr_fields)==0:
arr_fields=list(self.fields.keys())
if self.name_field_id in arr_fields:
del arr_fields[arr_fields.index(self.name_field_id)]
#for name_field, field in self.fields.items():
for name_field in arr_fields:
self.valid_fields.append(name_field)
self.forms[name_field]=self.fields[name_field].create_form()
return arr_fields
def create_form_after(self, form_after, new_form):
new_dict=OrderedDict()
for name_form, form in self.forms.items():
new_dict[name_form]=form
if name_form==form_after:
new_dict[new_form.name]=new_form
self.forms=new_dict
def show_errors(self):
arr_error=[]
error_txt=''
for k_error in self.fields_errors.values():
for error in k_error:
arr_error.append(error)
for type_error in self.errors.values():
for error in type_error:
arr_error.append(error)
arr_error.append(self.query_error)
error_txt="\n".join(arr_error)
return error_txt
def collect_errors(self):
arr_error= {}
error_txt=''
for field_error, k_error in self.fields_errors.items():
for error in k_error:
arr_error[field_error]=error
"""
for type_error in self.errors.values():
for error in type_error:
arr_error[field_error]=error
"""
return arr_error
def safe_query(self):
self.create_forms()
self.reset_require()
def close(self):
self.sqlclass.close()
#connection_to_delete=[]
#WebModel.make_connection=self.sqlclass.connect_to_db
#for key in self.sqlclass.connection:
#self.sqlclass.close(key)
#connection_to_delete.append(key)
#self.sqlclass.connection={}
#for key in connection_to_delete:
#del sqlclass.connection[key]
@staticmethod
def escape_sql(value):
value=str(value)
return value.replace("'","\\'").strip()
"""
def __del__(self):
self.close()
"""
# Set post values from a post array
def set_post_values(self, post):
for k in self.fields.keys():
post[k]=post.get(k, '')
return post
class QueryModel(WebModel):
def __init__(self, model_name, sqlclass=None, name_field_id="id"):
super().__init__(sqlclass, name_field_id)
self.name=model_name.lower()
self.label=self.name
self.label_general=self.name
self.order_by="ORDER BY `"+self.name+"`.`id` ASC"