parameciofast/parameciofast/modules/fastadmin/app.py

257 lines
8.7 KiB
Python

from fastapi import FastAPI, Cookie, Request, Response, Body, Header
from fastapi.responses import HTMLResponse, RedirectResponse
from parameciofast.modules.fastadmin import admin_app
from typing import Annotated
from parameciofast.fast import app
from parameciofast.libraries.i18n import I18n
from parameciofast.libraries.mtemplates import env_theme, PTemplate
from pydantic import BaseModel, Field
from parameciofast.modules.fastadmin.models.admin import UserAdmin, LoginTries
from parameciofast.libraries.db.webmodel import WebModel
from parameciofast.libraries.fastutils import ResponseData
from parameciofast.libraries.db import simplequery
from settings import config
from parameciofast.libraries.datetime import now, format_local_strtime, timestamp_to_datetime, obtain_timestamp
from parameciofast.libraries.keyutils import create_key_encrypt, create_key
from time import time
# Template environment rooted at this module, plus the template loader that
# resolves route names through the main application.
env = env_theme(__file__)
t = PTemplate(env, app.url_path_for)

# Instance of the UserAdmin model imported from the models package. A
# pydantic class with the same name is declared further down in this file,
# so this instance has to be created before that rebinding happens.
usermodel = UserAdmin()

# Tunables: each one falls back to a built-in default when settings.config
# does not define an attribute of the same name.
login_tries = getattr(config, 'login_tries', 5)      # compared against a client's stored num_tries
seconds_login = getattr(config, 'seconds_login', 300)  # age in seconds after which try records are deleted
cookie_name = getattr(config, 'cookie_name', 'paramecio_session')
# GET /: render the admin layout for a logged-in session; anyone else is
# redirected to the login page. The cookie and header parameters are not read
# by the body; they are declared in the signature only.
@admin_app.get('/', response_class=HTMLResponse)
def home_admin(
    request: Request,
    paramecio_session: Annotated[str | None, Cookie(description='Cookie for validate into the admin site. The cookie name can change in you settings/config.py')] = None,
    remote_address: Annotated[str | None, Header()] = None,
):
    logged_in = request.session.get('login_admin', None)

    if logged_in:
        i18n = I18n('admin', I18n.session_lang(request.session))
        return t.load_template(
            'layout.phtml',
            title=i18n.tlang('Admin'),
            tlang=i18n.tlang,
            url_for=app.url_path_for,
        )

    return RedirectResponse(app.url_path_for('login_admin'))
# GET /login: show the login form.
#
# Shortcuts taken before rendering:
#   * a non-empty "remember" cookie whose value matches a stored token_login
#     logs the session in and redirects to the admin home;
#   * when the useradmin table has no rows, redirect to the signup page.
@admin_app.get('/login', response_class=HTMLResponse)
def login_admin(request: Request):
    db = WebModel.connection()

    try:
        # The POST /login handler in this file stores token_login='' for
        # users who log in without "remember me". An empty cookie value must
        # therefore never be used as a lookup key, otherwise a blank cookie
        # would match those rows and grant a session without credentials.
        remember_token = request.cookies.get(cookie_name + '_remember')

        if remember_token:
            arr_user = simplequery.select(
                usermodel,
                db,
                dict_fields=['id', 'username'],
                where_sql='WHERE token_login=%s',
                dict_values=[remember_token],
            )

            if len(arr_user) > 0:
                date_now = format_local_strtime('YYYY-MM-DD HH:mm:ss', now())
                db.query('update useradmin set last_login=%s WHERE id=%s', [date_now, arr_user[0]['id']])
                request.session['login_admin'] = True
                return RedirectResponse(app.url_path_for('home_admin'))

        with db.query('select count(id) as num_users from useradmin', []) as cursor:
            num_users = cursor.fetchone()['num_users']

        if not num_users:
            return RedirectResponse(app.url_path_for('signup_admin'))
    finally:
        # Runs on every exit path (both redirects, the normal fall-through
        # and exceptions) so the connection is always released.
        db.close()

    i18n = I18n('admin', I18n.session_lang(request.session))

    return t.load_template('login.phtml', title=i18n.tlang('Login'), tlang=i18n.tlang, url_for=app.url_path_for)
