432 lines
13 KiB
Python
432 lines
13 KiB
Python
#from fast import app
|
|
from fastapi import Response, Cookie, FastAPI, Request, HTTPException
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from paramecio2.modules.admin.models.admin import UserAdmin, LoginTries
|
|
from paramecio2.libraries.mtemplates import env_theme, PTemplate
|
|
from paramecio2.libraries.urls import make_url
|
|
from paramecio2.modules.admin import t as admin_t
|
|
from settings import config
|
|
from paramecio2.libraries.i18n import I18n, PGetText
|
|
import os
|
|
from typing import Annotated
|
|
#from parameciofast.fastadmin.forms.userform import UserForm, RegisterForm
|
|
from parameciofast.modules.fastadmin.models.admin import UserAdmin
|
|
from paramecio2.libraries.db.webmodel import WebModel
|
|
from pydantic import BaseModel
|
|
from paramecio2.libraries.datetime import now, format_local_strtime, timestamp_to_datetime, obtain_timestamp
|
|
from paramecio2.libraries.formsutils import show_form, generate_csrf, set_extra_forms_user, pass_values_to_form
|
|
from itsdangerous.url_safe import URLSafeSerializer
|
|
from paramecio2.libraries.keyutils import create_key_encrypt
|
|
|
|
admin_app=FastAPI()
|
|
|
|
env=env_theme(__file__)
|
|
|
|
t=PTemplate(env)
|
|
|
|
t.env.directories=admin_t.env.directories
|
|
|
|
t.env.directories.insert(1, os.path.dirname(__file__).replace('/admin', '')+'/templates/admin')
|
|
|
|
# ['themes/default/templates', '/home/absurdo/arch/absurdo/apache/htdocs/catalog/parameciofast/fastadmin/templates/admin', '/home/absurdo/virtualenv/lib/python3.11/site-packages/paramecio2/modules/admin/templates', '/home/absurdo/virtualenv/lib/python3.11/site-packages/paramecio2/libraries/templates']
|
|
|
|
login_tries=5
|
|
|
|
if hasattr(config, 'login_tries'):
|
|
login_tries=config.login_tries
|
|
|
|
gtext=PGetText(__file__)
|
|
|
|
_=gtext.gettext
|
|
|
|
yes_recovery_login=False
|
|
email_address='localhost'
|
|
|
|
if hasattr(config, 'yes_recovery_login'):
|
|
yes_recovery_login=config.yes_recovery_login
|
|
|
|
if hasattr(config, 'email_address'):
|
|
email_address=config.email_address
|
|
|
|
cookie_name='paramecio.session'
|
|
|
|
if hasattr(config, 'cookie_name'):
|
|
cookie_name=config.cookie_name
|
|
|
|
def check_session():
|
|
|
|
pass
|
|
|
|
secret_key=create_key_encrypt(24)
|
|
|
|
#useradmin=UserAdmin()
|
|
|
|
@admin_app.get('/', response_class=HTMLResponse)
|
|
def admin(paramecio_session: Annotated[str | None, Cookie()] = None):
|
|
|
|
"""Admin html frontend. Only can access using admin cookie an dlogin via /admin/login
|
|
"""
|
|
|
|
if not paramecio_session:
|
|
return RedirectResponse(make_url('admin/login'))
|
|
else:
|
|
arr_user=get_user(paramecio_session)
|
|
|
|
if 'admin' in arr_user:
|
|
|
|
return t.load_template('home.phtml', title=_('Admin'), user=arr_user)
|
|
|
|
else:
|
|
raise HTTPException(status_code=404, detail="User incorrect")
|
|
|
|
user_admin=UserAdmin()
|
|
|
|
user_admin.create_forms()
|
|
|
|
#user_admin.create_forms()
|
|
|
|
#user_form=UserForm()
|
|
|
|
#register_form=RegisterForm()
|
|
|
|
@admin_app.get('/login', response_class=HTMLResponse)
|
|
def admin_login():
|
|
|
|
db=WebModel.connection()
|
|
|
|
#user_admin.conn(db)
|
|
|
|
post={}
|
|
|
|
user_admin=UserAdmin(db)
|
|
|
|
user_admin.yes_repeat_password=False
|
|
|
|
user_admin.fields['password'].required=True
|
|
|
|
user_admin.create_forms(['username', 'password'])
|
|
|
|
c=user_admin.select_count()
|
|
|
|
forms=show_form(post, user_admin.forms, t, yes_error=False)
|
|
|
|
db.close()
|
|
|
|
if c:
|
|
|
|
return t.load_template('login.phtml', forms=forms, yes_recovery_login=yes_recovery_login)
|
|
else:
|
|
return RedirectResponse(make_url('admin/signup'))
|
|
|
|
class UserLogin(BaseModel):
|
|
username: str
|
|
password: str
|
|
remember_login: bool | None
|
|
|
|
@admin_app.post('/login')
|
|
def admin_check_login(user_login: UserLogin, response: Response, request: Request):
|
|
|
|
db=WebModel.connection()
|
|
|
|
user_admin.conn(db)
|
|
|
|
username=user_login.username
|
|
|
|
password=user_login.password
|
|
|
|
remember_login=user_login.remember_login
|
|
|
|
resp={'error': 1}
|
|
|
|
arr_user=user_admin.set_conditions('WHERE username=%s', [username]).select_a_row_where()
|
|
|
|
if arr_user and not check_login_tries(request, db):
|
|
|
|
# Layer compatibility with old crypt password
|
|
|
|
new_crypt=False
|
|
|
|
check_pass=user_admin.fields['password'].verify(password, arr_user['password'])
|
|
|
|
if not check_pass:
|
|
#check_pass=password_ok(password, arr_user['password'])
|
|
try:
|
|
check_pass=compare_hash(arr_user['password'], crypt.crypt(password, arr_user['password']))
|
|
new_crypt=True
|
|
except:
|
|
print('Warning: python developers deleting unix crypt module support, you cannot use sha512 passwords.')
|
|
check_pass=False
|
|
pass
|
|
|
|
if check_pass:
|
|
|
|
if not arr_user['disabled']:
|
|
|
|
session={}
|
|
|
|
#s=Serialize(config.secret_key)
|
|
|
|
resp={'error': 0}
|
|
|
|
response=JSONResponse(resp)
|
|
|
|
s=URLSafeSerializer(config.secret_key)
|
|
|
|
value_cookie=s.dumps({'admin': 1, 'user_id': arr_user['id']})
|
|
|
|
#session['login_admin']=True
|
|
|
|
#session['user_id']=arr_user['id']
|
|
|
|
response.set_cookie(key=cookie_name, value=value_cookie)
|
|
|
|
#session['login_admin']=True
|
|
|
|
#session['user_id']=arr_user['id']
|
|
|
|
arr_update={}
|
|
|
|
user_admin.safe_query()
|
|
|
|
user_admin.check_user=False
|
|
|
|
if user_login.remember_login==1:
|
|
|
|
remember_key=create_key_encrypt()
|
|
|
|
#user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update({'token_login': remember_key})
|
|
arr_update['token_login']=remember_key
|
|
|
|
timestamp=int(time())+315360000
|
|
|
|
#Signature: Response.set_cookie(key, value, max_age=None, expires=None, path="/", domain=None, secure=False, httponly=False, samesite="lax")
|
|
|
|
response.set_cookie(key="remember_login_admin", value=remember_key, max_age=315360000, expires=timestamp, path=config.application_root)
|
|
|
|
#return response
|
|
|
|
#resp.set_cookie('remember_login_admin', value=remember_key, max_age=315360000, expires=timestamp, path=config.application_root)
|
|
|
|
if arr_user['double_auth']:
|
|
|
|
token_auth=create_key(8)
|
|
session['verify_auth']=False
|
|
|
|
#user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update({'token_auth': token_auth})
|
|
|
|
user_admin.fields['token_auth'].protected=False
|
|
|
|
arr_update['token_auth']=token_auth
|
|
|
|
# Send email
|
|
|
|
sendmail=SendMail(ssl=True)
|
|
|
|
# def send(self, from_address, to_address: list, subject, message, content_type='plain', attachments=[]):
|
|
|
|
sendmail.send(config.portal_email, [arr_user['email']], _('Code for complete login'), _('We send to you a code for activate your account using double authentication:')+"\n"+token_auth, content_type='plain', attachments=[])
|
|
|
|
if arr_user['dark_theme']:
|
|
session['theme']='1'
|
|
else:
|
|
session['theme']='0'
|
|
|
|
arr_update['last_login']=now()
|
|
|
|
session['lang']=arr_user.get('lang', I18n.default_lang)
|
|
|
|
if len(arr_update)>0:
|
|
|
|
if new_crypt:
|
|
#print('Changing password for %s to argon2' % arr_user['username'])
|
|
|
|
user_admin.fields['password'].protected=False
|
|
|
|
arr_update['password']=password
|
|
|
|
user_admin.set_conditions('WHERE id=%s', [arr_user['id']]).update(arr_update)
|
|
|
|
return response
|
|
|
|
else:
|
|
|
|
resp={'error': 1, 'disable': 1}
|
|
|
|
response=JSONResponse(resp)
|
|
|
|
return response
|
|
|
|
|
|
else:
|
|
|
|
you_cannot_login=check_login_tries(request, db)
|
|
|
|
return {'error': 1, 'you_cannot_login': you_cannot_login}
|
|
else:
|
|
|
|
you_cannot_login=check_login_tries(request, db)
|
|
|
|
return {'error': 1, 'you_cannot_login': you_cannot_login}
|
|
|
|
|
|
@admin_app.get('/signup', response_class=HTMLResponse)
|
|
def admin_signup():
|
|
|
|
|
|
db=WebModel.connection()
|
|
|
|
user_admin.conn(db)
|
|
|
|
c=user_admin.select_count()
|
|
|
|
post={}
|
|
|
|
#forms="\n".join([x for x in UserForm()])
|
|
#forms=register_form
|
|
|
|
set_extra_forms_user(user_admin)
|
|
|
|
forms=show_form(post, user_admin.forms, t, yes_error=False)
|
|
|
|
db.close()
|
|
|
|
if not c:
|
|
|
|
return t.load_template('register.phtml', forms=forms)
|
|
else:
|
|
return RedirectResponse(make_url('admin/login'))
|
|
|
|
class UserRegister(BaseModel):
|
|
username: str
|
|
email: str
|
|
password: str
|
|
repeat_password: str
|
|
|
|
@admin_app.post('/signup')
|
|
def post_admin_signup(user_register: UserRegister):
|
|
|
|
#print(user_ogin
|
|
|
|
db=WebModel.connection()
|
|
|
|
user_admin.conn(db)
|
|
|
|
error=1
|
|
forms=dict(user_register)
|
|
|
|
|
|
user_admin.conditions=['WHERE privileges=%s', [2]]
|
|
|
|
forms['privileges']=2
|
|
|
|
forms['last_login']=now()
|
|
|
|
user_admin.valid_fields=['username', 'email', 'password', 'privileges', 'last_login']
|
|
|
|
user_admin.create_forms()
|
|
|
|
if user_admin.insert(forms, False):
|
|
|
|
error= {'error': 0}
|
|
|
|
else:
|
|
|
|
user_admin.check_all_fields(forms, False)
|
|
|
|
pass_values_to_form(forms, user_admin.forms, yes_error=True)
|
|
|
|
error={'error': 1}
|
|
|
|
for field in user_admin.valid_fields:
|
|
|
|
error[field]=user_admin.forms[field].txt_error
|
|
|
|
error['repeat_password']=user_admin.forms['repeat_password'].txt_error
|
|
|
|
db.close()
|
|
|
|
return error
|
|
|
|
@admin_app.get('/recovery_password', response_class=HTMLResponse)
|
|
def admin_recovery_passsword():
|
|
|
|
return ""
|
|
|
|
@admin_app.get('/change_theme')
|
|
def change_theme(theme: int, paramecio_session: Annotated[str | None, Cookie()] = None):
|
|
|
|
resp={'error': 0}
|
|
|
|
response=JSONResponse(resp)
|
|
|
|
s=URLSafeSerializer(config.secret_key)
|
|
|
|
arr_user=get_user(paramecio_session)
|
|
|
|
arr_user['theme']=theme
|
|
|
|
value_cookie=s.dumps(arr_user)
|
|
|
|
response.set_cookie(key=cookie_name, value=value_cookie)
|
|
|
|
return response
|
|
|
|
|
|
"""Function for get the number of login tries
|
|
"""
|
|
|
|
def check_login_tries(request, db):
|
|
|
|
logintries=LoginTries(db)
|
|
|
|
logintries.safe_query()
|
|
|
|
"""
|
|
if request.headers.getlist("X-Forwarded-For"):
|
|
ip=request.headers.getlist("X-Forwarded-For")[0]
|
|
else:
|
|
ip=request.remote_addr
|
|
"""
|
|
|
|
ip=request.client.host
|
|
print(ip)
|
|
|
|
you_cannot_login=0
|
|
|
|
arr_try=logintries.set_conditions('WHERE ip=%s', [ip]).select_a_row_where()
|
|
|
|
now_str=now()
|
|
date_now=format_local_strtime('YYYY-MM-DD HH:mm:ss', now_str)
|
|
|
|
date_check=format_local_strtime('YYYY-MM-DD HH:mm:ss', timestamp_to_datetime(obtain_timestamp(now_str)-300))
|
|
|
|
logintries.query('delete from logintries where last_login<%s', [date_check])
|
|
|
|
if arr_try:
|
|
|
|
if arr_try['num_tries']<login_tries:
|
|
|
|
logintries.query('update logintries set num_tries=num_tries+1, last_login=%s WHERE ip=%s', [date_now, ip])
|
|
|
|
else:
|
|
|
|
you_cannot_login=1
|
|
|
|
else:
|
|
|
|
logintries.query('insert into logintries (`ip`, `num_tries`, `last_login`) VALUES (%s, %s, %s)', [ip, 1, date_now])
|
|
|
|
return you_cannot_login
|
|
|
|
def get_user(session_cookie):
|
|
|
|
#try:
|
|
|
|
s=URLSafeSerializer(config.secret_key)
|
|
|
|
return s.loads(session_cookie)
|
|
"""
|
|
except:
|
|
|
|
return {}
|
|
"""
|