241 lines
7.4 KiB
Python
241 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from collections import OrderedDict
|
|
from cuchulu.libraries.sessionplugin import get_session
|
|
from cuchulu.libraries.urls import make_url
|
|
from cuchulu.libraries.i18n import I18n
|
|
from cuchulu.libraries.httputils import GetPostFiles
|
|
from cuchulu.libraries.keyutils import create_key_encrypt, create_key_encrypt_256, create_key
|
|
from cuchulu.libraries.db.formsutils import generate_csrf
|
|
from bottle import response,request
|
|
from cuchulu.libraries.db.webmodel import WebModel
|
|
from time import time
|
|
|
|
try:
|
|
|
|
from settings import config
|
|
|
|
except:
|
|
|
|
class config:
|
|
admin_folder='admin'
|
|
try:
|
|
|
|
from settings import config_admin
|
|
|
|
except:
|
|
|
|
class config_admin:
|
|
modules_admin=[]
|
|
|
|
#Function for get an admin url
|
|
|
|
def make_admin_url(url, query_args={}):
|
|
|
|
"""Function for get an admin url
|
|
|
|
A special function based in make_url for get admin urls. You can use only the module admin part in the url and get a real url for use in your templates or other functions.
|
|
|
|
Args:
|
|
url (str): The url without admin part for use how base. Example: with 'pages' as url value you get http://localhost:8080/admin/pages
|
|
query_args (dict): A serie of dictionary values where you get a url query result as it: {'key1': 'value1', 'key2': 'value2'} -> key1=value1&key2=value2
|
|
|
|
Returns:
|
|
str: A new url valid for use in href links directing to admin site
|
|
|
|
"""
|
|
|
|
return make_url('%s/%s' % (config.admin_folder, url), query_args)
|
|
|
|
def get_language(s):
|
|
|
|
"""Function for get language from a session
|
|
|
|
With this function you gan get easily the language of session
|
|
|
|
Args:
|
|
s (session): A session object where the language value is stored
|
|
|
|
Returns:
|
|
str: The language string
|
|
|
|
"""
|
|
|
|
s['lang']=s.get('lang', None)
|
|
|
|
lang_selected=None
|
|
|
|
if s['lang']!=None:
|
|
lang_selected=s['lang']
|
|
else:
|
|
s['lang']=I18n.default_lang
|
|
s.save()
|
|
lang_selected=I18n.default_lang
|
|
|
|
return lang_selected
|
|
|
|
def get_menu(modules_admin):
|
|
|
|
"""Function for get a ordered dict with modules admin
|
|
|
|
With this method you get a menu ordered dict for use internally in admin module.
|
|
|
|
Args:
|
|
modules_admin (OrderedDict): The ordereddict used get it from admin configuration of Cuchulu system
|
|
|
|
Returns:
|
|
OrderedDict: A new dict prepared for use in admin module.
|
|
|
|
"""
|
|
|
|
menu=OrderedDict()
|
|
|
|
icon=OrderedDict()
|
|
|
|
for mod in modules_admin:
|
|
if type(mod[1]).__name__!='list':
|
|
menu[mod[2]]=mod
|
|
|
|
if len(menu[mod[2]])<4:
|
|
menu[mod[2]].append('<i class="fa fa-circle-o" aria-hidden="true"></i>')
|
|
else:
|
|
menu[mod[2]][3]='<i class="fa {}" aria-hidden="true"></i>'.format(menu[mod[2]][3])
|
|
else:
|
|
|
|
menu[mod[2]]=mod[0]
|
|
|
|
if len(mod)<4:
|
|
menu[mod[2]]='<i class="fa fa-arrow-down" aria-hidden="true"></i>'+menu[mod[2]]
|
|
|
|
for submod in mod[1]:
|
|
if submod[2] in menu:
|
|
print('WARNING: you would not set the admin url for '+submod[2]+' with same general name of module if is not stand alone admin file')
|
|
menu[submod[2]]=submod
|
|
|
|
|
|
if len(menu[submod[2]])<4:
|
|
menu[submod[2]].append('<i class="fa fa-circle-o" aria-hidden="true"></i>')
|
|
else:
|
|
menu[submod[2]][3]='<i class="fa {}" aria-hidden="true"></i>'.format(menu[submod[2]][3])
|
|
|
|
return menu
|
|
|
|
def check_login():
|
|
|
|
"""Function for check if correct login in admin module
|
|
|
|
With this function you can check if the online user is login or not
|
|
"""
|
|
|
|
s=get_session()
|
|
|
|
if 'login' in s:
|
|
|
|
if 'privileges' in s:
|
|
|
|
if s['privileges']==2:
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
def login_model(ModelLogin, session='', enable_tries=False):
|
|
|
|
""" Function for standard login
|
|
"""
|
|
|
|
connection=WebModel.connection()
|
|
|
|
user_admin=ModelLogin(connection)
|
|
|
|
getpostfiles=GetPostFiles()
|
|
|
|
getpostfiles.obtain_post()
|
|
|
|
getpostfiles.post['username']=getpostfiles.post.get('username', '')
|
|
getpostfiles.post['password']=getpostfiles.post.get('password', '')
|
|
|
|
username=user_admin.fields['username'].check(getpostfiles.post['username'])
|
|
|
|
password=getpostfiles.post['password'].strip()
|
|
|
|
user_admin.conditions=['WHERE username=%s', [username]]
|
|
|
|
arr_user=user_admin.select_a_row_where(['id', 'username', 'password', 'privileges', 'lang', 'num_tries', 'email', 'theme'])
|
|
|
|
if arr_user==False:
|
|
|
|
connection.close()
|
|
return {'error': 1, 'csrf_token': generate_csrf()}
|
|
else:
|
|
|
|
num_tries=int(arr_user['num_tries'])
|
|
|
|
if arr_user['num_tries']<3:
|
|
|
|
if user_admin.fields['password'].verify(password, arr_user['password']):
|
|
|
|
s=get_session()
|
|
|
|
s[session+'id']=arr_user['id']
|
|
s[session+'login']=1
|
|
s[session+'privileges']=arr_user['privileges']
|
|
s[session+'lang']=arr_user['lang']
|
|
s[session+'email']=arr_user['email']
|
|
s[session+'username']=arr_user['username']
|
|
s[session+'theme']=str(arr_user['theme'])
|
|
|
|
if s['lang']=='':
|
|
s['lang']=I18n.default_lang
|
|
|
|
remember_login=getpostfiles.post.get(session+'remember_login', '0')
|
|
|
|
if remember_login=='1':
|
|
|
|
timestamp=time()+315360000
|
|
|
|
random_text=create_key_encrypt()
|
|
|
|
#Update user with autologin token
|
|
|
|
user_admin.check_user=False
|
|
|
|
user_admin.conditions=['WHERE username=%s', [username]]
|
|
|
|
user_admin.valid_fields=['token_login']
|
|
|
|
user_admin.reset_require()
|
|
|
|
if user_admin.update({'token_login': random_text}):
|
|
|
|
response.set_cookie(session+'remember_login', random_text, path=config.session_opts['session.path'], expires=timestamp, secret=key_encrypt)
|
|
#else:
|
|
#print(user_admin.query_error)
|
|
s.save()
|
|
|
|
connection.close()
|
|
|
|
return {'error': 0}
|
|
else:
|
|
|
|
user_admin.check_user=False
|
|
|
|
user_admin.conditions=['WHERE username=%s', [username]]
|
|
|
|
user_admin.valid_fields=['num_tries']
|
|
|
|
user_admin.reset_require()
|
|
|
|
if enable_tries:
|
|
user_admin.update({'num_tries': arr_user['num_tries']+1})
|
|
|
|
connection.close()
|
|
|
|
return {'error': 1, 'csrf_token': generate_csrf()}
|
|
else:
|
|
|
|
connection.close()
|
|
|
|
return {'error': 1, 'csrf_token': generate_csrf()}
|
|
|
|
|