# GET /signup: show the signup form, but only while the useradmin table is
# empty; once at least one user exists the client is redirected to the login
# page.
@admin_app.get('/signup', response_class=HTMLResponse)
def signup_admin(request: Request):
    db = WebModel.connection()

    try:
        with db.query('select count(id) as num_users from useradmin', []) as cursor:
            num_users = cursor.fetchone()['num_users']
    finally:
        # Close before branching so the redirect path releases the
        # connection too.
        db.close()

    if num_users > 0:
        return RedirectResponse(app.url_path_for('login_admin'))

    i18n = I18n('admin', I18n.session_lang(request.session))

    return t.load_template('signup.phtml', title=i18n.tlang('Signup'), tlang=i18n.tlang, url_for=app.url_path_for)
# Request body accepted by POST /login.
#
# NOTE(review): this rebinds the module-level name UserAdmin, which until this
# point refers to the model imported from
# parameciofast.modules.fastadmin.models.admin. The module-level `usermodel`
# instance is created before this class statement, so it still wraps the
# imported model.
class UserAdmin(BaseModel):
    # Credentials typed into the login form.
    username: str = Field(description='The username of user')
    password: str = Field(description='The password of user')
    # When true the login handler also issues a long-lived "remember" cookie.
    remember_login: bool | None = None
# Response body of POST /login: the fields of ResponseData plus a flag that is
# filled from the result of check_login_tries().
class ResponseDataLogin(ResponseData):
    # Set when the client has used up its login attempts.
    no_login: bool
# POST /login: validate the submitted credentials.
#
# Returns a dict with:
#   error    - 0 on successful login, 1 otherwise
#   message  - translated error text, empty on success
#   no_login - result of check_login_tries(); truthy when the client is
#              currently locked out
#
# On success the session key 'login_admin' is set, last_login is updated and,
# when remember_login was requested, a fresh token is stored in token_login
# and sent back as the "<cookie_name>_remember" cookie.
@admin_app.post('/login')
def check_login_admin(user: UserAdmin, request: Request, response: Response) -> ResponseDataLogin:
    db = WebModel.connection()

    try:
        i18n = I18n('admin', I18n.session_lang(request.session))

        error = 1
        message = i18n.tlang('Invalid user and password')

        # Every call counts as an attempt, even when the credentials turn
        # out to be valid.
        no_login = check_login_tries(request, db)

        if user.username != '' and user.password != '' and not no_login:
            with db.query('select * from useradmin WHERE username=%s', [user.username]) as cursor:
                result = cursor.fetchone()

            if result and usermodel.fields['password'].verify(user.password, result['password']):
                # Stays empty when the user did not ask to be remembered;
                # the stored token is overwritten either way.
                remember_key = ''

                if user.remember_login:
                    remember_key = create_key_encrypt()
                    # About ten years. An integer `expires` is interpreted
                    # as seconds from now (not as an absolute timestamp), so
                    # it takes the same value as max_age.
                    remember_seconds = 315360000
                    response.set_cookie(
                        key=cookie_name + '_remember',
                        value=remember_key,
                        expires=remember_seconds,
                        max_age=remember_seconds,
                        httponly=True,
                        path=config.application_root,
                    )

                date_now = format_local_strtime('YYYY-MM-DD HH:mm:ss', now())

                db.query('update useradmin set token_login=%s, last_login=%s WHERE id=%s', [remember_key, date_now, result['id']])

                request.session['login_admin'] = True

                error = 0
                message = ''
    finally:
        # Release the connection even if a query or the password check raises.
        db.close()

    return {'error': error, 'message': message, 'no_login': no_login}
