Added new files for databases

This commit is contained in:
Antonio de la Rosa 2025-01-06 01:18:08 +01:00
parent b5cb79eb04
commit 9dd09d3234
53 changed files with 7170 additions and 11 deletions

View file

@ -0,0 +1,782 @@
import time
from datetime import date, datetime, tzinfo
import arrow
# from babel.dates import format_date, format_datetime, format_time, get_timezone, UTC
try:
from settings import config
except:
config={}
#from paramecio.libraries.sessions import get_session
from os import environ
"""Simple hook for timedate functions from Arrow datetime module. Maybe in the future use native python datetime functions or other libraries. Is simply an abstraction for not depend of particular library.
"""
sql_format_time='YYYYMMDDHHmmss'
"""str: variable for define basic string for format dates
By default, datetime module use YYYYMMDDHHmmss string for define dates and time. Tipically is used for sql operations in paramecio2 framework.
"""
format_date_txt="YYYY/MM/DD"
"""str: variable for define basic formatted date string
"""
format_time_txt="HH:mm:ss"
"""str: variable for define basic formatted time string
"""
timezone='Europe/Madrid'
"""str: basic timezone for dates, by default, Europe/Madrid
"""
"""If default are changed in settings/config, change variables
"""
if hasattr(config, 'format_date'):
format_date_txt=config.format_date
if hasattr(config, 'format_time'):
format_time_txt=config.format_time
if hasattr(config, 'timezone'):
timezone=config.timezone
def set_timezone():
"""Simple function for change the timezone in general environment of python
"""
environ['TZ']=environ.get('TZ', timezone)
if environ['TZ']!=timezone:
environ['TZ']=timezone
time.tzset()
"""
def set_timezone_session():
s=get_session()
timezone_local=timezone
if s!=None:
if 'timezone' in s:
timezone_local=s['timezone']
#timezone_local=s.get('timezone', timezone)
environ['TZ']=environ.get('TZ', timezone_local)
if environ['TZ']!=timezone_local:
environ['TZ']=timezone_local
time.tzset()
#request.environ['TIMEZONE'] = request.environ['PATH_INFO'].rstrip('/')
"""
def format_timedata(time):
"""Function for get separated year, month, day, hour, minute and second from sql_format_time string
Args:
time (str): A YYYYMMDDHHmmss string for get datetime components from there.
Returns:
list: A dict with datetime components (year, month, day, hour, minute, second).
"""
year=0
month=0
day=0
hour=0
minute=0
second=0
ampm=''
try:
year=int(time[:4])
month=int(time[4:6])
day=int(time[6:8])
hour=int(time[8:10])
minute=int(time[10:12])
second=int(time[12:14])
ampm=int(time[14:16])
except:
pass
if ampm=='PM' or ampm=='pm':
if hour>0:
hour+=12
return (year, month, day, hour, minute, second)
def checkdatetime(y, m, d, h, mi, s):
"""Check if a series of datetime separated elements are correct, the datetime values are type int
Args:
y (int): Year of datetime
m (int): month
d (int): day
h (int): hour
mi (int): minute
s (int): seconds
Returns:
bool: If values are correct, return True, otherwise return False
"""
try:
#test=datetime.strptime(str(y)+'-'+str(m)+'-'+str(d)+' '+str(h)+'-'+str(mi)+'-'+str(s), '%Y-%m-%d %H-%M-%S')
test=arrow.arrow.Arrow(y, m, d, h, mi, s)
return True
except:
return False
# Get the localtime
def now(utc=False, tz=''):
"""Returns the actual datetime in YYYYMMDDHHmmss format.
Args:
utc (bool): If True, the datetime is returned in UTC timezone
tz (str, optional): Timezone name, example: Europe/Madrid. If set the datetime is returned in the timezone selected
Returns:
str: Return actual datetime
"""
if not utc:
if tz=='':
actual=arrow.now().format(sql_format_time)
else:
#actual=arrow.to(tz).now().format(sql_format_time)
utc=arrow.utcnow()
actual=utc.to(tz).format(sql_format_time)
else:
actual=arrow.utcnow().format(sql_format_time)
return actual
def today(utc=False,tz=''):
"""Returns the actual date in YYYYMMDDHHmmss format.
Is different from (now) function because return the date to 00:00:00 time
Args:
utc (bool): If True, the date is returned in UTC timezone
tz (str, optional): Timezone name, example: Europe/Madrid. If set the date is returned in the timezone selected
Returns:
str: Return actual date with 00:00:00 how time
"""
return now(utc, tz)[:8]+'000000'
# Get actual timestamp
def obtain_timestamp(timeform):
"""Get the timestamp from datetime in YYYYMMDDHHmmss format.
Args:
timeform (str): Datetime in YYYYMMDDHHmmss format.
Returns:
int: datetime in timestamp format
"""
y, m, d, h, mi, s=format_timedata(timeform)
if checkdatetime(y, m, d, h, mi, s):
#timestamp=int(time.mktime((y, m, d, h, mi, s, 0, 0, -1)))
timestamp=arrow.arrow.Arrow(y, m, d, h, mi, s).timestamp()
return timestamp
#return mktime($h, $mi, $s, $m, $d, $y);
else:
return False
# timestamp is gmt time, convert in normal time
def timestamp_to_datetime(timestamp, sql_format_time=sql_format_time):
"""Turn datetime in YYYYMMDDHHmmss format.
Args:
timestamp (int): The timestamp for convert
Returns:
str: Datetime in YYYYMMDDHHmmss format
"""
return arrow.get(timestamp).format(sql_format_time)
# Get a utc timestamp and convert to local
def timestamp_to_datetime_local(timestamp, tz='', sql_format_time=sql_format_time):
"""Get a utc timestamp and convert to timezone datetime in YYYYMMDDHHmmss format.
Args:
timestamp (int): The timestamp for convert in datetime
tz (str, optional): If you want convert to other timezone, set it.
Returns:
str: Datetime in YYYYMMDDHHmmss format in selected timezone datetime
"""
t=arrow.get(timestamp)
if tz=='':
tz=environ['TZ']
return t.to(tz).format(sql_format_time)
def format_datetime(format_time, timeform, func_utc_return):
"""Get a datetime in YYYYMMDDHHmmss format and convert in other str datetime (normally, same YYYYMMDDHHmmss format). Is a primitive function for other high level datetime functions.
Args:
format_time (str): The strtime string used for format the datetime
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
func_utc_return (function): A function used for get the datetime.
Returns:
If timestamp is False, return False, if timestamp is valid, return the datetime formatted
"""
timestamp=obtain_timestamp(timeform)
if timestamp:
t=func_utc_return(timestamp)
return t.format(format_time)
else:
return False
# This method parse local time to gmt
def local_to_gmt(timeform, sql_format_time=sql_format_time):
"""Get a datetime in YYYYMMDDHHmmss format and convert in other str datetime. Is a primitive function for other high level datetime functions.
Expects that timeform was in time not gmt and convert to gmt
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
sql_format_time (str, optional): by default, the format is YYYYMMDDHHmmss, you can put other formatted str formats for date, here.
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted
"""
return format_datetime(sql_format_time, timeform, substract_utc)
# time.localtime is useless, you need sum the time offset to the date
def gmt_to_local(timeform, sql_format_time=sql_format_time):
"""Get a datetime in YYYYMMDDHHmmss format in UTC and convert in other str datetime. Is a primitive function for other high level datetime functions.
Expects that timeform was in time gmt and convert to localtime
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
sql_format_time (str, optional): by default, the format is YYYYMMDDHHmmss, you can put other formatted str formats for date, here.
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted
"""
return format_datetime(sql_format_time, timeform, sum_utc)
def format_time(timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in HH:mm:ss UTC format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted in UTC
"""
return format_datetime(format_time_txt, timeform, sum_utc)
def format_date(timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in YYYY/MM/DD UTC format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted in UTC
"""
return format_datetime(format_date_txt, timeform, sum_utc)
def format_fulldate(timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in YYYY/MM/DD HH:mm:ss UTC format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted in UTC
"""
return format_datetime(format_date_txt+' '+format_time_txt, timeform, sum_utc)
def format_local_time(timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in HH:mm:ss format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted
"""
return format_datetime(format_time_txt, timeform, no_utc)
def format_local_date(timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in YYYY/MM/DD format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted
"""
return format_datetime(format_date_txt, timeform, no_utc)
def format_local_fulldate(timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in YYYY/MM/DD HH:mm:ss format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted
"""
return format_datetime(format_date_txt+' '+format_time_txt, timeform, no_utc)
def format_strtime(strtime, timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in strtime string UTC format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted in UTC
"""
return format_datetime(strtime, timeform, sum_utc)
def format_local_strtime(strtime, timeform):
"""Get a datetime in YYYYMMDDHHmmss format and convert in strtime string format. Is a primitive function for other high level datetime functions.
Args:
timeform (str): datetime in YYYYMMDDHHmmss format to convert to new format
Returns:
If timeform is False, return False, if timeform is valid, return the datetime formatted
"""
return format_datetime(strtime, timeform, no_utc)
#Input is utc timestamp, return local arrow object
def sum_utc(timestamp, tz=''):
"""Get timestamp in UTC and convert in arrow date object with timezone datetime
Args:
timestamp (int): The timestamp for convert in other timezone
tz (str): Timezone of timestamp
Returns:
Return arrow object with new timezone selected
"""
#offset=time.altzone
#return time.localtime(timestamp-offset)
t=arrow.get(timestamp)
if tz=='':
tz=environ['TZ']
return t.to(tz)
#Input is local timestamp, return utc arrow object
def substract_utc(timestamp, tz=''):
"""Get local timestamp and convert in arrow date object with UTC datetime
Args:
timestamp (int): The timestamp for convert in UTC timezone
tz (str): Timezone of timestamp
Returns:
Return arrow object with UTC timezone selected
"""
#offset=time.altzone
#return time.localtime(timestamp+offset)
#t=arrow.get(timestamp).to('UTC')
timeform=timestamp_to_datetime(timestamp)
y, m, d, h, mi, s=format_timedata(timeform)
if tz=='':
tz=environ['TZ']
t=arrow.get(datetime(y, m, d, h, mi, s), tz).to('UTC')
return t
def no_utc(timestamp):
"""Return an arrow object based in timestamp value
Args:
timestamp (int): The timestamp for convert in UTC timezone
Returns:
Return arrow object based in timestamp value
"""
return arrow.get(timestamp)
class TimeClass:
"""Simple abstraction of arrow class, in future i can change arrow class by others
Args:
timestamp (int, str, optional): You can set the initial arrow object with timestamp date or YYYYMMDDHHmmss date format
tz (str): Timezone
Attributes:
utc (bool): If True, the default timezone is UTC, if False, timezone is system default
format_time (str): The default datetime format, YYYYMMDDHHmmss
format_time_txt (str): Time text format, usually HH:mm:ss
format_date_txt (str): Date text format, usually YYYY/MM/DD
format_date_full (str): Full DateTime text format, usually YYYY/MM/DD HH:mm:ss
tz (str): Default timezone for arrow object
"""
def __init__(self, timestamp=None, tz=''):
self.utc=False
self.format_time=sql_format_time
self.format_time_txt=format_time_txt
self.format_date_txt=format_date_txt
self.format_date_full=format_date_txt+' '+format_time_txt
self.tz=environ.get('TZ', 'utc')
if tz:
self.tz=tz
if type(timestamp).__name__=='int':
self.datetime=timestamp_to_datetime(timestamp)
else:
if not timestamp:
self.datetime=now(self.utc, tz)
else:
self.datetime=timestamp
y, m, d, h, mi, s=format_timedata(self.datetime)
self.t=arrow.get(datetime(y, m, d, h, mi, s), self.tz)
def add_month(self, num_months):
"""Method for add months to datetime
Args:
num_months (int): Number of months to add
Returns:
New added datetime
"""
m=self.t.shift(months=+num_months)
return m.format(self.format_time)
def substract_month(self, num_months):
"""Method for substract months to datetime
Args:
num_months (int): Number of months to substract
Returns:
New substracted datetime
"""
m=self.t.shift(months=-num_months)
return m.format(self.format_time)
def add_day(self, num_days):
"""Method for add days to datetime
Args:
num_days (int): Number of days to add
Returns:
New added datetime
"""
m=self.t.shift(days=+num_days)
return m.format(self.format_time)
def substract_day(self, num_days):
"""Method for substract days to datetime
Args:
num_days (int): Number of days to substract
Returns:
New substracted datetime
"""
m=self.t.shift(days=-num_days)
return m.format(self.format_time)
def add_year(self, num_years):
"""Method for add years to datetime
Args:
num_years (int): Number of years to add
Returns:
New added datetime
"""
m=self.t.shift(years=+num_years)
return m.format(self.format_time)
def substract_year(self, num_years):
"""Method for substract years to datetime
Args:
num_years (int): Number of years to substract
Returns:
New substracted datetime
"""
m=self.t.shift(years=-num_years)
return m.format(self.format_time)
def add_hour(self, num_hours):
"""Method for add hours to datetime
Args:
num_hours (int): Number of hours to add
Returns:
New added datetime
"""
m=self.t.shift(hours=+num_hours)
return m.format(self.format_time)
def substract_hour(self, num_hours):
"""Method for substract hours to datetime
Args:
num_hours (int): Number of hours to substract
Returns:
New substracted datetime
"""
m=self.t.shift(hours=-num_hours)
return m.format(self.format_time)
def format(self):
"""Method for get datetime formatted using format_date_full attribute
Returns:
Datetime formatted with format_date_full attribute
"""
return self.t.format(self.format_date_full)
def local_to_utc(self):
"""Method for convert datetime from actual timezone to UTC"""
self.t=self.t.to('utc')
# Only use
def utc_to_local(self):
"""Method for convert datetime from actual timezone from UTC to actual timezone"""
self.t=self.t.to(self.tz)
def local_to_tz(self, tz):
"""Method for convert actual timezone to other timezone"""
self.t=self.t.to(tz)
def now(self, utc=False):
"""Method for get actual datetime.
Args:
utc (bool): If True, then get actual datetime in UTC datetime, if False, get actual datetime in selected timezone in tz attribute
Returns:
Actual datetime formatted in YYYYMMDDHHmmss format.
"""
if not utc:
actual=arrow.now(self.tz).format(sql_format_time)
else:
actual=arrow.utcnow().format(sql_format_time)
return actual
def today(self, utc=False):
"""Method for get today datetime. Get now datetime with 00:00:00 time.
Args:
utc (bool): If True, then get actual datetime in UTC datetime, if False, get actual datetime in selected timezone in tz attribute
Returns:
Actual datetime formatted in YYYYMMDD000000 format.
"""
if utc:
return arrow.utcnow()[:8]+'000000'
else:
return arrow.now(self.tz)[:8]+'000000'
def timestamp_to_datetime(self, timestamp):
"""Method for convert a timestamp in YYYYMMDDHHmmss format.
Args:
timestamp (int): datetime in timestamp format.
Returns:
Datetime in YYYYMMDDHHmmss format.
"""
return arrow.get(timestamp).format(sql_format_time)
def obtain_timestamp(self, timeform):
"""Method for get timestamp from a datetime in YYYYMMDDHHmmss format.
Args:
timeform (str): Datetime in YYYYMMDDHHmmss format.
Returns:
Datetime in YYYYMMDDHHmmss format.If timeform is incorrect, return False
"""
y, m, d, h, mi, s=format_timedata(timeform)
if checkdatetime(y, m, d, h, mi, s):
timestamp=arrow.arrow.Arrow(y, m, d, h, mi, s).timestamp
return timestamp
else:
return False
def format_strtime(self, strtime, timeform):
"""Method for get datetime formatted in strtime format
Args:
strtime (str): The string used for format the datetime
timeform (str): Datetime in YYYYMMDDHHmmss format.
Returns:
Datetime in strtime format.If timeform is incorrect, return False
"""
try:
y, m, d, h, mi, s=format_timedata(timeform)
return arrow.get(datetime(y, m, d, h, mi, s), self.tz).format(strtime)
except:
return False

4
parameciofast/libraries/db/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
*~
*.pyc
__pycache__

View file

@ -0,0 +1 @@
# A very simple ORM for Python 3

View file

View file

@ -0,0 +1,420 @@
"""
Parameciofm is a series of wrappers for Bottle.py, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.webmodel import PhangoField
from parameciofast.libraries.db import coreforms
from parameciofast.libraries.i18n import I18n
#from bs4 import BeautifulSoup
import bleach
class IntegerField(PhangoField):
"""Class that figure an integer sql type field.
"""
def __init__(self, name, size=11, required=False):
"""
Args:
name (str): The name of field
size (int): The size of the new field in database. By default 11.
required (bool): Boolean for define if field is required or not
"""
super(IntegerField, self).__init__(name, size, required)
self.default_value=0
self.type_sql='int({})'.format(self.size)
def check(self, value):
"""Method for check if value is integer
Args:
value (int): The value to check
"""
self.error=False
self.txt_error=''
try:
value=str(int(value))
if value=="0" and self.required==True:
self.txt_error="The value is zero"
self.error=True
except:
value="0"
self.txt_error="The value is zero"
self.error=True
return value
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'INT('+str(self.size)+') NOT NULL DEFAULT "0"'
class BigIntegerField(IntegerField):
"""Class that figure an big integer sql type field.
Only change the sql type with respect to IntegerField
"""
def __init__(self, name, size=11, required=False):
super().__init__(name, size, required)
self.type_sql='bigint({})'.format(self.size)
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'BIGINT('+str(self.size)+') NOT NULL DEFAULT "0"'
class FloatField(PhangoField):
"""Class that figure an float sql type field.
Args:
name (str): The name of new field
size (int): The size of the new field in database. By default 11.
required (bool): Boolean for define if
"""
def __init__(self, name, size=11, required=False):
super(FloatField, self).__init__(name, size, required)
self.error_default="The value is zero"
self.default_value=0
self.type_sql='float'.format(self.size)
def check(self, value):
"""Method for check if value is integer
Args:
value (float): The value to check
"""
self.error=False
self.txt_error=''
try:
value=str(value)
if value.find(',')!=-1:
value=value.replace(',', '.')
value=str(float(value))
if value==0 and self.required==True:
self.txt_error=self.error_default
self.error=True
except:
value="0"
self.txt_error=self.error_default
self.error=True
return value
def get_type_sql(self):
return 'FLOAT NOT NULL DEFAULT "0"'
class DecimalField(FloatField):
"""PhangoField field for Decimals fields."""
def __init__(self, name, size=11, required=False):
super().__init__(name, size, required)
self.type_sql='decimal(20,2)'
def get_type_sql(self):
return 'DECIMAL(20, 2) NOT NULL DEFAULT "0"'
class DoubleField(FloatField):
"""PhangoField field for Double fields."""
def __init__(self, name, size=11, required=False):
super().__init__(name, size, required)
self.type_sql='double'
def get_type_sql(self):
return 'DOUBLE NOT NULL DEFAULT "0"'
class CharField(PhangoField):
"""Simple alias for PhangoField"""
pass
class TextField(PhangoField):
"""Class used for text fields
Class used for text fields, use TEXT sql type for the this field.
"""
def __init__(self, name, required=False):
"""Init TextField class different to standard PhangoField
Args:
name (str): The name of new field
required (bool): Boolean for define if field is required or not
Attributes:
set_default (str): Set if the value es NOT NULL or not
"""
super().__init__(name, 11, required)
self.type_sql='text'
self.set_default='NOT NULL'
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'TEXT '+self.set_default
class LongTextField(TextField):
"""Class used for long text fields (32 bits size, 4G)
Class used for text fields, use LONGTEXT sql type for the this field.
"""
def __init__(self, name, required=False):
"""Init TextField class different to standard PhangoField
Args:
name (str): The name of new field
required (bool): Boolean for define if field is required or not
Attributes:
set_default (str): Set if the value es NOT NULL or not
"""
super().__init__(name, required)
self.type_sql='longtext'
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'LONGTEXT '+self.set_default
class HTMLField(TextField):
"""Class used for html fields
Class used for html fields, use TEXT sql type for the this field because is children of TextField. In this method self.escape is used for convert " to &quot;
"""
def __init__(self, name, required=False):
"""Init HTMLField class different to standard PhangoField
Args:
name (str): The name of new field
required (bool): Boolean for define if field is required or not
Attributes:
trusted_tags (list): A list with safe tags.
"""
super().__init__(name, required)
self.trusted_tags=[]
def check(self, value):
"""Check method for html values
This check method use beautifulsoap for clean and format html code
"""
# leach.clean('<p>"trial"</p><script></script>', tags=('p'))
"""
soup=BeautifulSoup(value, features='html.parser')
for tag in soup.findAll(True):
if tag.name not in self.trusted_tags:
tag.hidden=True
value=soup.renderContents().decode('utf-8')
if self.escape:
return value.replace('"', '&quot;')
else:
return value
"""
value=bleach.clean('<p>"trial"</p><script></script>', tags=self.trusted_tags)
if self.escape:
return value.replace('"', '&quot;')
else:
return value
class ForeignKeyField(IntegerField):
"""Subclass of IntegerField for create Foreign keys
A subclass of IntegerField used for create foreign keys in related tables.
"""
def __init__(self, name, related_table, size=11, required=False, identifier_field='id', named_field="id", select_fields=[]):
"""
Args:
name (str): Name of field
related_table (WebModel): The table-model related with this foreign key
size (int): The size of the new field in database. By default 11.
required (bool): Boolean for define if field is required or not
identifier_field (str): The Id field name from related table
named_field (str): The field from related table used for identify the row seleted from related table
select_fields (list): A series of fields names from related
"""
super(ForeignKeyField, self).__init__(name, size, required)
self.table_id=related_table.name_field_id
self.table_name=related_table.name
self.related_model=related_table
self.identifier_field=identifier_field
self.named_field=named_field
self.select_fields=select_fields
self.foreignkey=True
self.change_form(coreforms.SelectModelForm, [related_table, self.named_field, self.identifier_field])
self.default_value=None
def check(self, value):
value=super().check(value)
if value=='0' or value==0:
value=None
return value
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'INT NULL'
class BooleanField(IntegerField):
"""Field for boolean values
"""
def __init__(self, name, size=1):
"""
Args:
name (str): Name of field
size (int): The size of the new field in database. By default 11.
"""
required=False
self.yes_text=I18n.lang('common', 'yes', 'Yes')
self.no_text=I18n.lang('common', 'no', 'No')
super(IntegerField, self).__init__(name, size, required)
self.default_error="Need 0 or 1 value"
self.default_value=0
self.type_sql='tinyint(1)'
def check(self, value):
self.error=False
self.txt_error=''
try:
value=int(value)
if value<0 or value>1:
self.txt_error=self.default_error
self.error=True
value=0
except:
self.error=True
self.txt_error=self.default_error
value=0
value=str(value)
return value
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'BOOLEAN NOT NULL DEFAULT "0"'
def show_formatted(self, value):
value=int(value)
if value==0:
value=self.no_text
else:
value=self.yes_text
return str(value)

View file

@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
Parameciofm is a series of wrappers for Bottle.py, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from collections import OrderedDict
from html import escape
#Forms para python3
class BaseForm:
"""The class used by all forms classes
BaseForm is the base class used for all form classes.
A form class is used for generate simple html forms, how input type, text type, hidden type, etc. PhangoField classes use this forms for generate automatically forms using GenerateAdminClass and others.
"""
def __init__(self, name, value):
"""
Args:
name (str): The html name for this form
value (str): The default value of this html form.
Attributes:
label (str): Label used in functions how show_form that generate a complete html form from a form class list
name (str): Name of the html form.
default_value (mixed): The default value of the form. Equal to value in typical html form.
css (str): Used for add css classes to the html form
type (str): Variable used for conventional html forms with html tag <input>
field (PhangoField): Field related with this form. Used in PhangoField.
required (boolean): If form is required or not, used in functions that generate forms.
name_field_id (str): The html id for the html form. Used for html things.
help (str): A string with help text, used in functions that generate forms.
"""
self.label=name
self.name=name
self.default_value=value
self.css=''
self.type='text'
self.field=None
self.required=False
self.txt_error=''
self.error=False
self.name_field_id=self.name+'_form'
self.help=''
self.placeholder=''
def form(self):
"""Method for returm the html code of the form
"""
return '<input type="'+self.type+'" class="'+self.css+'" name="'+self.name+'" id="'+self.name_field_id+'" value="'+self.setform(self.default_value)+'" placeholder="'+self.placeholder+'" />'
def show_formatted(self, value):
"""Method for show the value of form formatted
Args:
value (mixed): The value of field form
"""
return value
#Method for escape value for html input. DON'T CHANGE IF YOU DON'T KNOWN WHAT ARE YOU DOING
def setform(self, value):
"""A method for set the value in the form for escape and other things
Args:
value (mixed): The value of field form for set
"""
value=str(value)
return value.replace('"', '&quot;').replace("'", '&#39;')
def change_name(self, new_name):
"""A method for change the default form html name of the field form
Args:
new_name (str): The new name of the form. Always is finished with _form suffix
"""
self.name=new_name
self.name_field_id=self.name+'_form'
return ""
class SimpleTextForm(BaseForm):
"""Form for simple text
"""
def __init__(self, name, value):
super().__init__(name, value)
self.after_text=''
def form(self):
return super().form()+' '+self.after_text
class TextForm(BaseForm):
"""Form for simple text form
"""
def __init__(self, name, value):
super(TextForm, self).__init__(name, value)
def form(self):
return '<textarea class="'+self.css+'" name="'+self.name+'" id="'+self.name+'_form">'+self.setform(self.default_value)+'</textarea>'
class PasswordForm(BaseForm):
"""Form for password forms
"""
def __init__(self, name, value, show_password=False):
super(PasswordForm, self).__init__(name, value)
self.type='password'
self.show_password=show_password
def setform(self, value):
if not self.show_password:
return ""
else:
return value
class HiddenForm(BaseForm):
"""Form for hidden forms
"""
def __init__(self, name, value):
super(HiddenForm, self).__init__(name, value)
self.type='hidden'
class SelectForm(BaseForm):
"""Form for select html form
"""
def __init__(self, name, value, elements=OrderedDict()):
"""
Args:
name (str): The html name for this form
value (str): The default value of this html form
elements (OrderedDict): An ordered dict with the keys(the form value) and text label. Example, if you have a OrderedDict how {'0': 'Value selected'} in a html select form you have the next result: <select name="name"><option value="0">Value selected</option></selected>
"""
super(SelectForm, self).__init__(name, value)
self.arr_select=elements
def form(self):
the_form='<select name="'+self.name+'" id="'+self.name_field_id+'">\n'
arr_selected={self.default_value: 'selected'}
for k,v in self.arr_select.items():
arr_selected[k]=arr_selected.get(k, '')
the_form+="<option value=\""+self.setform(str(k))+"\" "+arr_selected[k]+">"+self.setform(str(v))+"</option>"
the_form+='</select>\n'
return the_form
class SelectModelForm(SelectForm):
"""Form for select html using a webmodel how base for get the data
"""
def __init__(self, name, value, model, field_name, field_value, field_parent=None):
"""
Args:
name (str): The html name for this form
value (str): The default value of this html form.
model (WebModel): The webmodel used for get the data for arr_select
field_name (str): The field used for get the name of every option in select
field_value (str): The field used for get the value of every option in select
field_parent (int): If the model have parents or children, the value of this argument
"""
super(SelectModelForm, self).__init__(name, value)
try:
self.default_value=int(self.default_value)
except:
self.default_value=0
self.arr_select=OrderedDict()
self.model=model
self.field_name=field_name
self.field_value=field_value
self.field_parent=field_parent
self.form=self.normal_form
if self.field_parent!=None:
self.form=self.parent_form
def normal_form(self):
"""Method for prepare the form hierated from SelectForm class, without parents
Method for prepare the form hierated from SelectForm class getting data from database using model attribute.
"""
self.arr_select['']=''
with self.model.select([self.field_name, self.field_value], True) as cur:
for arr_value in cur:
#print(self.model.fields[self.field_name])
self.arr_select[arr_value[self.field_value]]=self.model.fields[self.field_name].show_formatted(arr_value[self.field_name])
try:
self.default_value=int(self.default_value)
except:
self.default_value=0
return super().form()
def parent_form(self):
"""Method for prepare the form hierated from SelectForm class, with parents
Method for prepare the form hierated from SelectForm class getting data from database using model attribute.
"""
self.arr_select['']=''
arr_son={}
old_conditions=self.model.conditions
old_limit=self.model.limit
self.model.limit=''
self.model.set_conditions('WHERE 1=1', [])
with self.model.select([self.field_name, self.field_value, self.field_parent], True) as cur:
for arr_value in cur:
if not arr_value[self.field_parent] in arr_son:
arr_son[arr_value[self.field_parent]]=[]
if arr_value[self.field_value]!=self.model.model_id:
arr_son[arr_value[self.field_parent]].append([arr_value[self.field_value], self.model.fields[self.field_name].show_formatted(arr_value[self.field_name])])
self.create_son(0, arr_son)
self.model.conditions=old_conditions
self.model.limit=old_limit
try:
self.default_value=int(self.default_value)
except:
self.default_value=0
return super().form()
def create_son(self, parent_id, arr_son, separator=''):
"""Recursive method for generate parents and children dictionary
"""
if parent_id in arr_son:
for son in arr_son[parent_id]:
self.arr_select[son[0]]=separator+son[1]
son_separator=separator
if son[0] in arr_son:
son_separator+='--'
self.create_son(son[0],arr_son, son_separator)

View file

@ -0,0 +1,99 @@
#!/usr/bin/env python3
import sys
import pymysql
pymysql.install_as_MySQLdb()
class SqlClass:
connection={}
connection_method=''
def __init__(self):
self.error_connection=""
self.connected=False
SqlClass.connection_method=self.connect_to_db_sql
def dummy_connect(self, connection, name_connection="default"):
pass
def connect_to_db(self, connection, name_connection="default"):
SqlClass.connection_method(connection, name_connection)
SqlClass.connection_method=self.dummy_connect
def connect_to_db_sql(self, connection, name_connection="default"):
try:
self.connection[name_connection] = pymysql.connect(connection['host'],
user=connection['user'],
passwd=connection['password'],
db=connection['db'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
#return False
raise NameError(self.error_connection)
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
#if fetch_type=="ASSOC":
#fetch_type=pymysql.cursors.DictCursor
with self.connection[name_connection].cursor(pymysql.cursors.DictCursor) as cursor:
try:
cursor.execute(sql_query, arguments)
self.connection[name_connection].commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
self.error_connection="Error in query ||"+sql_query+"||: %s %s" % (e, v)
#return False
raise NameError(self.error_connection)
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
"""
def __del__(self):
for key in self.connection:
self.close(self.connection)
"""
"""
def close(self, name_connection="default"):
if self.connection[name_connection]:
self.connection[name_connection].close()
self.connection[name_connection]=False
pass
"""

View file

@ -0,0 +1,117 @@
#!/usr/bin/env python3
import sys
import MySQLdb.cursors
import sqlalchemy.pool as pool
import traceback
class SqlClass:
def __init__(self, connection):
self.max_overflow=-1
self.pool_size=15
self.error_connection=""
# Data of connection
self.connection=connection
# Sql connection
self.conn=None
self.connected=False
self.pool_recycle=3600
self.connect()
def connect(self):
"""
if self.conn==None:
"""
try:
def getconn():
return MySQLdb.connect(self.connection['host'],
user=self.connection['user'],
passwd=self.connection['password'],
db=self.connection['db'],
charset='utf8mb4',
cursorclass=MySQLdb.cursors.DictCursor)
"""
if mypool==None:
mypool=pool.QueuePool(getconn, max_overflow=self.max_overflow, pool_size=self.pool_size, recycle=self.pool_recycle, use_threadlocal=False)
"""
self.conn=getconn() #mypool.connect()
self.conn.ping(True)
"""
while not self.conn.open:
self.conn=SqlClass.mypool.connect()
"""
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
self.conn.close()
raise NameError(self.error_connection)
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
#if fetch_type=="ASSOC":
#fetch_type=MySQLdb.cursors.DictCursor
#with self.conn.cursor(MySQLdb.cursors.DictCursor) as cursor:
cursor=self.conn.cursor(MySQLdb.cursors.DictCursor)
try:
cursor.execute(sql_query, arguments)
self.conn.commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
#, traceback.format_exc()
self.error_connection="Error in query ||%s||Values: %s" % (sql_query, str(arguments))
self.conn.close()
#return False
raise NameError(self.error_connection)
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
def __del__(self):
if self.conn:
self.conn.close()
def close(self, name_connection="default"):
if self.conn:
self.conn.close()
self.conn=None
pass

View file

@ -0,0 +1,121 @@
#!/usr/bin/env python3
import sys
import pymysql.cursors
pymysql.install_as_MySQLdb
import sqlalchemy.pool as pool
import traceback
mypool=None
class SqlClass:
def __init__(self, connection):
self.max_overflow=-1
self.pool_size=0
self.error_connection=""
# Data of connection
self.connection=connection
# Sql connection
self.conn=None
self.connected=False
self.pool_recycle=3600
self.connect()
def connect(self):
global mypool
if self.conn==None:
try:
def getconn():
return pymysql.connect(self.connection['host'],
user=self.connection['user'],
passwd=self.connection['password'],
db=self.connection['db'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
if mypool==None:
mypool=pool.QueuePool(getconn, max_overflow=self.max_overflow, pool_size=self.pool_size, recycle=self.pool_recycle, use_threadlocal=True)
self.conn=mypool.connect()
self.conn.ping(True)
"""
while not self.conn.open:
self.conn=SqlClass.mypool.connect()
"""
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
self.conn.close()
raise NameError(self.error_connection)
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
#self.connect()
#if fetch_type=="ASSOC":
#fetch_type=MySQLdb.cursors.DictCursor
#with self.conn.cursor(MySQLdb.cursors.DictCursor) as cursor:
cursor=self.conn.cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql_query, arguments)
self.conn.commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
#, traceback.format_exc()
self.error_connection="Error in query ||%s||Values: %s" % (sql_query, str(arguments))
self.conn.close()
#return False
raise NameError(self.error_connection)
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
def __del__(self):
if self.conn:
self.conn.close()
def close(self, name_connection="default"):
if self.conn:
self.conn.close()
self.conn=None
pass

View file

@ -0,0 +1,113 @@
#!/usr/bin/env python3
import sys
import pymysql.cursors
pymysql.install_as_MySQLdb
import sqlalchemy.pool as pool
import traceback
class SqlClass:
mypool=None
def __init__(self, connection):
self.max_overflow=-1
self.pool_size=0
self.error_connection=""
# Data of connection
self.connection=connection
# Sql connection
self.conn=None
self.connected=False
self.pool_recycle=3600
def connect(self):
if self.conn==None:
try:
def getconn():
return pymysql.connect(self.connection['host'],
user=self.connection['user'],
passwd=self.connection['password'],
db=self.connection['db'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
if SqlClass.mypool==None:
SqlClass.mypool=pool.QueuePool(getconn, max_overflow=self.max_overflow, pool_size=self.pool_size, recycle=self.pool_recycle)
self.conn=SqlClass.mypool.connect()
while not self.conn.open:
self.conn=SqlClass.mypool.connect()
self.conn.ping(True)
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
self.conn.close()
raise NameError(self.error_connection)
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
self.connect()
#if fetch_type=="ASSOC":
#fetch_type=pymysql.cursors.DictCursor
#with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor=self.conn.cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql_query, arguments)
self.conn.commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
#, traceback.format_exc()
self.error_connection="Error in query ||%s||Values: %s" % (sql_query, str(arguments))
#return False
raise NameError(self.error_connection)
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
def __del__(self):
if self.conn:
self.conn.close()
def close(self, name_connection="default"):
if self.conn:
self.conn.close()
self.conn=None
pass

View file

@ -0,0 +1,253 @@
#!/usr/bin/env python3
import sys
#import pymysql.cursors
#pymysql.install_as_MySQLdb
#import sqlalchemy.pool as pool
from sqlalchemy import create_engine
import traceback
#mypool=None
#engine = create_engine('sqlite:///:memory:', echo=True)
engine=None
class SqlClass:
"""Class used how interface to sqlalchemy for connect to mysql engine
Attributes:
cursors_connect (pymysql.cursors.DictCursor): Cursor dict connection to database
disable_pool (boolean): If True then not exists mysql pool, if False, use sql pool for better connections.
pymsql_install (boolean): If True, pymysql is used how mysqldb classic python module.
pool_size (int): The size of the mysql pool.
"""
cursors_connect=None
disable_pool=False
pymysql_install=False
pool_size=15
def __init__(self, connection):
"""
Args:
connection (dict): A dict with the configurations of SqlClass connection
Attributes:
error_connection (str): A string where errors are saved
connection (dict): A dict with the configurations of SqlClass connection
conn (MySQL Connection): A PyMySQL or Mysqldb connection
connected (bool): Simple bool for check if was connected to mysql
pool_recycle (int): Time limite for recycle the pool by inactivity
"""
#self.max_overflow=-1
self.error_connection=""
# Data of connection
self.connection=connection
# Sql connection
self.conn=None
self.connected=False
self.pool_recycle=3600
self.last_query=''
self.connect()
def connect(self):
"""Method for connect to mysql db using pymysql or mysqldb
"""
global engine
if not SqlClass.disable_pool:
if not engine:
try:
if self.connection['db_type']=='pymysql':
import pymysql.cursors
pymysql.install_as_MySQLdb
SqlClass.pymysql_install=True
SqlClass.cursors_connect=pymysql.cursors.DictCursor
else:
import MySQLdb.cursors
SqlClass.cursors_connect=MySQLdb.cursors.DictCursor
engine=create_engine("mysql+%s://%s:%s@%s/%s?charset=utf8mb4" % (self.connection['db_type'], self.connection['user'], self.connection['password'], self.connection['host'], self.connection['db']), pool_recycle=self.pool_recycle, echo_pool=True, pool_size=self.pool_size, pool_pre_ping=True)
#Postgre
#engine = create_engine("postgresql+psycopg2://scott:tiger@localhost:5432/mydatabase")
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
#self.conn.close()
raise NameError(self.error_connection)
self.conn=engine.raw_connection()
#self.conn.ping(True)
else:
if self.connection['db_type']=='pymysql':
import pymysql.cursors
if not SqlClass.pymysql_install:
pymysql.install_as_MySQLdb
SqlClass.pymysql_install=True
"""
connection = pymysql.connect(host='localhost',
user='user',
password='passwd',
database='db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
"""
self.conn=pymysql.connect(host=self.connection['host'], user=self.connection['user'], passwd=self.connection['password'], db=self.connection['db'], charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor)
else:
import MySQLdb.cursors
self.conn=MySQLdb.connect(host=self.connection['host'],
user=self.connection['user'],
passwd=self.connection['password'],
db=self.connection['db'],
charset='utf8mb4',
cursorclass=MySQLdb.cursors.DictCursor)
pass
"""
if self.conn==None:
try:
def getconn():
return pymysql.connect(self.connection['host'],
user=self.connection['user'],
passwd=self.connection['password'],
db=self.connection['db'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
if mypool==None:
mypool=pool.QueuePool(getconn, max_overflow=self.max_overflow, pool_size=self.pool_size, recycle=self.pool_recycle, use_threadlocal=True)
self.conn=mypool.connect()
self.conn.ping(True)
self.connected=True
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
self.error_connection="Error in connection: %s %s" % (e, v)
self.conn.close()
raise NameError(self.error_connection)
"""
#Make def query more simple if not debugging.
def query(self, sql_query, arguments=[], name_connection="default"):
"""Method for send a sql query to mysql server
Args:
sql_query (str): The sql sentence to execute. For data you should use %s character.
arguments (list): The data used in sql sentence. This data substitute the %s characters.
name_connection (str): The name of dict element with the configuration of connection, without used in this moment.
"""
cursor=self.conn.cursor(SqlClass.cursors_connect)
try:
cursor.execute(sql_query, arguments)
self.conn.commit()
#if hasattr(cursor, '_executed'):
# self.last_query=cursor._executed
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
#if hasattr(cursor, '_executed'):
# self.last_query=cursor._executed
self.error_connection="Error in query ||%s||Values: %s" % (self.last_query, str(arguments))
self.conn.close()
#return False
raise NameError(self.error_connection)
#self.connect()
#if fetch_type=="ASSOC":
#fetch_type=MySQLdb.cursors.DictCursor
#with self.conn.cursor(MySQLdb.cursors.DictCursor) as cursor:
"""
cursor=self.conn.cursor(pymysql.cursors.DictCursor)
try:
cursor.execute(sql_query, arguments)
self.conn.commit()
return cursor
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
if hasattr(cursor, '_last_executed'):
sql_query=cursor._last_executed
#, traceback.format_exc()
self.error_connection="Error in query ||%s||Values: %s" % (sql_query, str(arguments))
#return False
raise NameError(self.error_connection)
"""
#Fetcho row return dictionary if is defined in query.
#def fetch(self, cursor):
#return cursor.fetchone()
def __del__(self):
"""Typical method used when class is deleted from memory. Close the connextion if exists.
"""
if self.conn:
self.conn.close()
def close(self, name_connection="default"):
"""Method used for close the connection if you want close connection and open other.
"""
if self.conn:
self.conn.close()
self.conn=None
pass

View file

@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""
Parameciofast is a series of wrappers for FastAPI, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import argparse
import os,traceback
import sys, inspect
import shutil
import re
from datetime import date
from pathlib import Path
from colorama import init, Fore, Back, Style
from importlib import import_module, reload
from parameciofast.libraries.db.webmodel import WebModel
sys.path.insert(0, os.path.realpath('.'))
#try:
from settings import config
#except:
#print('You need a settings directory with a parameciofast configuration')
#sys.exit(1)
def start():
"""Function for create and update mysql tables using webmodel classes and fields how source.
"""
connection=WebModel.connection()
#connection.connect_to_db(WebModel.connections['default'])
parser = argparse.ArgumentParser(description='A tool for create tables in databases using models from Cromosoma')
parser.add_argument('--model', help='Model python path', required=True)
parser.add_argument('--config', help='The config file', required=False)
args = parser.parse_args()
init()
#Import config
config_file='config'
if args.config!=None:
config_file=args.config
try:
config=import_module('settings.'+config_file)
except:
e = sys.exc_info()[0]
v = sys.exc_info()[1]
print(Fore.WHITE+Back.RED+Style.BRIGHT+"Config file not found: %s %s" % (e, v))
exit(1)
#print(WebModel.connections)
if '/' in args.model:
args.model=args.model.replace('/', '.')[:-3] #.replace('.py', '')
try:
model=import_module(args.model)
for name, obj in inspect.getmembers(sys.modules[model.__name__]):
if inspect.isclass(obj):
if obj.__module__==args.model and hasattr(obj, 'webmodel'):
WebModel.model[name.lower()]=obj(connection)
#WebModel.modelobj
except:
"""
e = sys.exc_info()[0]
v = sys.exc_info()[1]
print(Fore.WHITE+Back.RED+Style.BRIGHT +"Error, file with model not found: %s %s" % (e, v))
"""
print("Exception in user code:")
print("-"*60)
traceback.print_exc(file=sys.stdout)
print("-"*60)
exit(1)
#load the table of databases
cursor=connection.query("show tables")
table_exists=[]
for row in cursor:
table=list(row.values())[0]
if table in WebModel.model:
table_exists.append(table)
#If don't want order
#set([1,2,3,4]) - set([2,5])
tables=list(WebModel.model.keys())
#Array diff ordered
new_tables=[x for x in tables if x not in table_exists]
# Get foreignkeys
# SELECT * FROM information_schema.TABLE_CONSTRAINTS WHERE CONSTRAINT_SCHEMA='catalogdev_db' AND information_schema.TABLE_CONSTRAINTS.CONSTRAINT_TYPE = 'FOREIGN KEY' ;
foreignkey_fields={}
#| CONSTRAINT_CATALOG | CONSTRAINT_SCHEMA | CONSTRAINT_NAME | TABLE_SCHEMA | TABLE_NAME | CONSTRAINT_TYPE |
#+--------------------+-------------------+-----------------------------------------+---------------+-------------------+-----------------+
#| def | catalogdev_db | product_id_attributesIDX | catalogdev_db | attributes | FOREIGN KEY |
#WebModel.connections
db_name=WebModel.connections['default']['db']
with connection.query('SELECT * FROM information_schema.TABLE_CONSTRAINTS WHERE CONSTRAINT_SCHEMA=%s AND information_schema.TABLE_CONSTRAINTS.CONSTRAINT_TYPE = %s', [db_name, 'FOREIGN KEY']) as cursor:
for row in cursor:
if not row['TABLE_NAME'] in foreignkey_fields:
foreignkey_fields[row['TABLE_NAME']]=[]
foreignkey_fields[row['TABLE_NAME']]=row['CONSTRAINT_NAME'].replace('_{}IDX'.format(row['TABLE_NAME']), '')
pass
#If don't want order
#new_tables=set(tables)-set(table_exists)
#Need order new_tables
changes=0
#Create new tables
if len(new_tables)>0:
print(Style.BRIGHT+"Creating new tables...")
changes+=1
for table in new_tables:
print(Style.NORMAL+"--Creating table "+table+"...")
connection.query(WebModel.model[table].create_table())
for table in new_tables:
print("--Adding indexes and constraints for the new table "+table)
if table in WebModel.arr_sql_index:
for k_field, index in WebModel.arr_sql_index[table].items():
print("---Added index to "+k_field)
connection.query(index)
if table in WebModel.arr_sql_set_index:
for k_set, index_set in WebModel.arr_sql_set_index[table].items():
if index_set!="":
connection.query(index_set)
print("---Added constraint to "+k_set)
print("--Adding uniques elements for the new table")
if table in WebModel.arr_sql_unique:
for k_field, unique_set in WebModel.arr_sql_unique[table].items():
if unique_set!="":
connection.query(unique_set)
print("---Added unique to "+unique_set)
#See if changes exists
#Check if created tables are modified.
try:
model_old=import_module('backups.'+args.model)
for name, obj in inspect.getmembers(sys.modules[model_old.__name__]):
if inspect.isclass(obj):
if obj.__module__=='backups.'+args.model and hasattr(obj, 'webmodel'):
WebModel.model['old_'+name.lower()]=obj(connection)
print(Style.BRIGHT+"Checking old versions of model for find changes...")
for table in tables:
#print(table)
table_fields={table: {}}
# Field | Type | Null | Key | Default | Extra |
#| id | int(11) | NO | PRI | NULL | auto_increment |
with connection.query('describe %s' % table) as cursor:
#all_fields=cursor.fetchall()
#print(all_fields)
for row in cursor:
table_fields[table][row['Field']]={'type': row['Type'], 'key': row['Key']}
pass
#print(table_fields)
#connection.query("")
#Check if new table
#fields_to_add, fields_to_modify, fields_to_add_index, fields_to_add_constraint, fields_to_add_unique, fields_to_delete_index, fields_to_delete_unique, fields_to_delete_constraint, fields_to_delete
fields_to_add=[]
fields_to_modify=[]
fields_to_add_index=[]
fields_to_add_constraint=[]
fields_to_add_unique=[]
fields_to_delete_index=[]
fields_to_delete_unique=[]
fields_to_delete_constraint=[]
fields_to_delete=[]
old_table='old_'+table
if not old_table in WebModel.model:
WebModel.model[old_table]=WebModel.model[table]
for f, v in WebModel.model[table].fields.items():
#if not f in WebModel.model[old_table].fields:
if not f in table_fields[table]:
fields_to_add.append(f)
#Add index
if v.indexed==True:
fields_to_add_index.append(f)
changes+=1
#Add unique
if v.unique==True:
fields_to_add_unique.append(f)
changes+=1
#Add constraint
if v.foreignkey==True:
fields_to_add_constraint.append(f)
changes+=1
changes+=1
#If exists field in old webmodel and new
else:
v_old=WebModel.model[old_table].fields[f]
if v.get_type_sql()!=v_old.get_type_sql():
fields_to_modify.append(f)
changes+=1
#Add index
#if v.indexed==True and v_old.indexed==False:
if v.indexed==True and table_fields[table][f]['key']!='MUL':
fields_to_add_index.append(f)
changes+=1
#if v.indexed==False and v_old.indexed==True:
if v.indexed==False and table_fields[table][f]['key']=='MUL' and v.foreignkey==False:
fields_to_delete_index.append(f)
changes+=1
#Add unique
#if v.unique==True and v_old.unique==False:
if v.unique==True and table_fields[table][f]['key']!='UNI':
fields_to_add_unique.append(f)
changes+=1
#if v.unique==False and v_old.unique==True:
if v.unique==False and table_fields[table][f]['key']=='UNI':
fields_to_delete_unique.append(f)
changes+=1
#Add constraint
#if v.foreignkey==True and v_old.foreignkey==False:
if v.foreignkey==True and table_fields[table][f]['key']!='MUL':
fields_to_add_constraint.append(f)
changes+=1
#if v.foreignkey==False and v_old.foreignkey==True:
if v.foreignkey==False and table_fields[table][f]['key']=='MUL':
if table in foreignkey_fields:
if f in foreignkey_fields[table]:
fields_to_delete_constraint.append(f)
changes+=1
# Clean fields
#for f, v in WebModel.model[old_table].fields.items():
for f, v in table_fields[table].items():
if not f in WebModel.model[table].fields:
#Add constraint
#if v.foreignkey==True:
if table in foreignkey_fields:
if f in foreignkey_fields[table]:
fields_to_delete_constraint.append(f)
changes+=1
fields_to_delete.append(f)
changes+=1
WebModel.model[table].update_table(fields_to_add, fields_to_modify, fields_to_add_index, fields_to_add_constraint, fields_to_add_unique, fields_to_delete_index, fields_to_delete_unique, fields_to_delete_constraint, fields_to_delete)
#for field_update in arr_update:
#Make a for in fields, if the field not exist in old model, create, if is not same type, recreate. If no have index now, delete index, if is a new index, create, same thing with uniques
#for field in WebModel.model
except ImportError:
pass
except:
print("Exception in user code:")
print("-"*60)
traceback.print_exc(file=sys.stdout)
print("-"*60)
exit(1)
original_file_path=args.model.replace('.', '/')+'.py'
backup_path='backups/'+original_file_path
if changes>0:
print(Style.BRIGHT+"Creating backup of the model. WARNING: DON'T DELETE BACKUPS DIRECTORY IF YOU WANT MAKE CHANGES IN THE FUTURE WITHOUT MODIFY DIRECTLY THE DATABASE")
create_backup(original_file_path, backup_path)
else:
if not os.path.isfile(backup_path):
create_backup(original_file_path, backup_path)
# Execute script
arr_script_model=args.model.split('.')
arr_script_model.pop()
script_model='.'.join(arr_script_model)+'.scripts.install'
script_py=script_model.replace('.', '/')+'.py'
if os.path.isfile(script_py):
locked_file='/'.join(arr_script_model)+'/scripts/locked'
if not os.path.isfile(locked_file):
script_install=import_module(script_model)
script_install.run()
f=open(locked_file, 'w')
f.write('OK')
f.close()
connection.close()
#script_model=args.model+''
print(Style.BRIGHT+"All tasks finished")
def create_backup(original_file_path, file_path):
#Create copy
path=os.path.dirname(file_path)
p=Path(path)
if not p.is_dir():
p.mkdir(0o755, True)
with open(path+'/__init__.py', 'w') as f:
f.write("#!/usr/bin/env python3\n")
#Create path
if os.path.isfile(file_path):
today = date.today()
shutil.copy(file_path, file_path+'.'+today.strftime("%Y%M%d%H%M%S"))
new_file=""
f=open(original_file_path)
for line in f:
"""
new_line=line.replace("model[\"", "model[\"old_")
new_line=new_line.replace("model['", "model['old_")
new_line=new_line.replace("WebModel(\"", "WebModel(\"old_")
new_line=new_line.replace("WebModel('", "WebModel('old_")
"""
new_file+=line
f.close()
f=open(file_path, 'w')
f.write(new_file)
f.close()

View file

@ -0,0 +1,94 @@
"""
Parameciofm is a series of wrappers for Flask, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.webmodel import PhangoField,WebModel
import json
class ArrayField(PhangoField):
"""Field for save json arrays with determined values"""
def __init__(self, name, field_type, required=False):
"""
Args:
name (str): The name of new field
field_type (PhangoField): The type of PhangoField for save in ArrayField
required (bool): Boolean for define if field is required or not
"""
super().__init__(name, required)
self.field_type=field_type
self.error_default='Sorry, the json array is invalid'
self.set_default=''
self.type_sql='text'
def check(self, value):
if type(value).__name__=='str':
try:
value=json.loads(value)
except json.JSONDecodeError:
value=[]
self.error=True
self.txt_error=self.error_default
elif type(value).__name__!='list':
value=[]
self.error=True
self.txt_error='Sorry, the json array is invalid'
error=0
if type(self.field_type).__name__!='ArrayField':
for k,v in enumerate(value):
value[k]=self.field_type.check(v)
if self.field_type.error:
error+=1
if error>0:
self.error=True
final_value=json.dumps(value)
final_value=WebModel.escape_sql(final_value)
return final_value
def get_type_sql(self):
return 'JSON'
def show_formatted(self, value):
return ", ".join(value)
def loads(self, value):
try:
return json.loads(value)
except:
return False

View file

@ -0,0 +1,71 @@
"""
Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.corefields import IntegerField
try:
from parameciofast.libraries.db.extraforms.colorform import ColorForm
except:
class ColorForm:
pass
class ColorField(IntegerField):
"""Simple field for save colors in hexadecimal format."""
def __init__(self, name, size=11, required=False):
super().__init__(name, size, required)
self.name_form=ColorForm
def check(self, value):
value=str(value).replace('#', '0x')
value=int(value, 16)
if value<0 or value>0xffffff:
value=0
return value
def get_hex_color(self, value):
value=str(hex(int(value))).replace('0x', '')
c=len(value)
if(c<6):
repeat=6-c
value=('0'*repeat)+value
value='#'+value
return value
def show_formatted(self, value):
value=str(hex(int(value))).replace('0x', '')
c=len(value)
if(c<6):
repeat=6-c
value=('0'*repeat)+value
value='#'+value
return '<div style="width:50px;height:50px;background-color:%s;"></div>' % value;

View file

@ -0,0 +1,64 @@
"""
Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.corefields import PhangoField
from parameciofast.libraries import datetime
try:
from parameciofast.libraries.db.extraforms.dateform import DateForm
except:
class DateForm:
pass
class DateField(PhangoField):
"""Field for use and save dates in YYYYMMDDHHSS format"""
def __init__(self, name, size=255, required=False):
super().__init__(name, size, required)
self.name_form=DateForm
self.utc=True
self.error_default='Error: Date format invalid'
def check(self, value):
if self.utc:
value=datetime.local_to_gmt(value)
elif not datetime.obtain_timestamp(value):
self.error=True
self.txt_error=self.error_default
return ''
if value==False:
self.error=True
self.txt_error=self.error_default
return ''
return value
def show_formatted(self, value):
return datetime.format_date(value)

View file

@ -0,0 +1,87 @@
"""
Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.corefields import PhangoField
from parameciofast.libraries import datetime
try:
from parameciofast.libraries.db.extraforms.dateform import DateForm
except:
class DateForm:
pass
class DateTimeField(PhangoField):
"""Field for use and save dates in MySQL date format"""
def __init__(self, name, size=255, required=False):
super().__init__(name, size, required)
self.name_form=DateForm
self.utc=False
self.error_default='Error: Date format invalid in %s' % self.name
self.type_sql='datetime'
def check(self, value):
if self.utc:
value=datetime.local_to_gmt(value)
elif not datetime.obtain_timestamp(value):
self.error=True
self.txt_error=self.error_default+' '+value
return '0000-00-00 00:00:00'
if value==False:
self.error=True
self.txt_error=self.error_default+' '+value
return '0000-00-00 00:00:00'
else:
"""
format_date_txt="YYYY/MM/DD"
format_time_txt="HH:mm:ss"
"""
value=datetime.format_local_strtime('YYYY-MM-DD HH:mm:ss', value)
return value
def show_formatted(self, value):
# Convert to paramecio value
value=str(value)
value=value.replace('-', '').replace(':', '').replace(' ', '')
return datetime.format_date(value)
def get_type_sql(self):
"""Method for return the sql code for this type
"""
return 'DATETIME NOT NULL'

View file

@ -0,0 +1,83 @@
"""
Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.webmodel import WebModel, PhangoField
import json
class DictField(PhangoField):
"""Field for save json dicts with determined values"""
def __init__(self, name, field_type, required=False):
"""
Args:
name (str): The name of field
field_type (PhangoField): The type of PhangoField for save in ArrayField
required (bool): Boolean for define if field is required or not
"""
super().__init__(name, required)
self.field_type=field_type
self.error_default='Sorry, the json dict is invalid'
self.set_default='NOT NULL'
self.type_sql='longtext'
def check(self, value):
if type(value).__name__=='str':
try:
value=json.loads(value)
except json.JSONDecodeError:
value={}
self.error=True
self.txt_error=self.error_default
elif type(value).__name__!='dict':
value={}
self.error=True
self.txt_error=self.error_default
error=0
for k,v in value.items():
value[k]=self.field_type.check(v)
if self.field_type.error:
error+=1
final_value=json.dumps(value)
if error>0:
self.error=True
#final_value=WebModel.escape_sql(final_value)
return final_value
def get_type_sql(self):
return 'JSON '+self.set_default
def show_formatted(self, value):
return ", ".join(value)

View file

@ -0,0 +1,47 @@
"""
Parameciofm is a series of wrappers for bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.corefields import CharField
import re
mail_pattern=re.compile(r"\w[\w\.-]*@\w[\w\.-]+\.\w+")
class EmailField(CharField):
"""Field for save and check email addreses"""
def __init__(self, name, size=1024, required=False):
super().__init__(name, size, required)
self.error_default='Error: No valid format in '+self.name
def check(self, value):
value=super().check(value)
self.error=False
self.txt_error=''
if not mail_pattern.match(value):
self.error=True
value=""
self.txt_error=self.error_default+' Email:'+value
return value

View file

@ -0,0 +1,176 @@
import os
import sys
from pathlib import Path
from parameciofast.libraries.db.corefields import CharField
from parameciofast.libraries.db.extraforms.fileform import FileForm
from parameciofast.libraries import httputils
from parameciofast.libraries.keyutils import create_key
import traceback
from bottle import request
from uuid import uuid4
#from parameciofast.libraries.db.extraforms.fileform import FileForm
class FileField(CharField):
def __init__(self, name, save_folder='media/upload/files', sizes=None, module=None, size=255, required=False):
super().__init__(name, size, required)
self.yes_prefix=True
self.suffix=''
# Is relative to media folder of paramecio
#if module!=None:
self.save_folder=save_folder
self.file_related=True
self.sizes=sizes
self.name_form=FileForm
self.extra_parameters=[self.save_folder]
def change_folder(self, folder):
pass
def check(self, value):
files_uploaded=request.files
field_file=self.name+'_file'
#if not change
if not field_file in files_uploaded:
if value=='':
if self.model:
if self.model.updated:
old_reset=self.model.yes_reset_conditions
self.model.yes_reset_conditions=False
with self.model.select([self.name]) as cur:
for arr_image in cur:
if arr_image[self.name]!='':
try:
os.remove(arr_image[self.name])
except:
pass
#if arr_image[self.name]!=save_file and arr_image[self.name]!='':
#value=arr_image[self.name]
self.model.yes_reset_conditions=old_reset
self.txt_error='Field is empty'
self.error=True
return ''
else:
value=os.path.basename(value)
return self.save_folder+'/'+value
# Load image file
file_bytecode=files_uploaded[field_file].file
filename=files_uploaded[field_file].filename
realfilename, ext = os.path.splitext(filename)
prefix=''
if self.yes_prefix==True:
#prefix=uuid4().hex+'_'
prefix=create_key(5).replace('/', '-').replace('#', '-')+self.suffix+'_'
filename=prefix+filename
save_file=self.save_folder+'/'+filename
# Save file
try:
#Check if directory exists
if not os.path.isdir(self.save_folder):
# Try create if not
try:
p=Path(self.save_folder)
p.mkdir(mode=0o755, parents=True)
except:
self.error=True
self.txt_error='Error: cannot create the directory where save the image.Check permissions,'
return ""
#files_uploaded[field_file].save(self.save_folder, overwrite=True)
if os.path.isfile(save_file):
os.remove(save_file)
# Delete old files
if self.model!=None:
if self.model.updated:
#old_conditions=self.model.conditions
old_reset=self.model.yes_reset_conditions
self.model.yes_reset_conditions=False
with self.model.select([self.name]) as cur:
for arr_file in cur:
if arr_file[self.name]!=save_file and arr_file[self.name]!='':
if os.path.isfile(arr_file[self.name]):
os.remove(arr_file[self.name])
self.model.yes_reset_conditions=old_reset
#self.model.conditions=old_conditions
return save_file
except:
self.error=True
self.txt_error='Error: cannot save the image file, Exists directory for save the file? '+traceback.format_exc()
print(traceback.format_exc())
return ""
def show_formatted(self, value):
return os.path.basename(value)

View file

@ -0,0 +1,157 @@
#!/usr/bin/env python3
import json
from parameciofast.libraries.db.webmodel import PhangoField
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.db.extraforms.i18nform import I18nForm
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.httputils import GetPostFiles
import json
import re
class I18nField(PhangoField):
def __init__(self, name, form=None):
super().__init__(name)
if form==None:
form=BaseForm(name, '')
self.name_form=I18nForm
self.extra_parameters=[form]
self.show_formatted_value=True
self.show_blank=False
arr_i18n={i:'' for i in I18n.dict_i18n}
self.default_value=json.dumps(arr_i18n)
def change_form(self, form):
self.extra_parameters=[form]
def check_value(self, value):
return super().check(value)
def check(self, value):
self.error=False
self.txt_error=''
arr_values={}
try:
arr_values=json.loads(value)
if not arr_values:
arr_values={}
except:
arr_values={}
arr_real_values={}
error_values=0
for lang in I18n.dict_i18n:
arr_real_values[lang]=arr_values.get(lang, '')
arr_real_values[lang]=self.check_value(arr_real_values[lang])
if not arr_real_values[lang] or arr_real_values[lang]=='None':
arr_real_values[lang]=''
error_values+=1
self.error=False
arr_values=arr_real_values
if error_values:
if error_values==len(arr_values):
self.error=True
self.txt_error='Sorry, You field language is empty'
return json.dumps(arr_values)
"""
if arr_values[I18n.default_lang]=='':
self.error=True
self.txt_error='Sorry, You need default language '+I18n.default_lang
return json.dumps(arr_values)
"""
return json.dumps(arr_values)
def get_type_sql(self):
return 'TEXT NOT NULL'
def obtain_lang_value(self, lang, value):
return value.get(self.name+'_'+lang, '')
def obtain_lang_from_post(self, lang, value):
#getpost=GetPostFiles()
#getpost.obtain_post()
return "" #GetPostFiles.post.get(self.name+'_'+lang, '')
def show_formatted(self, value):
if value=='':
value='{"en-US": "", "es-ES": ""}'
value=json.loads(value)
lang=I18n.get_default_lang()
if value[lang]!='' or self.show_blank:
return value[lang]
lang_value=value[I18n.default_lang]
if value[I18n.default_lang]=='':
for l in value:
if value[l]!='':
lang_value=value[l]
break
return lang_value
@staticmethod
def get_value(value):
value=json.loads(value)
lang=I18n.get_default_lang()
if value[lang]!='':
return value[lang]
return value[I18n.default_lang]
class I18nHTMLField(I18nField):
def check_value(self, value):
return re.sub('<.*?script?>', '', value)
class I18nPhangoField(I18nField):
def __init__(self, name, field_class, form=None):
super().__init__(name, form)
self.field_class=field_class
def check_value(self, value):
f=self.field_class
return f.check(value)

View file

@ -0,0 +1,276 @@
import os
import sys
from pathlib import Path
from parameciofast.libraries.db.corefields import CharField
from parameciofast.libraries.db.extraforms.fileform import FileForm
from parameciofast.libraries import httputils
from parameciofast.libraries.keyutils import create_key
import traceback
from bottle import request
try:
from PIL import Image
except:
print("Unexpected error:", sys.exc_info()[0])
raise
from uuid import uuid4
#from parameciofast.libraries.db.extraforms.fileform import FileForm
class ImageField(CharField):
def __init__(self, name, save_folder='media/upload/images', sizes=None, module=None, size=255, required=False):
super().__init__(name, size, required)
self.yes_prefix=True
#self.name_form=FileForm
self.thumbnail={'mini_': 150}
self.yes_thumbnail=False
self.default_quality_thumb=95
self.suffix=''
# Is relative to media folder of paramecio
#if module!=None:
self.save_folder=save_folder
self.file_related=True
self.sizes=sizes
self.name_form=FileForm
self.extra_parameters=[self.save_folder]
def change_folder(self, folder):
pass
def check(self, value):
files_uploaded=request.files
field_file=self.name+'_file'
#if not change
if not field_file in files_uploaded:
if value=='':
if self.model:
if self.model.updated:
old_reset=self.model.yes_reset_conditions
self.model.yes_reset_conditions=False
with self.model.select([self.name]) as cur:
for arr_image in cur:
if arr_image[self.name]!='':
try:
os.remove(arr_image[self.name])
except:
pass
#if arr_image[self.name]!=save_file and arr_image[self.name]!='':
#value=arr_image[self.name]
self.model.yes_reset_conditions=old_reset
self.txt_error='Field is empty'
self.error=True
return ''
else:
value=os.path.basename(value)
return self.save_folder+'/'+value
# Load image file
file_bytecode=files_uploaded[field_file].file
filename=files_uploaded[field_file].filename
try:
im=Image.open(file_bytecode)
except IOError:
self.error=True
self.txt_error='Error, file not have a valid format'
return ""
real_width=im.size[0]
real_height=im.size[1]
if self.sizes:
if 'maximum' in self.sizes:
if self.sizes['maximum'][0]<real_width or self.sizes['maximum'][1]<real_height:
self.error=True
self.txt_error='Wrong size. Maximum size is '+str(self.sizes['maximum'][0])+'x'+str(self.sizes['maximum'][1])
im.close()
return ""
if 'minimum' in self.sizes:
if self.sizes['minimum'][0]>real_width or self.sizes['minimum'][1]>real_height:
self.error=True
self.txt_error='Wrong size. Minimum size is '+str(self.sizes['minimum'][0])+'x'+str(self.sizes['minimum'][1])
im.close()
return ""
if 'resize' in self.sizes:
height_t=0
width_t=0
if real_height<=self.sizes['resize'][1]:
height_t=self.sizes['resize'][1]
if real_width>self.sizes['resize'][0]:
width_t=self.sizes['resize'][0]
if height_t==0:
ratio=(real_width/width_t)
height_t=round(real_height/ratio)
size=(width_t, height_t)
if width_t>0 and height_t>0:
im.thumbnail(size, 3)
format_image=im.format
if format_image!='JPEG' and format_image!='GIF' and format_image!='PNG':
self.error=True
self.txt_error='Format is wrong. Requires JPEG or PNG formats'
im.close()
return ""
# Create thumbnails and move file
realfilename, ext = os.path.splitext(filename)
prefix=''
if self.yes_prefix==True:
#prefix=uuid4().hex+'_'
prefix=create_key(5).replace('/', '-').replace('#', '-')+self.suffix+'_'
filename=prefix+filename
save_file=self.save_folder+'/'+filename
if self.yes_thumbnail:
for name, width_t in self.thumbnail.items():
im_thumb=im.copy()
ratio=(real_width/width_t)
height_t=round(real_height/ratio)
size=(width_t, height_t)
save_file_thumb=self.save_folder+'/'+name+filename
im_thumb.thumbnail(size, Image.ANTIALIAS)
im_thumb.save(save_file_thumb, "JPEG", quality=self.default_quality_thumb)
im_thumb.close()
# Save file
try:
#Check if directory exists
if not os.path.isdir(self.save_folder):
# Try create if not
try:
p=Path(self.save_folder)
p.mkdir(mode=0o755, parents=True)
except:
im.close()
self.error=True
self.txt_error='Error: cannot create the directory where save the image.Check permissions,'
return ""
#files_uploaded[field_file].save(self.save_folder, overwrite=True)
if os.path.isfile(save_file):
os.remove(save_file)
im.save(save_file)
# Delete old files
if self.model!=None:
if self.model.updated:
#old_conditions=self.model.conditions
old_reset=self.model.yes_reset_conditions
self.model.yes_reset_conditions=False
with self.model.select([self.name]) as cur:
for arr_image in cur:
if arr_image[self.name]!=save_file and arr_image[self.name]!='':
if os.path.isfile(arr_image[self.name]):
os.remove(arr_image[self.name])
self.model.yes_reset_conditions=old_reset
#self.model.conditions=old_conditions
im.close()
return save_file
except:
im.close()
self.error=True
self.txt_error='Error: cannot save the image file, Exists directory for save the file? '+traceback.format_exc()
print(traceback.format_exc())
return ""
def show_formatted(self, value):
return os.path.basename(value)

View file

@ -0,0 +1,19 @@
from parameciofast.libraries.db.corefields import CharField
import ipaddress
class IpField(CharField):
def check(self, value):
try:
value=str(ipaddress.ip_address(value))
except:
self.error=True
self.txt_error='No Valid IP'
value=""
return value

View file

@ -0,0 +1,115 @@
"""
Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.webmodel import WebModel, PhangoField
import sys
try:
import ujson as json
except:
import json
class JsonField(PhangoField):
"""Field for save json datatype values"""
def __init__(self, name, field_type, required=False):
"""
Args:
name (str): The name of field
field_type (PhangoField): The type of PhangoField for save in JsonField
required (bool): Boolean for define if field is required or not
"""
super().__init__(name, required)
self.field_type=field_type
self.error_default='Sorry, the json dict is invalid'
self.set_default='NOT NULL'
self.type_sql='longtext'
def check(self, value):
if type(value).__name__=='str':
try:
value=json.loads(value)
except json.JSONDecodeError:
value={}
self.error=True
self.txt_error=self.error_default
elif type(value).__name__!='dict':
value={}
self.error=True
self.txt_error=self.error_default
for k,v in value.items():
value[k]=self.field_type.check(v)
final_value=json.dumps(value)
#final_value=WebModel.escape_sql(final_value)
return final_value
def get_type_sql(self):
return 'JSON '+self.set_default
def show_formatted(self, value):
return ", ".join(value)
# You need check the values previously.
class JsonValueField(PhangoField):
"""Field for save json mixed values. You need check the values previously, the field only check values for prevent sql injections."""
def __init__(self, name, required=False):
super().__init__(name, required)
self.error_default='Sorry, the json dict is invalid'
self.default_value={}
#self.set_default='NOT NULL'
def get_type_sql(self):
return 'JSON'
def check(self, value):
try:
final_value=json.dumps(value)
except json.JSONDecodeError:
final_value='{}'
self.error=True
self.txt_error=self.error_default
return final_value

View file

@ -0,0 +1,27 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.corefields import CharField
from parameciofast.libraries.db import coreforms
from parameciofast.libraries.i18n import I18n
class LangField(CharField):
def __init__(self, name, size=255, required=False):
super(CharField, self).__init__(name, size, required)
select_lang={}
for lang in I18n.dict_i18n:
select_lang[lang]=lang
self.change_form(coreforms.SelectForm, [select_lang])
self.default_value=I18n.default_lang
def check(self, value):
if value not in I18n.dict_i18n:
value=I18n.default_lang
return value

View file

@ -0,0 +1,26 @@
from parameciofast.libraries.db.corefields import DecimalField
from decimal import Decimal, getcontext
from locale import format_string
getcontext().prec=2
class MoneyField(DecimalField):
def __init__(self, name, required=False):
super().__init__(name, 11, required)
def check(self, value):
value=Decimal(value)
return value
def show_formatted(self, value):
return format_string('%.2f', Decimal(value), grouping=True)
@staticmethod
def format_money(value):
return format_string('%.2f', Decimal(value), grouping=True)

View file

@ -0,0 +1,42 @@
#!/usr/bin/env python3
#from parameciofast.libraries.db.webmodel import PhangoField
from parameciofast.libraries.db.corefields import IntegerField
from parameciofast.libraries.db.coreforms import SelectModelForm
from parameciofast.libraries.httputils import GetPostFiles
class ParentField(IntegerField):
def __init__(self, name, size=11, required=False, field_name='name'):
super().__init__(name, size, required)
#self.foreignkey=True
self.indexed=True
self.field_name=field_name
def post_register(self):
if self.model!=None:
self.change_form(SelectModelForm, [self.model, self.field_name, self.model.name_field_id, self.name])
def check(self, value):
value=super().check(value)
if self.model!=None:
if self.model.updated==True:
if self.model.name_field_id in self.model.post:
GetPostFiles.obtain_get()
model_id=GetPostFiles.get.get(self.model.name_field_id, '0')
if model_id==value:
self.error=True
self.txt_error='A field cannot be its own father'
self.required=True
value=0
return value
return value

View file

@ -0,0 +1,157 @@
#!/usr/bin/python3
"""
Parameciofm is a series of wrappers for bottle.py, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.corefields import PhangoField
from parameciofast.libraries.db.coreforms import PasswordForm
from hmac import compare_digest as compare_hash
from hmac import compare_digest as compare_hash
#try:
# import crypt
#except:
# pass
#import bcrypt
from argon2 import PasswordHasher
class PasswordField(PhangoField):
"""Field for check and save passwords"""
def __init__(self, name, size=1024, required=False):
super(PasswordField, self).__init__(name, size, required)
self.protected=True
self.name_form=PasswordForm
self.default_value=''
self.encrypt_password=True
def check(self, value):
self.txt_error=''
self.error=False
value.strip()
if value=='':
if self.model!=None:
if self.model.updated==True:
self.required=False
self.check_blank=True
return ""
else:
self.txt_error=self.error_default
self.error=True
else:
self.txt_error=self.error_default
self.error=True
else:
#if crypt.METHOD_SHA512 in crypt.methods:
#salt=crypt.mksalt(crypt.METHOD_SHA512)
if self.encrypt_password:
#value=crypt.crypt(value)
ph=PasswordHasher()
final_value=ph.hash(value)
return final_value
"""
else:
self.txt_error="You need the SHA512 method"
self.error=True
return ""
"""
return value
@staticmethod
def verify( password, h):
"""Static method used for verify a password save using PasswordField"""
#return bcrypt_sha256.verify(password, h)
#return compare_hash(h, crypt.crypt(password, h))
ph=PasswordHasher()
try:
return ph.verify(h, password)
except:
return False
# Old function bcrypt
"""
try:
from passlib.hash import bcrypt
from passlib.hash import bcrypt_sha256
class PasswordField(PhangoField):
def __init__(self, name, size=1024, required=False):
super(PasswordField, self).__init__(name, size, required)
self.protected=True
self.name_form=PasswordForm
self.default_value=''
def check(self, value):
self.txt_error=''
self.error=False
value.strip()
if value=='':
if self.model!=None:
if self.model.updated==True:
self.required=False
self.check_blank=True
return ""
else:
self.txt_error="The field is empty"
self.error=True
else:
self.txt_error="The field is empty"
self.error=True
else:
#if crypt.METHOD_SHA512 in crypt.methods:
#value = bcrypt_sha256.encrypt(value)
value = bcrypt_sha256.hash(value)
return value
@staticmethod
def verify( password, h):
return bcrypt_sha256.verify(password, h)
except:
"""

View file

@ -0,0 +1,22 @@
from parameciofast.libraries.db.corefields import IntegerField
class PercentField(IntegerField):
def __init__(self, name, required=False):
super().__init__(name, 2, required)
def check(self, value):
try:
value=int(value)
if value<0:
value=0
if value>100:
value=100
except:
value=0
return value

View file

@ -0,0 +1,37 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.corefields import CharField
from parameciofast.libraries.slugify import slugify
from parameciofast.libraries.db.coreforms import HiddenForm
class SlugifyField(CharField):
def __init__(self, name, size=255, field_related=None, required=False):
super(SlugifyField, self).__init__(name, size, required)
self.name_form=HiddenForm
self.field_related=field_related
def check(self, value):
value=slugify(value)
if value=='':
if self.model!=None and self.field_related!=None:
self.model.post[self.field_related]=self.model.post.get(self.field_related, '')
value=slugify(self.model.post[self.field_related])
if value=='':
self.error=True
self.error_txt='Value is empty'
return ''
return value

View file

@ -0,0 +1,42 @@
from parameciofast.libraries.db.corefields import CharField
import re
check_url = re.compile(
r'^(?:http|ftp)s?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' #domain...
r'localhost|' #localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
class UrlField(CharField):
def check(self, value):
self.error=False
self.txt_error=''
if not check_url.match(value):
self.error=True
value=""
self.txt_error='No valid URL format'
return value
check_domain=re.compile('^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|([a-zA-Z0-9][a-zA-Z0-9-_]{1,61}[a-zA-Z0-9]))\.([a-zA-Z]{2,6}|[a-zA-Z0-9-]{2,30}\.[a-zA-Z]{2,3})$')
class DomainField(CharField):
def check(self, value):
self.error=False
self.txt_error=''
if not check_domain.match(value):
self.error=True
value=""
self.txt_error='No valid domain format'
return value

View file

@ -0,0 +1,16 @@
from parameciofast.libraries.db.corefields import PhangoField
from parameciofast.libraries.db.coreforms import PasswordForm
from hmac import compare_digest as compare_hash
import crypt
import re
class UserNameField(PhangoField):
def check(self, value):
if not re.match("^[A-Za-z0-9_-]+$", value):
self.txt_error='Error: use only letters, numbers, underscores and dashes for this field'
self.error=1
value=''
return value

View file

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.coreforms import BaseForm
class CheckForm(BaseForm):
def __init__(self, name, value, real_value=1):
super(CheckForm, self).__init__(name, value)
self.real_value=real_value
def form(self):
arr_value={}
arr_value[self.setform(self.default_value)]=''
arr_value[self.setform(self.real_value)]='checked'
return '<input type="checkbox" class="'+self.css+'" name="'+self.name+'" id="'+self.name_field_id+'" value="'+str(self.real_value)+'" '+arr_value[self.setform(self.default_value)]+'>'

View file

@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
Parameciofm is a series of wrappers for bottlepy, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.mtemplates import standard_t
class ColorForm(BaseForm):
def __init__(self, name, value):
"""Form for get colors from a picker"""
super().__init__(name, value)
self.t=standard_t
def form(self):
return self.t.load_template('forms/colorform.phtml', name_form=self.name_field_id, default_value=self.default_value, form=self)

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.mtemplates import standard_t
from parameciofast.libraries.datetime import format_timedata
class DateForm(BaseForm):
def __init__(self, name, value):
super().__init__(name, value)
self.yes_time=False
self.t=standard_t
def form(self):
y=''
m=''
d=''
h=''
min=''
s=''
min_time=''
time=format_timedata(self.default_value)
if time[0]:
y=int(time[0])
m=int(time[1])
d=int(time[2])
h=int(time[3])
min_time=int(time[4])
s=int(time[5])
return self.t.load_template('forms/dateform.phtml', yes_time=self.yes_time, form=self.name, y=y, m=m, d=d, h=h, min=min_time, s=s)
#def

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.mtemplates import env_theme, PTemplate
env=env_theme(__file__)
t=PTemplate(env)
class FileForm(BaseForm):
def __init__(self, name, value, path):
super().__init__(name, value)
self.t=t
self.enctype=True
def form(self):
return self.t.load_template('forms/fileform.phtml', form=self)

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.mtemplates import standard_t
import json
class I18nForm(BaseForm):
def __init__(self, name, value, form):
super().__init__(name, value)
self.form_child=form
self.t=standard_t
def form(self):
lang_selected=I18n.get_default_lang()
try:
self.default_value=json.loads(self.default_value)
except:
self.default_value={}
if type(self.default_value).__name__!='dict':
self.default_value={}
for lang in I18n.dict_i18n:
self.default_value[lang]=self.default_value.get(lang, '')
return standard_t.load_template('forms/i18nform.phtml', name_form=self.name_field_id, real_name_form=self.name, form=self.form_child, arr_i18n=I18n.dict_i18n, lang_selected=lang_selected, default_value=self.default_value)

View file

@ -0,0 +1,22 @@
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.mtemplates import env_theme, PTemplate
env=env_theme(__file__)
t=PTemplate(env)
class TextHTMLForm(BaseForm):
def __init__(self, name, value, t_add=None):
super().__init__(name, value)
self.t=t_add
if t_add==None:
self.t=t
def form(self):
return self.t.load_template('forms/texthtmlform.phtml', form=self)

View file

@ -0,0 +1,22 @@
from parameciofast.libraries.db.coreforms import BaseForm
from parameciofast.libraries.mtemplates import env_theme, PTemplate
env=env_theme(__file__)
t=PTemplate(env)
class TextHTMLForm(BaseForm):
def __init__(self, name, value, t_add=None):
super().__init__(name, value)
self.t=t_add
if t_add==None:
self.t=t
def form(self):
return self.t.load_template('forms/texthtmlform.phtml', form=self)

View file

@ -0,0 +1,145 @@
#!/usr/bin/env python3
from parameciofast.libraries.db import corefields
from parameciofast.libraries.db.coreforms import PasswordForm
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.sessions import get_session
from parameciofast.libraries.keyutils import create_key_encrypt
from bottle import request
# Need unittest
def pass_values_to_form(post, arr_form, yes_error=True, pass_values=True):
if pass_values:
def get_value(key):
return post[key]
else:
def get_value(key):
return arr_form[key].default_value
for key, value in arr_form.items():
post[key]=post.get(key, '')
#if arr_form[key].default_value=='':
arr_form[key].default_value=get_value(key)
if arr_form[key].field==None:
arr_form[key].field=corefields.CharField(key, 255, required=False)
# Recheck value if no set error field
if arr_form[key].field.error == None:
arr_form[key].field.check(post[key])
#arr_form[key].txt_error=""
if arr_form[key].required==True and arr_form[key].field.error==True and yes_error==True:
arr_form[key].txt_error=arr_form[key].field.txt_error
# Reset error on field.
arr_form[key].field.error=None
return arr_form
class CheckForm():
def __init__(self):
self.error=0
def check(self, post, arr_form):
for k in arr_form.keys():
post[k]=post.get(k, '')
if arr_form[k].field==None:
arr_form[k].field=corefields.CharField(k, 255, required=False)
post[k]=arr_form[k].field.check(post[k])
arr_form[k].txt_error=arr_form[k].field.txt_error
if arr_form[k].field.error==True and arr_form[k].required==True:
self.error+=1
return post, arr_form
def check_form(post, arr_form, sufix_form='_error'):
error=0
error_form={}
for k in arr_form.keys():
post[k]=post.get(k, '')
if arr_form[k].field==None:
arr_form[k].field=corefields.CharField(k, 255, required=False)
post[k]=arr_form[k].field.check(post[k])
arr_form[k].txt_error=arr_form[k].field.txt_error
if arr_form[k].field.error==True and arr_form[k].required==True:
error_form['#'+k+sufix_form]=arr_form[k].txt_error
error+=1
return error, error_form, post, arr_form
def show_form(post, arr_form, t, yes_error=True, pass_values=True, modelform_tpl='forms/modelform.phtml'):
# Create csrf_token in session
generate_csrf()
if pass_values==True:
pass_values_to_form(post, arr_form, yes_error, pass_values)
return t.load_template(modelform_tpl, forms=arr_form)
#Simple Function for add repeat_password form to user model
def set_extra_forms_user(user_admin):
user_admin.fields['password'].required=True
user_admin.fields['email'].required=True
user_admin.create_forms(['username', 'email', 'password'])
user_admin.forms['repeat_password']=PasswordForm('repeat_password', '')
user_admin.forms['repeat_password'].required=True
user_admin.forms['repeat_password'].label=I18n.lang('common', 'repeat_password', 'Repeat Password')
#Function for initial values for necessary fields.
def ini_fields(fields):
pass
def csrf_token(token_id='csrf_token'):
s=get_session()
if not 'csrf_token' in s:
s['csrf_token']=create_key_encrypt()
s.save()
return '<input type="hidden" name="csrf_token" class="csrf_token" id="'+token_id+'" value="'+s['csrf_token']+'" />'
def generate_csrf():
s=get_session()
if not 'csrf_token' in s:
s['csrf_token']=create_key_encrypt()
s.save()
return s['csrf_token']
def request_type():
return request.environ['REQUEST_METHOD']

View file

@ -0,0 +1,328 @@
#!/usr/bin/env python3
from collections import OrderedDict
import sys
def query(model, str_query, args=[], connection_id='default'):
model.connect_to_db()
return model.sqlclass.query(str_query, args, connection_id)
# Insert method, for insert a row in database.using a dictionary
# External agent define if the update is in code or from external source, how a form.
def insert(model, dict_values, external_agent=True):
model.clean_fields()
# Connect to db
model.post=dict_values
#model.connect_to_db()
query_error=False
last_sql=''
#model.fields[model.name_field_id].required=False
if model.name_field_id in dict_values:
del dict_values[model.name_field_id]
try:
fields, values, update_values=model.check_all_fields(dict_values, external_agent)
except:
query_error=(model.sqlclass.error_connection+' '+sys.exc_info()[0], '')
#cursor.close()
return (query_error, False)
c=len(values)
arr_str=['%s' for x in range(c)]
sql="insert into `"+model.name+"` (`"+"`, `".join(fields)+"`) VALUES ("+", ".join(arr_str)+")"
last_sql=sql
cursor=model.query(sql, values, model.connection_id)
if cursor.rowcount>0:
model.last_id=cursor.lastrowid
cursor.close()
# Delete cache for this table.
return (False, True)
else:
query_error=('Cannot insert the new row', last_sql)
return (query_error, False)
class QueryBuilderException(Exception):
pass
def select(model, conditions=['', []], arr_select=[], raw_query=False):
model.clean_fields()
final_fields=[]
extra_fields=[]
#model.query_error=''
query_error=False
last_query=''
#First table selecction
tables_to_select=['`'+model.name+'`']
keys=list(model.fields.keys())
if len(arr_select)==0:
arr_select=keys
# Array intersect for obtain the valid fields
fields = list(set(keys) & set(arr_select))
#Creating the fields
arr_repeat_field={}
new_fields=OrderedDict()
for field in fields:
#Check if foreignkeyfield
if type(model.fields[field]).__name__=="ForeignKeyField" and raw_query==False:
if model.fields[field].table_name in arr_repeat_field:
arr_repeat_field[model.fields[field].table_name]+=1
else:
arr_repeat_field[model.fields[field].table_name]=0
table_name=model.fields[field].table_name+'` as `'+model.fields[field].table_name+str(arr_repeat_field[model.fields[field].table_name])
final_table_name=model.fields[field].table_name+str(arr_repeat_field[model.fields[field].table_name])
# The name with its alias of this related table model
tables_to_select.append('`'+table_name+'`')
# Add field from related table
# as "+table_name+"_"+model.fields[field].named_field
extra_fields.append("`"+final_table_name+"`.`"+model.fields[field].named_field+"` as "+field)
# Add a condition to sql query for join the two tables.
conditions[0]+=" AND `"+final_table_name+"`.`"+model.fields[field].identifier_field+"`=`"+model.name+"`.`"+field+"`"
# Add extra fields from related table from select_fields ForeignKeyField class member
select_fields=model.fields[field].select_fields
for extra_field in select_fields:
model.fields[field+'_'+extra_field]=model.fields[field].related_model.fields[extra_field]
model.fields_to_clean.append(field+'_'+extra_field)
# Check if extra_field is ForeignKeyField, if yes, call this function recursively.
extra_fields.append("`"+final_table_name+"`.`"+extra_field+"` as `"+field+"_"+extra_field+"`")
else:
# Add normal field to sql query
final_fields.append("`"+model.name+"`.`"+field+"`")
#if len(new_fields)>0:
#model.fields.update(new_fields)
extra_sql_field=""
if len(extra_fields)>0:
extra_sql_field=", "+", ".join(extra_fields)
if len(final_fields)==0:
query_error=("Error: without fields to search", '')
#return (query_error, False)
raise QueryBuilderException("Error: without fields to search")
sql= ("select "+" "+model.distinct+", ".join(final_fields)+extra_sql_field+" from "+", ".join(tables_to_select)+' '+conditions[0]).strip()
last_query=sql
cursor=model.query(sql, conditions[1], model.connection_id)
if cursor==False:
#query_error=(model.sqlclass.error_connection, last_query)
#cursor.close()
#return (query_error, False)
raise QueryBuilderException(model.sqlclass.error_connection+last_query)
else:
return cursor
def select_to_array(model, conditions=['', []], fields_selected=[], raw_query=0):
if len(fields_selected)==0:
fields_selected=model.fields.keys()
if (model.name_field_id not in fields_selected):
fields_selected.append(model.name_field_id)
def del_row_id(row):
try:
index_id=row.index(model.name_field_id)
del row[index_id]
except:
pass
else:
def del_row_id(row):
pass
results=[] #OrderedDict()
with select(model, conditions, fields_selected, raw_query) as cursor:
for row in cursor:
if model.show_formatted and row:
for k, col in row.items():
if model.fields[k].show_formatted_value:
row[k]=self.fields[k].show_formatted(col)
results.append(row)
del_row_id(results)
return results
def select_to_dict(model, conditions=['', []], fields_selected=[], raw_query=0, integer_dict=False):
if not integer_dict:
def conv_int(i):
return str(i)
else:
def conv_int(i):
return i
if len(fields_selected)==0:
fields_selected=model.fields.keys()
if (model.name_field_id not in fields_selected):
fields_selected.append(model.name_field_id)
def del_row_id(row):
try:
index_id=row.index(model.name_field_id)
del row[index_id]
except:
pass
else:
def del_row_id(row):
pass
results=OrderedDict()
with select(model, conditions, fields_selected, raw_query) as cursor:
for row in cursor:
if model.show_formatted and row:
for k, col in row.items():
row[k]=model.fields[k].show_formatted(col)
results[conv_int(row[model.name_field_id])]=row
del_row_id(results)
return results
def select_a_row_where(model, conditions=['', []], fields_selected=[], raw_query=0, begin=0):
limit="limit "+str(begin)+", 1"
with select(model, conditions, fields_selected, raw_query) as cursor:
row=cursor.fetchone()
if row==None:
row=False
else:
if model.show_formatted:
for k, col in row.items():
row[k]=model.fields[k].show_formatted(col)
return row
def select_a_row(model, id, fields_selected=[], raw_query=0):
conditions=['WHERE `'+model.name+'`.`'+model.name_field_id+'`=%s', [id]]
with select(model, conditions, fields_selected, raw_query) as cursor:
row=cursor.fetchone()
if row==None:
row=False
else:
if model.show_formatted:
for k, col in row.items():
row[k]=model.fields[k].show_formatted(col)
return row
# A method por count num rows affected for sql conditions
def select_count(model, conditions=['', []], field_to_count='id', raw_query=True):
print(model.dummy)
#First table selecction
tables_to_select=['`'+model.name+'`']
fields=list(model.fields.keys())
#Creating the fields
for field in fields:
#Check if foreignkeyfield
if type(model.fields[field]).__name__=="ForeignKeyField" and raw_query==False:
table_name=model.fields[field].table_name
tables_to_select.append('`'+table_name+'`')
# Add a condition to sql query for join the two tables.
conditions[0]+=" AND `"+table_name+"`.`"+model.fields[field].identifier_field+"`=`"+model.name+"`.`"+field+"`"
sql= "select count(`"+field_to_count+"`) from "+", ".join(tables_to_select)+' '+conditions[0]
count=0
with model.query(sql, conditions[1], model.connection_id) as cursor:
count=list(cursor.fetchone().values())[0]
if model.yes_reset_conditions:
model.reset_conditions()
return count

View file

@ -0,0 +1,153 @@
#!/usr/bin/env python3
from parameciofast.libraries.db.webmodel import WebModel
from parameciofast.libraries.db.coreforms import PasswordForm
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.httputils import GetPostFiles
class UserModel(WebModel):
def __init__(self, name_field_id="id"):
super().__init__(name_field_id)
self.password_field='password'
self.email_field='email'
self.username_field='username'
self.yes_repeat_password=True
self.check_user=True
self.check_email=True
def create_forms(self, arr_fields=[]):
# Add password_repeat to forms from the model
arr_fields=super().create_forms(arr_fields)
if self.password_field in arr_fields and self.yes_repeat_password:
repeat_password=PasswordForm('repeat_password', '')
repeat_password.required=1
repeat_password.label=I18n.lang('common', 'repeat_password', 'Repeat Password')
repeat_password.field=self.fields['password']
self.create_form_after(self.password_field, repeat_password)
return arr_fields
"""
def insert(self, dict_values, external_agent=True):
if 'password' in dict_values:
dict_values['repeat_password']=dict_values.get('repeat_password', '')
if dict_values['repeat_password']!=dict_values[self.password_field]:
self.fields[self.password_field].error=True
self.fields['password'].txt_error=I18n.lang('common', 'error_passwords_no_match', 'Error: passwords doesn\'t match')
return super().insert(dict_values, external_agent)
"""
def check_all_fields(self, dict_values, external_agent, yes_update=False, errors_set="insert"):
error=0
try:
fields, values, update_values=super().check_all_fields(dict_values, external_agent, yes_update, errors_set)
except:
error+=1
if self.check_user==True:
# Check if passwords matches
if self.password_field in dict_values:
dict_values['repeat_password']=dict_values.get('repeat_password', '')
if dict_values[self.password_field].strip()!="":
if dict_values['repeat_password']!=dict_values[self.password_field]:
self.fields[self.password_field].error=True
self.fields[self.password_field].txt_error=I18n.lang('common', 'error_passwords_no_match', 'Error: passwords doesn\'t match')
error+=1
# Check if exists user with same email or password
get_id=0
if self.updated:
# Need the id
#GetPostFiles.obtain_get()
#GetPostFiles.obtain_post()
getpostfiles=GetPostFiles()
getpostfiles.obtain_get()
get_id=getpostfiles.get.get(self.name_field_id, '0')
post_id=getpostfiles.post.get(self.name_field_id, '0')
if get_id!='0':
get_id=int(get_id)
if post_id!='0':
get_id=int(post_id)
pass
get_id=int(get_id)
sql_id=''
original_conditions=self.conditions
self.reset_conditions()
if self.username_field in dict_values:
self.conditions=['WHERE username=%s AND '+self.name_field_id+'!=%s', [dict_values[self.username_field], get_id]]
if self.select_count()>0:
self.fields[self.username_field].error=True
self.fields[self.username_field].txt_error=I18n.lang('common', 'error_username_exists', 'Error: username already exists')
self.fields_errors[self.username_field].append(self.fields[self.username_field].txt_error)
error+=1
if self.check_email:
if self.email_field in dict_values:
self.conditions=['WHERE email=%s AND '+self.name_field_id+'!=%s', [dict_values[self.email_field], get_id]]
if self.select_count()>0:
self.fields[self.email_field].error=True
self.fields[self.email_field].txt_error=I18n.lang('common', 'error_email_exists', 'Error: this email is already being used')
self.fields_errors[self.email_field].append(self.fields[self.email_field].txt_error)
error+=1
self.conditions=original_conditions
if error>0:
self.query_error+='Error:if is not expected, please, check that you disabled the special checkings of this model'
return False
return fields, values, update_values

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel
class ResponseData(BaseModel):
error: bool
message: str
code_error: int | None = None

View file

@ -221,8 +221,8 @@ class I18n:
return json.dumps(arr_final)
@staticmethod
def session_lang(request):
def session_lang(session):
return request.session.get('lang', I18n.get_default_lang())
return session.get('lang', I18n.get_default_lang())
common_pgettext=PGetText(__file__)

View file

@ -2,7 +2,7 @@ from fastapi import FastAPI, Cookie, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from typing import Annotated
from settings import config
from parameciofast.fast import app
#from parameciofast.fast import app
from parameciofast.libraries.session import ParamecioSession
from starlette.middleware.sessions import SessionMiddleware

View file

@ -1,20 +1,23 @@
from fastapi import FastAPI, Cookie, Request
from fastapi import FastAPI, Cookie, Request, Response
from fastapi.responses import HTMLResponse, RedirectResponse
from parameciofast.modules.fastadmin import admin_app
from typing import Annotated
from parameciofast.fast import app
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.mtemplates import env_theme, PTemplate
from pydantic import BaseModel, Field
from parameciofast.modules.fastadmin.models.admin import UserAdmin
from parameciofast.libraries.db.webmodel import WebModel
from parameciofast.libraries.fastutils import ResponseData
env=env_theme(__file__)
t=PTemplate(env, app.url_path_for)
useradmin=UserAdmin()
@admin_app.get('/', response_class=HTMLResponse)
def home_admin(request: Request, paramecio_session: Annotated[str | None, Cookie(description='Cookie for validate into the admin site. The cookie name can change in you settings/config.py')] = None):
"""
if not paramecio_session:
return RedirectResponse(app.url_path_for('login_admin'))
"""
if not request.session.get('login_admin', None):
return RedirectResponse(app.url_path_for('login_admin'))
@ -24,8 +27,75 @@ def home_admin(request: Request, paramecio_session: Annotated[str | None, Cookie
@admin_app.get('/login', response_class=HTMLResponse)
def login_admin(request: Request):
#session=request.session
db=WebModel.connection()
i18n=I18n('admin', I18n.session_lang(request))
with db.query('select count(id) as num_users from useradmin', []) as cursor:
num_users=cursor.fetchone()['num_users']
if not num_users:
return RedirectResponse(app.url_path_for('signup_admin'))
db.close()
i18n=I18n('admin', I18n.session_lang(request.session))
return t.load_template('login.phtml', title=i18n.tlang('Login'), tlang=i18n.tlang, url_for=app.url_path_for)
@admin_app.get('/signup', response_class=HTMLResponse)
def signup_admin(request: Request):
db=WebModel.connection()
with db.query('select count(id) as num_users from useradmin', []) as cursor:
num_users=cursor.fetchone()['num_users']
if num_users>0:
return RedirectResponse(app.url_path_for('signup_admin'))
db.close()
i18n=I18n('admin', I18n.session_lang(request.session))
return t.load_template('signup.phtml', title=i18n.tlang('Signup'), tlang=i18n.tlang, url_for=app.url_path_for)
class UserAdmin(BaseModel):
username: str = Field(description="The username of user")
password: str = Field(description="The password of user")
remember_login: bool | None = None
@admin_app.post('/login')
def check_login_admin(user: UserAdmin, request: Request) -> ResponseData:
db=WebModel.connection()
i18n=I18n('admin', I18n.session_lang(request.session))
error=1
message=i18n.tlang('Invalid user and password')
if user.username!='' and user.password!='':
with db.query('select * from useradmin WHERE username=%s', [user.username]) as cursor:
result=cursor.fetchone()
if result:
if useradmin.fields['password'].verify(user.password, result['password']):
request.session['login_admin']=True
error=0
message=''
db.close()
return {'error': error, 'message': message}
@admin_app.get('/logout')
def logout_admin(request: Request) -> RedirectResponse:
if 'login_admin' in request.session:
del request.session['login_admin']
return RedirectResponse(app.url_path_for('login_admin'))

View file

@ -0,0 +1,100 @@
#!/usr/bin/env python3
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.db.webmodel import WebModel
#from parameciofast.libraries.db.usermodel import UserModel
from parameciofast.libraries.db import corefields
from parameciofast.libraries.db.extrafields.emailfield import EmailField
from parameciofast.libraries.db.extrafields.passwordfield import PasswordField
from parameciofast.libraries.db.extrafields.langfield import LangField
from parameciofast.libraries.db.extrafields.ipfield import IpField
from parameciofast.libraries.db.extrafields.datetimefield import DateTimeField
class PrivilegesField(corefields.IntegerField):
def show_formatted(self, value):
value=int(value)
if value==0:
return I18n.lang('admin', 'without_privileges', 'Without privileges')
elif value==1:
return I18n.lang('admin', 'selected_privileges', 'Selected privileges')
elif value==2:
return I18n.lang('admin', 'administrator', 'Administrator')
class UserAdmin(WebModel):
#def create_fields(self):
def __init__(self, connection=None):
super().__init__(connection)
# I can change other fields here, how the name.
self.register(corefields.CharField('username'))
self.fields['username'].required=True
self.register(PasswordField('password'))
self.fields['password'].required=True
self.register(EmailField('email'))
self.fields['email'].required=True
self.register(corefields.CharField('token_recovery'))
self.register(corefields.CharField('token_login'))
self.register(PasswordField('token_auth'))
self.register(PrivilegesField('privileges'))
self.register(LangField('lang', 20))
self.register(corefields.BooleanField('disabled'))
self.register(corefields.BooleanField('double_auth'))
self.register(corefields.BooleanField('dark_theme'))
#self.register(corefields.IntegerField('num_tries', 1))
self.register(DateTimeField('last_login'))
class LoginTries(WebModel):
#def create_fields(self):
def __init__(self, connection=None):
super().__init__(connection)
self.register(IpField('ip'))
self.register(corefields.IntegerField('num_tries', 1))
self.register(DateTimeField('last_login'))
"""
user_admin=WebModel('user_admin')
user_admin.register(corefields.CharField('username'))
user_admin.fields['username'].required=True
user_admin.register(corefields.CharField('password'))
user_admin.fields['password'].required=True
user_admin.register(EmailField('email'))
user_admin.fields['email'].required=True
user_admin.register(corefields.CharField('token_recovery'))
user_admin.register(corefields.BooleanField('privileges'))
#user_admin.register(corefields.CharField('prueba'))
"""

View file

@ -22,6 +22,7 @@
<div class="d-flex align-items-center" style="height:100vh;">
<div class="col-3"></div>
<div class="col">
<%block name="content">
<div class="card">
<div class="card-header bg-primary">
${tlang('Login')}
@ -43,9 +44,10 @@
<input type="checkbox" class="form-check-input" id="autologin" name="autologin">
<label class="form-check-label" for="autologin">${tlang('Autologin')}</label>
</div>
<button type="submit" class="btn btn-primary">Submit</button>
<button type="submit" id="login_submit" class="btn btn-primary">Submit</button>
</form>
</div>
</%block>
</div>
</div>
<div class="col-3"></div>
@ -56,6 +58,9 @@
<script src="https://code.jquery.com/jquery-3.7.1.min.js" integrity="sha256-/JqT3SQfawRcv/BIHPThkBvs0OEvtFFmqPF/lYI/Cxo=" crossorigin="anonymous"></script>
<script>
document.documentElement.setAttribute('data-bs-theme','dark');
</script>
<%block name="jscript">
<script>
/*setTimeout(function () {
@ -64,8 +69,79 @@
}, 1000);*/
$(document).ready( function () {
$('#login_form').submit( function () {
$('#loader-div').show();
$('#login_submit').prop('disabled', true);
data_form={'username': $('#username_form').val(), 'password': $('#password_form').val(), 'csrf_token': $("#csrf_token").val(), 'remember_login': 0};
$.ajax({
url: "${url_for('check_login_admin')}",
method: "POST",
dataType: "json",
contentType : 'application/json',
data: JSON.stringify(data_form)
}).done(function(data) {
if(data.error==0)
{
//location.reload()
location.href="${url_for('home_admin')}";
}
else
{
$('#login_submit').prop('disabled', false);
$('#loader-div').hide();
// Firefox have a horrible and stupid bug and you need attr for set de new csrf_token
/*$('#csrf_token').attr('value', data.csrf_token);
$('#loading').hide('slow');
if(data.hasOwnProperty('disable')) {
$('#username_error').html("${_('Error, your user is disabled, you need support of web administration')}");
} if(data.hasOwnProperty('you_cannot_login')) {
if(data.you_cannot_login) {
$('#username_error').html("${_('Error, excessive tries, wait some minutes for login again')}");
}
else {
$('#username_error').html("${_('Error, wrong username or password')}");
}
}
else {
$('#username_error').html("${_('Error, wrong username or password')}");
}*/
}
});
return false;
});
});
</script>
</%block>
</body>
</html>

View file

@ -0,0 +1,113 @@
<%inherit file="login.phtml"/>
<%block name="content">
<div class="card">
<div class="card-header bg-primary">
${tlang('Signup')}
</div>
<div class="card-body">
<form method="POST" action="${url_for('login_admin')}" id="login_form">
<div class="mb-3">
<label for="username_form" class="form-label">${tlang('Username')}*</label>
<input type="text" class="form-control form-control-lg" id="username_form" name="password" aria-describedby="username" autocomplete="off">
<div class="invalid-feedback">
${tlang('You need a valid username')}
</div>
</div>
<div class="mb-3">
<label for="email_form" class="form-label">${tlang('Email')}*</label>
<input type="email" class="form-control form-control-lg" id="email_form" name="email" autocomplete="off">
<div class="invalid-feedback">
${tlang('You need an email')}
</div>
</div>
<div class="mb-3">
<label for="password_form" class="form-label">Password*</label>
<input type="password" class="form-control form-control-lg" id="password_form" name="password" autocomplete="off">
<div class="invalid-feedback">
${tlang('You need a password')}
</div>
</div>
<div class="mb-3">
<label for="repeat_password_form" class="form-label">${tlang('Repeat password')}*</label>
<input type="repeat_password" class="form-control form-control-lg" id="repeat_password_form" name="repeat_password" autocomplete="off">
<div class="invalid-feedback">
${tlang('Password not equal')}
</div>
</div>
<button type="submit" id="login_submit" class="btn btn-primary">${tlang('Create user')}</button>
</form>
</div>
</%block>
<%block name="jscript">
$(document).ready( function () {
$('#login_form').submit( function () {
$('#loader-div').show();
$('#login_submit').prop('disabled', true);
data_form={'username': $('#username_form').val(), 'email': $('#email_form').val(), 'password': $('#password_form').val(), 'repeat_password': $('#repeat_password_form').val(), 'csrf_token': $("#csrf_token").val(), 'remember_login': 0};
$.ajax({
url: "${url_for('check_login_admin')}",
method: "POST",
dataType: "json",
contentType : 'application/json',
data: JSON.stringify(data_form)
}).done(function(data) {
if(data.error==0)
{
//location.reload()
location.href="${url_for('home_admin')}";
}
else
{
$('#login_submit').prop('disabled', false);
$('#loader-div').hide();
// Firefox have a horrible and stupid bug and you need attr for set de new csrf_token
/*$('#csrf_token').attr('value', data.csrf_token);
$('#loading').hide('slow');
if(data.hasOwnProperty('disable')) {
$('#username_error').html("${_('Error, your user is disabled, you need support of web administration')}");
} if(data.hasOwnProperty('you_cannot_login')) {
if(data.you_cannot_login) {
$('#username_error').html("${_('Error, excessive tries, wait some minutes for login again')}");
}
else {
$('#username_error').html("${_('Error, wrong username or password')}");
}
}
else {
$('#username_error').html("${_('Error, wrong username or password')}");
}*/
}
});
return false;
});
});
</%block>