509 lines
16 KiB
Python
509 lines
16 KiB
Python
from flask import Blueprint, redirect, session, url_for, request, g, make_response, abort
|
|
from settings import config
|
|
from paramecio2.libraries.i18n import I18n, PGetText
|
|
from paramecio2.libraries.datetime import now, format_local_strtime, timestamp_to_datetime, obtain_timestamp
|
|
from paramecio2.libraries.formsutils import show_form, generate_csrf, set_extra_forms_user, pass_values_to_form
|
|
from paramecio2.libraries.db.webmodel import WebModel
|
|
from paramecio2.modules.admin.models.admin import UserAdmin, LoginTries
|
|
from paramecio2.libraries.keyutils import create_key_encrypt, create_key
|
|
from time import time
|
|
import os, sys
|
|
from importlib import import_module
|
|
from paramecio2.libraries.config_admin import config_admin
|
|
import copy
|
|
from os import path
|
|
from paramecio2.modules.admin import admin_app, t
|
|
from paramecio2.libraries.sendmail import SendMail
|
|
from paramecio2.libraries.formsutils import check_csrf
|
|
from hmac import compare_digest as compare_hash
|
|
from paramecio2.modules.admin.libraries.admin_auth import admin_prepare, admin_finished, modules_access
|
|
|
|
try:
|
|
import crypt
|
|
crypt_pass=True
|
|
except:
|
|
crypt_pass=False
|
|
#import gettext
|
|
|
|
#_=pgettext(__file__)
|
|
|
|
login_tries=5
|
|
|
|
if hasattr(config, 'login_tries'):
|
|
login_tries=config.login_tries
|
|
|
|
gtext=PGetText(__file__)
|
|
|
|
_=gtext.gettext
|
|
|
|
yes_recovery_login=False
|
|
email_address='localhost'
|
|
|
|
if hasattr(config, 'yes_recovery_login'):
|
|
yes_recovery_login=config.yes_recovery_login
|
|
|
|
if hasattr(config, 'email_address'):
|
|
email_address=config.email_address
|
|
|
|
|
|
#admin_app=Blueprint('admin_app', __name__, static_folder='static')
|
|
|
|
#@admin_app.before_request
|
|
"""
|
|
def admin_prepare():
|
|
|
|
g.connection=WebModel.connection()
|
|
|
|
if request.endpoint!='admin_app.login' and request.endpoint!='admin_app.signup' and request.endpoint!='admin_app.need_auth' and request.endpoint!='admin_app.auth_check':
|
|
|
|
if 'login_admin' not in session:
|
|
|
|
if 'remember_login_admin' in request.cookies:
|
|
|
|
with g.connection.query('select count(id) as count_id from useradmin where token_login=%s', [request.cookies['remember_login_admin']]) as cursor:
|
|
|
|
arr_count=cursor.fetchone()
|
|
|
|
if arr_count['count_id']==0:
|
|
|
|
url_redirect=config.domain_url+url_for('admin_app.login', _external=False)
|
|
|
|
return redirect(url_redirect)
|
|
else:
|
|
|
|
session['login_admin']=True
|
|
|
|
else:
|
|
|
|
url_redirect=config.domain_url+url_for('admin_app.login', _external=False)
|
|
|
|
return redirect(url_redirect)
|
|
else:
|
|
|
|
#print(session['verify_auth'])
|
|
if request.endpoint!='admin_app.logout':
|
|
|
|
if not session.get('verify_auth', True):
|
|
|
|
url_redirect=config.domain_url+url_for('admin_app.need_auth', _external=False)
|
|
|
|
return redirect(url_redirect)
|
|
|
|
"""
|
|
#home=welcome_app.route("/")(home)
|
|
admin_prepare=admin_app.before_request(admin_prepare)
|
|
|
|
#@admin_app.after_request
|
|
"""
|
|
def admin_finished(response):
|
|
|
|
#print('pepe')
|
|
|
|
g.connection.close()
|
|
|
|
return response
|
|
"""
|
|
|
|
admin_finished=admin_app.after_request(admin_finished)
|
|
|
|
# Load modules from admin
|
|
|
|
for app in config.apps:
|
|
module_app=config.apps[app][0]
|
|
|
|
module_path=os.path.dirname(sys.modules[module_app].__file__)
|
|
|
|
if os.path.isfile(module_path+'/settings/config_admin.py'):
|
|
|
|
config_path=module_app+'.settings.config_admin'
|
|
|
|
a=import_module(config_path)
|
|
|
|
|
|
arr_modules_admin={}
|
|
|
|
for app_load in config_admin:
|
|
|
|
#print(app)
|
|
|
|
if len(app_load)==3:
|
|
|
|
arr_modules_admin[app_load[2]+'/']=import_module(app_load[1])
|
|
#arr_modules_admin[app_load[2]+'/'].admin=admin_app.route(arr_modules_admin[app_load[2]+'/'])(arr_modules_admin[app_load[2]+'/'].admin)
|
|
#print(app_load[1])
|
|
elif len(app_load)==4:
|
|
|
|
arr_modules_admin[app_load[2]+'/'+app_load[3]]=import_module(app_load[1])
|
|
|
|
#print(app_load[1])
|
|
|
|
@admin_app.route('/admin/')
|
|
def admin():
|
|
|
|
return t.load_template('home.phtml', title=_('Admin'))
|
|
|
|
"""
|
|
@admin_app.route('/admin/')
|
|
@admin_app.route('/admin/<module>', methods=['GET', 'POST'])
|
|
@admin_app.route('/admin/<module>/<submodule>', methods=['GET', 'POST'])
|
|
def admin(module='', submodule=''):
|
|
|
|
if module=='':
|
|
|
|
return t.load_template('home.phtml', title=I18n.lang('admin', 'paramecio_admin', 'Paramecio admin'))
|
|
|
|
else:
|
|
|
|
path_module=module+'/'+submodule
|
|
|
|
if path_module in arr_modules_admin:
|
|
|
|
t_mod=copy.copy(t)
|
|
|
|
templates_path=path.dirname(arr_modules_admin[module+'/'+submodule].__file__).replace('/admin', '')+'/templates'
|
|
|
|
try:
|
|
index_value = t_mod.env.directories.index(templates_path)
|
|
except ValueError:
|
|
t_mod.env.directories.insert(0, templates_path)
|
|
|
|
content=arr_modules_admin[path_module].admin(t=t)
|
|
|
|
if type(content).__name__=='str':
|
|
|
|
return t.load_template('content.phtml', title=I18n.lang('admin', 'paramecio_admin', 'Paramecio admin'), contents=content, path_module=path_module)
|
|
|
|
else:
|
|
|
|
return content
|
|
|
|
else:
|
|
abort(404)
|
|
|
|
"""
|
|
|
|
@admin_app.route('/admin/logout/')
|
|
def logout():
|
|
|
|
resp=make_response(redirect(url_for('admin_app.login')))
|
|
|
|
if 'login_admin' in session:
|
|
del session['login_admin']
|
|
|
|
if 'verify_auth' in session:
|
|
del session['verify_auth']
|
|
|
|
if 'remember_login_admin' in request.cookies:
|
|
resp.set_cookie('remember_login_admin', value='', max_age=0, expires=0, path=config.application_root)
|
|
|
|
return resp
|
|
|
|
@admin_app.route('/admin/login/', methods=['GET', 'POST'])
|
|
def login():
|
|
|
|
#connection=WebModel.connection()
|
|
new_crypt=False
|
|
|
|
user_admin=UserAdmin(g.connection)
|
|
|
|
user_admin.yes_repeat_password=False
|
|
|
|
user_admin.fields['password'].required=True
|
|
|
|
user_admin.create_forms(['username', 'password'])
|
|
|
|
c=user_admin.select_count()
|
|
|
|
if c==0:
|
|
return redirect(url_for('admin_app.signup'))
|
|
|
|
post={}
|
|
|
|
if request.method=='POST':
|
|
|
|
check_csrf()
|
|
|
|
username=request.form['username']
|
|
|
|
password=request.form['password']
|
|
|
|
arr_user=user_admin.set_conditions('WHERE username=%s', [username]).select_a_row_where()
|
|
|
|
if arr_user and not check_login_tries():
|
|
|
|
# Layer compatibility with old crypt password
|
|
|
|
check_pass=user_admin.fields['password'].verify(password, arr_user['password'])
|
|
|
|
if not check_pass:
|
|
#check_pass=password_ok(password, arr_user['password'])
|
|
try:
|
|
check_pass=compare_hash(arr_user['password'], crypt.crypt(password, arr_user['password']))
|
|
new_crypt=True
|
|
except:
|
|
print('Warning: python developers deleting unix crypt module support, you cannot use sha512 passwords.')
|
|
check_pass=False
|
|
pass
|
|
|
|
if check_pass:
|
|
|
|
if not arr_user['disabled']:
|
|
|
|
session['login_admin']=True
|
|
|
|
session['user_id']=arr_user['id']
|
|
|
|
resp = make_response({'error': 0})
|
|
|
|
arr_update={}
|
|
|
|
user_admin.safe_query()
|
|
|
|
user_admin.check_user=False
|
|
|
|
if 'remember_login' in request.form:
|
|
|
|
remember_key=create_key_encrypt()
|
|
|
|
#user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update({'token_login': remember_key})
|
|
arr_update['token_login']=remember_key
|
|
|
|
timestamp=int(time())+315360000
|
|
|
|
resp.set_cookie('remember_login_admin', value=remember_key, max_age=315360000, expires=timestamp, path=config.application_root)
|
|
|
|
if arr_user['double_auth']:
|
|
|
|
token_auth=create_key(8)
|
|
session['verify_auth']=False
|
|
|
|
#user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update({'token_auth': token_auth})
|
|
|
|
user_admin.fields['token_auth'].protected=False
|
|
|
|
arr_update['token_auth']=token_auth
|
|
|
|
# Send email
|
|
|
|
sendmail=SendMail(ssl=True)
|
|
|
|
# def send(self, from_address, to_address: list, subject, message, content_type='plain', attachments=[]):
|
|
|
|
sendmail.send(config.portal_email, [arr_user['email']], _('Code for complete login'), _('We send to you a code for activate your account using double authentication:')+"\n"+token_auth, content_type='plain', attachments=[])
|
|
|
|
if arr_user['dark_theme']:
|
|
session['theme']='1'
|
|
else:
|
|
session['theme']='0'
|
|
|
|
arr_update['last_login']=now()
|
|
|
|
session['lang']=arr_user.get('lang', I18n.default_lang)
|
|
|
|
if len(arr_update)>0:
|
|
|
|
if new_crypt:
|
|
print('Changing password for %s to argon2' % arr_user['username'])
|
|
|
|
user_admin.fields['password'].protected=False
|
|
|
|
arr_update['password']=password
|
|
|
|
user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update(arr_update)
|
|
|
|
return resp
|
|
|
|
else:
|
|
|
|
resp = make_response({'error': 1, 'disable': 1})
|
|
|
|
return resp
|
|
|
|
|
|
else:
|
|
|
|
you_cannot_login=check_login_tries()
|
|
|
|
return {'error': 1, 'you_cannot_login': you_cannot_login}
|
|
else:
|
|
|
|
you_cannot_login=check_login_tries()
|
|
|
|
return {'error': 1, 'you_cannot_login': you_cannot_login}
|
|
|
|
#if
|
|
|
|
else:
|
|
|
|
forms=show_form(post, user_admin.forms, t, yes_error=False)
|
|
|
|
return t.load_template('login.phtml', forms=forms, yes_recovery_login=yes_recovery_login)
|
|
|
|
@admin_app.route('/admin/signup/', methods=['GET', 'POST'])
|
|
def signup():
|
|
|
|
user_admin=UserAdmin(g.connection)
|
|
|
|
c=user_admin.select_count()
|
|
|
|
if c==0:
|
|
|
|
if request.method=='POST':
|
|
|
|
check_csrf()
|
|
|
|
user_admin.conditions=['WHERE privileges=%s', [2]]
|
|
|
|
forms=dict(request.form)
|
|
|
|
forms['privileges']=2
|
|
|
|
forms['last_login']=now()
|
|
|
|
user_admin.valid_fields=['username', 'email', 'password', 'privileges', 'last_login']
|
|
|
|
user_admin.create_forms()
|
|
|
|
if user_admin.insert(forms, False):
|
|
|
|
error= {'error': 0}
|
|
|
|
return error
|
|
|
|
else:
|
|
|
|
user_admin.check_all_fields(forms, False)
|
|
|
|
pass_values_to_form(forms, user_admin.forms, yes_error=True)
|
|
|
|
session['csrf_token']=create_key_encrypt()
|
|
|
|
error={'error': 1, 'csrf_token': session['csrf_token']}
|
|
|
|
for field in user_admin.valid_fields:
|
|
|
|
error[field]=user_admin.forms[field].txt_error
|
|
|
|
error['repeat_password']=user_admin.forms['repeat_password'].txt_error
|
|
|
|
return error
|
|
|
|
else:
|
|
|
|
post={}
|
|
|
|
set_extra_forms_user(user_admin)
|
|
|
|
forms=show_form(post, user_admin.forms, t, yes_error=False)
|
|
|
|
return t.load_template('register.phtml', forms=forms)
|
|
|
|
else:
|
|
|
|
return redirect(url_for('.login'))
|
|
|
|
@admin_app.route('/admin/need_auth/')
|
|
def need_auth():
|
|
|
|
return t.load_template('need_auth.phtml')
|
|
|
|
@admin_app.route('/admin/auth_check/', methods=['POST'])
|
|
def auth_check():
|
|
|
|
error=1
|
|
|
|
check_csrf()
|
|
|
|
you_cannot_login=0
|
|
|
|
if 'login_admin' in session:
|
|
|
|
code=request.form.get('code', '')
|
|
|
|
user_admin=UserAdmin(g.connection)
|
|
|
|
user_admin.check_user=False
|
|
|
|
arr_user=user_admin.set_conditions('WHERE id=%s', [session.get('user_id', 0)]).select_a_row_where()
|
|
|
|
if arr_user:
|
|
|
|
if user_admin.fields['token_auth'].verify(code, arr_user['token_auth']):
|
|
|
|
user_admin.safe_query()
|
|
|
|
user_admin.set_conditions('WHERE id=%s', [session['user_id']]).update({'token_auth': ''})
|
|
|
|
session['verify_auth']=True
|
|
error=0
|
|
|
|
else:
|
|
|
|
you_cannot_login=check_login_tries()
|
|
|
|
else:
|
|
|
|
you_cannot_login=check_login_tries()
|
|
|
|
return {'error': error, 'you_cannot_login': you_cannot_login}
|
|
|
|
@admin_app.route('/admin/change_theme/')
|
|
def change_theme():
|
|
|
|
db=g.connection
|
|
|
|
theme_selected=str(request.args.get('theme', '0'))
|
|
|
|
session['theme']=theme_selected
|
|
|
|
if not db.query('update useradmin set dark_theme=%s WHERE id=%s', [session['theme'], session['user_id']]):
|
|
error=1
|
|
|
|
error=0
|
|
|
|
return {'error': error}
|
|
|
|
"""
|
|
@admin_app.route('/admin/recovery_password/')
|
|
def recovery_password():
|
|
|
|
return ""
|
|
"""
|
|
|
|
def check_login_tries():
|
|
|
|
logintries=LoginTries(g.connection)
|
|
|
|
logintries.safe_query()
|
|
|
|
if request.headers.getlist("X-Forwarded-For"):
|
|
ip=request.headers.getlist("X-Forwarded-For")[0]
|
|
else:
|
|
ip=request.remote_addr
|
|
|
|
you_cannot_login=0
|
|
|
|
arr_try=logintries.set_conditions('WHERE ip=%s', [ip]).select_a_row_where()
|
|
|
|
now_str=now()
|
|
date_now=format_local_strtime('YYYY-MM-DD HH:mm:ss', now_str)
|
|
|
|
date_check=format_local_strtime('YYYY-MM-DD HH:mm:ss', timestamp_to_datetime(obtain_timestamp(now_str)-300))
|
|
|
|
logintries.query('delete from logintries where last_login<%s', [date_check])
|
|
|
|
if arr_try:
|
|
|
|
if arr_try['num_tries']<login_tries:
|
|
|
|
logintries.query('update logintries set num_tries=num_tries+1, last_login=%s WHERE ip=%s', [date_now, ip])
|
|
|
|
else:
|
|
|
|
you_cannot_login=1
|
|
|
|
else:
|
|
|
|
logintries.query('insert into logintries (`ip`, `num_tries`, `last_login`) VALUES (%s, %s, %s)', [ip, 1, date_now])
|
|
|
|
return you_cannot_login
|