# Request body accepted by POST /signup.
#
# The regular expressions are anchored with ^...$ so the whole value has to
# match. Without anchors a value only needs to contain one matching substring,
# which would let a username such as "ab cd!" through the \w+ rule.
class UserSignup(BaseModel):
    username: str = Field(description="The username of new user", min_length=4, pattern=r"^\w+$")
    email: str = Field(description="The email of new user", min_length=7, pattern=r"^\w[\w\.-]*@\w[\w\.-]+\.\w+$")
    password: str = Field(description="The password of new user", min_length=4)
    repeat_password: str = Field(description="Repeat the password of the new user", min_length=4)

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "username": "johnny5",
                    "email": "trial@example.com",
                    "password": "arandompasswordthatineverused",
                    "repeat_password": "arandompasswordthatineverused",
                }
            ]
        }
    }
# POST /signup: create the first admin user.
#
# Only allowed while the useradmin table is empty. Returns a dict with
# error (0 on success, 1 otherwise) and a message describing the outcome.
@admin_app.post('/signup')
def signup_insert_admin(user: UserSignup, request: Request) -> ResponseData:
    # Not used for the messages below. Kept because I18n.session_lang() is
    # handed the session and may initialise it - TODO confirm before removing.
    i18n = I18n('admin', I18n.session_lang(request.session))

    error = 1
    message = ''

    db = WebModel.connection()

    try:
        # Only one user may be created through this endpoint.
        with db.query('select count(id) as num_users from useradmin', []) as cursor:
            num_users = cursor.fetchone()['num_users']

        # NOTE(review): password and repeat_password are not compared here;
        # confirm that simplequery.insert / the model performs that check.
        if num_users:
            message = "You cannot add new users from here"
        elif simplequery.insert(usermodel, db, dict(user)):
            error = 0
            message = "User added!"
        else:
            # A failed insert must be reported as an error rather than as a
            # success with an empty message.
            message = "The user could not be added"
    finally:
        # Release the connection even if a query raises.
        db.close()

    return {'error': error, 'message': message}
# GET /logout: drop the login flag from the session, ask the browser to delete
# the "remember" cookie if it sent one, and redirect to the login page.
@admin_app.get('/logout')
def logout_admin(request: Request) -> RedirectResponse:
    remember_cookie = cookie_name + '_remember'

    redirect = RedirectResponse(app.url_path_for('login_admin'))

    # pop() with a default is a no-op when the key is absent.
    request.session.pop('login_admin', None)

    if remember_cookie in request.cookies:
        redirect.delete_cookie(remember_cookie, path=config.application_root)

    return redirect
def check_login_tries(request, db):
    """Record a login attempt for the caller's IP and report whether it is locked out.

    The client address is taken from the ``x-real-ip`` header, else from
    ``x-forwarded-for``, else from ``request.client.host``. Rows of
    ``logintries`` whose ``last_login`` is older than ``seconds_login`` seconds
    are deleted first; the remaining row for the address (if any) decides the
    outcome.

    Returns 1 when the stored ``num_tries`` has reached ``login_tries``,
    otherwise 0 (after inserting or incrementing the row).
    """
    tries_model = LoginTries(db)
    tries_model.safe_query()

    headers = request.headers

    if 'x-real-ip' in headers:
        client_ip = headers['x-real-ip']
    elif 'x-forwarded-for' in headers:
        client_ip = headers['x-forwarded-for']
    else:
        client_ip = request.client.host

    stamp_format = 'YYYY-MM-DD HH:mm:ss'
    current = now()
    current_str = format_local_strtime(stamp_format, current)
    cutoff = timestamp_to_datetime(obtain_timestamp(current) - seconds_login)
    cutoff_str = format_local_strtime(stamp_format, cutoff)

    # Forget attempts that fell out of the lockout window.
    tries_model.query('delete from logintries where last_login<%s', [cutoff_str])

    row = tries_model.set_conditions('WHERE ip=%s', [client_ip]).select_a_row_where()

    if not row:
        # First attempt recorded for this address inside the window.
        tries_model.query('insert into logintries (`ip`, `num_tries`, `last_login`) VALUES (%s, %s, %s)', [client_ip, 1, current_str])
        return 0

    if row['num_tries'] < login_tries:
        tries_model.query('update logintries set num_tries=num_tries+1, last_login=%s WHERE ip=%s', [current_str, client_ip])
        return 0

    # Limit reached: the row is left untouched and the caller is refused.
    return 1