Added session plugin using itsdangeroues

This commit is contained in:
Antonio de la Rosa 2025-02-26 01:11:03 +01:00
parent e6f824717e
commit ed23a19044
2 changed files with 205 additions and 0 deletions

View file

@ -0,0 +1,115 @@
"""
Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms.
Copyright (C) 2024 Antonio de la Rosa Caballero
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from paramecio.libraries.db.webmodel import WebModel, PhangoField
import sys
try:
import ujson as json
except:
import json
class JsonField(PhangoField):
"""Field for save json datatype values"""
def __init__(self, name, field_type, required=False):
"""
Args:
name (str): The name of field
field_type (PhangoField): The type of PhangoField for save in JsonField
required (bool): Boolean for define if field is required or not
"""
super().__init__(name, required)
self.field_type=field_type
self.error_default='Sorry, the json dict is invalid'
self.set_default='NOT NULL'
self.type_sql='longtext'
def check(self, value):
if type(value).__name__=='str':
try:
value=json.loads(value)
except json.JSONDecodeError:
value={}
self.error=True
self.txt_error=self.error_default
elif type(value).__name__!='dict':
value={}
self.error=True
self.txt_error=self.error_default
for k,v in value.items():
value[k]=self.field_type.check(v)
final_value=json.dumps(value)
#final_value=WebModel.escape_sql(final_value)
return final_value
def get_type_sql(self):
return 'JSON '+self.set_default
def show_formatted(self, value):
return ", ".join(value)
# You need check the values previously.
class JsonValueField(PhangoField):
"""Field for save json mixed values. You need check the values previously, the field only check values for prevent sql injections."""
def __init__(self, name, required=False):
super().__init__(name, required)
self.error_default='Sorry, the json dict is invalid'
self.default_value={}
#self.set_default='NOT NULL'
def get_type_sql(self):
return 'JSON'
def check(self, value):
try:
final_value=json.dumps(value)
except json.JSONDecodeError:
final_value='{}'
self.error=True
self.txt_error=self.error_default
return final_value

View file

@ -0,0 +1,90 @@
from bottle import request, response
from itsdangerous.url_safe import URLSafeTimedSerializer
from settings import config
import inspect
class Session(dict):
def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)
self.changed=False
def __setitem__(self, item, value):
super(Session, self).__setitem__(item, value)
self.changed=True
class SessionPlugin(object):
name = 'session'
api = 2
def __init__(self, keyword='session'):
self.keyword=keyword
def setup(self, app):
''' Make sure that other installed plugins don't affect the same keyword argument.'''
for other in app.plugins:
if not isinstance(other, SessionPlugin): continue
if other.keyword == self.keyword:
raise PluginError("Found another login plugin with "\
"conflicting settings (non-unique keyword).")
def apply(self, callback, context):
# Test if the original callback accepts a 'session' keyword.
# Ignore it if it does not need a login handle.
conf = context.config.get('session') or {}
keyword = conf.get('keyword', self.keyword)
args = inspect.getfullargspec(context.callback)[0]
if keyword not in args:
return callback
def wrapper(*args, **kwargs):
cookie=request.get_cookie(config.cookie_name)
s=None
if not cookie:
session=Session()
else:
s=URLSafeTimedSerializer(config.key_encrypt)
session=Session(s.loads(cookie))
if type(session).__name__!='Session':
session=Session()
#except:
# session=Session()
kwargs['session']=session
rv=callback(*args, **kwargs)
if session.changed:
#print('changed')
if not s:
s=URLSafeTimedSerializer(config.key_encrypt)
#print(session)
response.set_cookie(config.cookie_name, s.dumps(session))
return rv
return wrapper