from flask import Blueprint, redirect, session, url_for, request, g, make_response, abort from settings import config from paramecio2.libraries.i18n import I18n, PGetText from paramecio2.libraries.datetime import now, format_local_strtime, timestamp_to_datetime, obtain_timestamp from paramecio2.libraries.formsutils import show_form, generate_csrf, set_extra_forms_user, pass_values_to_form from paramecio2.libraries.db.webmodel import WebModel from paramecio2.modules.admin.models.admin import UserAdmin, LoginTries from paramecio2.libraries.keyutils import create_key_encrypt, create_key from time import time import os, sys from importlib import import_module from paramecio2.libraries.config_admin import config_admin import copy from os import path from paramecio2.modules.admin import admin_app, t from paramecio2.libraries.sendmail import SendMail from paramecio2.libraries.formsutils import check_csrf from hmac import compare_digest as compare_hash from paramecio2.modules.admin.libraries.admin_auth import admin_prepare, admin_finished, modules_access try: import crypt crypt_pass=True except: crypt_pass=False login_tries=5 if hasattr(config, 'login_tries'): login_tries=config.login_tries gtext=PGetText(__file__) _=gtext.gettext yes_recovery_login=False email_address='localhost' if hasattr(config, 'yes_recovery_login'): yes_recovery_login=config.yes_recovery_login if hasattr(config, 'email_address'): email_address=config.email_address admin_prepare=admin_app.before_request(admin_prepare) admin_finished=admin_app.after_request(admin_finished) # Load modules from admin for app in config.apps: module_app=config.apps[app][0] module_path=os.path.dirname(sys.modules[module_app].__file__) if os.path.isfile(module_path+'/settings/config_admin.py'): config_path=module_app+'.settings.config_admin' a=import_module(config_path) arr_modules_admin={} for app_load in config_admin: if len(app_load)==3: arr_modules_admin[app_load[2]+'/']=import_module(app_load[1]) elif len(app_load)==4: arr_modules_admin[app_load[2]+'/'+app_load[3]]=import_module(app_load[1]) @admin_app.route('/admin/') def admin(): return t.load_template('home.phtml', title=_('Admin')) @admin_app.route('/admin/logout/') def logout(): resp=make_response(redirect(url_for('admin_app.login'))) if 'login_admin' in session: del session['login_admin'] if 'verify_auth' in session: del session['verify_auth'] if 'remember_login_admin' in request.cookies: resp.set_cookie('remember_login_admin', value='', max_age=0, expires=0, path=config.application_root) return resp @admin_app.route('/admin/login/', methods=['GET', 'POST']) def login(): new_crypt=False user_admin=UserAdmin(g.connection) user_admin.yes_repeat_password=False user_admin.fields['password'].required=True user_admin.create_forms(['username', 'password']) c=user_admin.select_count() if c==0: return redirect(url_for('admin_app.signup')) post={} if request.method=='POST': check_csrf() username=request.form['username'] password=request.form['password'] arr_user=user_admin.set_conditions('WHERE username=%s', [username]).select_a_row_where() if arr_user and not check_login_tries(): # Layer compatibility with old crypt password check_pass=user_admin.fields['password'].verify(password, arr_user['password']) if not check_pass: try: check_pass=compare_hash(arr_user['password'], crypt.crypt(password, arr_user['password'])) new_crypt=True except: print('Warning: python developers deleting unix crypt module support, you cannot use sha512 passwords.') check_pass=False pass if check_pass: if not arr_user['disabled']: session['login_admin']=True session['user_id']=arr_user['id'] resp = make_response({'error': 0}) arr_update={} user_admin.safe_query() user_admin.check_user=False if 'remember_login' in request.form: remember_key=create_key_encrypt() #user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update({'token_login': remember_key}) arr_update['token_login']=remember_key timestamp=int(time())+315360000 resp.set_cookie('remember_login_admin', value=remember_key, max_age=315360000, expires=timestamp, path=config.application_root) if arr_user['double_auth']: token_auth=create_key(8) session['verify_auth']=False #user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update({'token_auth': token_auth}) user_admin.fields['token_auth'].protected=False arr_update['token_auth']=token_auth # Send email sendmail=SendMail(ssl=True) # def send(self, from_address, to_address: list, subject, message, content_type='plain', attachments=[]): sendmail.send(config.portal_email, [arr_user['email']], _('Code for complete login'), _('We send to you a code for activate your account using double authentication:')+"\n"+token_auth, content_type='plain', attachments=[]) if arr_user['dark_theme']: session['theme']='1' else: session['theme']='0' arr_update['last_login']=now() session['lang']=arr_user.get('lang', I18n.default_lang) if len(arr_update)>0: if new_crypt: print('Changing password for %s to argon2' % arr_user['username']) user_admin.fields['password'].protected=False arr_update['password']=password user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update(arr_update) return resp else: resp = make_response({'error': 1, 'disable': 1}) return resp else: you_cannot_login=check_login_tries() return {'error': 1, 'you_cannot_login': you_cannot_login} else: you_cannot_login=check_login_tries() return {'error': 1, 'you_cannot_login': you_cannot_login} #if else: forms=show_form(post, user_admin.forms, t, yes_error=False) return t.load_template('login.phtml', forms=forms, yes_recovery_login=yes_recovery_login) @admin_app.route('/admin/signup/', methods=['GET', 'POST']) def signup(): user_admin=UserAdmin(g.connection) c=user_admin.select_count() if c==0: if request.method=='POST': check_csrf() user_admin.conditions=['WHERE privileges=%s', [2]] forms=dict(request.form) forms['privileges']=2 forms['last_login']=now() user_admin.valid_fields=['username', 'email', 'password', 'privileges', 'last_login'] user_admin.create_forms() if user_admin.insert(forms, False): error= {'error': 0} return error else: user_admin.check_all_fields(forms, False) pass_values_to_form(forms, user_admin.forms, yes_error=True) session['csrf_token']=create_key_encrypt() error={'error': 1, 'csrf_token': session['csrf_token']} for field in user_admin.valid_fields: error[field]=user_admin.forms[field].txt_error error['repeat_password']=user_admin.forms['repeat_password'].txt_error return error else: post={} set_extra_forms_user(user_admin) forms=show_form(post, user_admin.forms, t, yes_error=False) return t.load_template('register.phtml', forms=forms) else: return redirect(url_for('.login')) @admin_app.route('/admin/need_auth/') def need_auth(): return t.load_template('need_auth.phtml') @admin_app.route('/admin/auth_check/', methods=['POST']) def auth_check(): error=1 check_csrf() you_cannot_login=0 if 'login_admin' in session: code=request.form.get('code', '') user_admin=UserAdmin(g.connection) user_admin.check_user=False arr_user=user_admin.set_conditions('WHERE id=%s', [session.get('user_id', 0)]).select_a_row_where() if arr_user: if user_admin.fields['token_auth'].verify(code, arr_user['token_auth']): user_admin.safe_query() user_admin.set_conditions('WHERE id=%s', [session['user_id']]).update({'token_auth': ''}) session['verify_auth']=True error=0 else: you_cannot_login=check_login_tries() else: you_cannot_login=check_login_tries() return {'error': error, 'you_cannot_login': you_cannot_login} @admin_app.route('/admin/change_theme/') def change_theme(): db=g.connection theme_selected=str(request.args.get('theme', '0')) session['theme']=theme_selected if not db.query('update useradmin set dark_theme=%s WHERE id=%s', [session['theme'], session['user_id']]): error=1 error=0 return {'error': error} def check_login_tries(): logintries=LoginTries(g.connection) logintries.safe_query() if request.headers.getlist("X-Forwarded-For"): ip=request.headers.getlist("X-Forwarded-For")[0] else: ip=request.remote_addr you_cannot_login=0 arr_try=logintries.set_conditions('WHERE ip=%s', [ip]).select_a_row_where() now_str=now() date_now=format_local_strtime('YYYY-MM-DD HH:mm:ss', now_str) date_check=format_local_strtime('YYYY-MM-DD HH:mm:ss', timestamp_to_datetime(obtain_timestamp(now_str)-300)) logintries.query('delete from logintries where last_login<%s', [date_check]) if arr_try: if arr_try['num_tries']