From ed23a190443965554b4e33859aeaba7b9885c0dd Mon Sep 17 00:00:00 2001 From: Antonio de la Rosa Date: Wed, 26 Feb 2025 01:11:03 +0100 Subject: [PATCH] Added session plugin using itsdangeroues --- .../libraries/db/extrafields/jsonfield.py | 115 ++++++++++++++++++ paramecio/libraries/sessionplugin.py | 90 ++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 paramecio/libraries/db/extrafields/jsonfield.py create mode 100644 paramecio/libraries/sessionplugin.py diff --git a/paramecio/libraries/db/extrafields/jsonfield.py b/paramecio/libraries/db/extrafields/jsonfield.py new file mode 100644 index 0000000..f9203a1 --- /dev/null +++ b/paramecio/libraries/db/extrafields/jsonfield.py @@ -0,0 +1,115 @@ +""" +Parameciofm is a series of wrappers for Bottle, mako and others and construct a simple headless cms. + +Copyright (C) 2024 Antonio de la Rosa Caballero + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +""" + +from paramecio.libraries.db.webmodel import WebModel, PhangoField +import sys +try: + + import ujson as json +except: + import json + +class JsonField(PhangoField): + """Field for save json datatype values""" + + def __init__(self, name, field_type, required=False): + """ + Args: + name (str): The name of field + field_type (PhangoField): The type of PhangoField for save in JsonField + required (bool): Boolean for define if field is required or not + """ + + super().__init__(name, required) + + self.field_type=field_type + + self.error_default='Sorry, the json dict is invalid' + + self.set_default='NOT NULL' + + self.type_sql='longtext' + + def check(self, value): + + if type(value).__name__=='str': + try: + + value=json.loads(value) + + except json.JSONDecodeError: + + value={} + self.error=True + self.txt_error=self.error_default + + elif type(value).__name__!='dict': + + value={} + self.error=True + self.txt_error=self.error_default + + for k,v in value.items(): + + value[k]=self.field_type.check(v) + + final_value=json.dumps(value) + + #final_value=WebModel.escape_sql(final_value) + + return final_value + + def get_type_sql(self): + + return 'JSON '+self.set_default + + def show_formatted(self, value): + + return ", ".join(value) + +# You need check the values previously. + +class JsonValueField(PhangoField): + """Field for save json mixed values. You need check the values previously, the field only check values for prevent sql injections.""" + + def __init__(self, name, required=False): + + super().__init__(name, required) + + self.error_default='Sorry, the json dict is invalid' + self.default_value={} + + #self.set_default='NOT NULL' + + def get_type_sql(self): + + return 'JSON' + + def check(self, value): + + try: + final_value=json.dumps(value) + + except json.JSONDecodeError: + + final_value='{}' + self.error=True + self.txt_error=self.error_default + + return final_value diff --git a/paramecio/libraries/sessionplugin.py b/paramecio/libraries/sessionplugin.py new file mode 100644 index 0000000..8fcc61c --- /dev/null +++ b/paramecio/libraries/sessionplugin.py @@ -0,0 +1,90 @@ +from bottle import request, response +from itsdangerous.url_safe import URLSafeTimedSerializer +from settings import config +import inspect + +class Session(dict): + + def __init__(self, *args, **kwargs): + + self.update(*args, **kwargs) + + self.changed=False + + def __setitem__(self, item, value): + + super(Session, self).__setitem__(item, value) + self.changed=True + +class SessionPlugin(object): + + name = 'session' + api = 2 + + def __init__(self, keyword='session'): + + self.keyword=keyword + + + def setup(self, app): + ''' Make sure that other installed plugins don't affect the same keyword argument.''' + for other in app.plugins: + if not isinstance(other, SessionPlugin): continue + if other.keyword == self.keyword: + raise PluginError("Found another login plugin with "\ + "conflicting settings (non-unique keyword).") + + def apply(self, callback, context): + + # Test if the original callback accepts a 'session' keyword. + # Ignore it if it does not need a login handle. + + conf = context.config.get('session') or {} + + keyword = conf.get('keyword', self.keyword) + + args = inspect.getfullargspec(context.callback)[0] + + if keyword not in args: + return callback + + def wrapper(*args, **kwargs): + + cookie=request.get_cookie(config.cookie_name) + + s=None + + if not cookie: + session=Session() + else: + + s=URLSafeTimedSerializer(config.key_encrypt) + + session=Session(s.loads(cookie)) + + if type(session).__name__!='Session': + session=Session() + + #except: + # session=Session() + + kwargs['session']=session + + rv=callback(*args, **kwargs) + + if session.changed: + #print('changed') + if not s: + s=URLSafeTimedSerializer(config.key_encrypt) + #print(session) + response.set_cookie(config.cookie_name, s.dumps(session)) + + return rv + + return wrapper + + + + + +