Added gevent server for tasks

This commit is contained in:
absurdo 2023-07-16 01:23:19 +02:00
parent a9762c1a11
commit 0e0131334a
5 changed files with 449 additions and 8 deletions

View file

@ -260,7 +260,7 @@ def pastafari2_add_server_task():
if not task.run_task(ip, path_task, 'Add new server', 'add_new_server', 'Task for add a new server', user=server_username, password=server_password, where_sql_server='', url='', data=data, send_task=True):
error=1
error_form['#server_host_error']=I18n.lang('pastafari2', 'error_exec_task', 'Error: cannot execute the task')
error_form['#server_host_error']=I18n.lang('pastafari2', 'error_exec_task', 'Error: cannot execute the task '+task.txt_error)
task_id=task.task_id

View file

@ -18,6 +18,7 @@ from rq import Queue
import importlib
import traceback
from modules.monit.models.monit import LonelyIpField
from modules.pastafari2.libraries.configtask import config_task
class Task(WebModel):
@ -49,6 +50,7 @@ class Task(WebModel):
self.register(DictField('data', data_field))
self.register(corefields.IntegerField('num_servers'))
self.register(corefields.BooleanField('is_parent'))
self.register(corefields.BooleanField('status'))
self.register(ParentField('parent_id', size=11, required=False, field_name='name_task'))
self.fields['where_sql_server'].escape=True
@ -176,7 +178,7 @@ class Task(WebModel):
return False
"""
q = Queue(connection=Redis())
#q = Queue(connection=Redis())
if yes_parent:
arr_task=self.select_a_row(task_id)
@ -196,22 +198,91 @@ class Task(WebModel):
final_sql=sql_insert+", ".join(sql_insert_values)
self.query(final_sql)
"""
with self.query('select id from task WHERE parent_id=%s', [task_id]) as cursor:
for row in cursor:
#print(row)
result = q.enqueue(task, row['id'], job_timeout=3600)
"""
return send_task_to_server(task_id)
else:
#Enqueue task function.
result = q.enqueue(task, task_id, job_timeout=3600)
#result = q.enqueue(task, task_id, job_timeout=3600)
http = urllib3.PoolManager()
try:
#@app.route('/exec/<api_key>/<int:task_id>')
r = http.request('GET', 'http://127.0.0.1:1337/exec/{}/{}'.format(config_task.api_key, task_id))
#print('http://127.0.0.1:1337/exec/{}/{}'.format(task_id, pastafari2_api_key))
if r.status!=200:
self.error=True
self.txt_error="Cannot access to task server: Error "+str(r.status)
return False
else:
resp=json.loads(r.data.decode('utf-8'))
if resp['error']:
self.error=True
self.txt_error=resp['message']
logtask=LogTask(self.connection)
logtask.insert(resp)
return False
except urllib3.exceptions.MaxRetryError:
self.error=True
self.txt_error='Cannot connect to the task server, check if is up'
return False
return True
# Function used in rq worker for exec the ssh task.
# Function for send task to task server
def send_task_to_server(task_id):
http = urllib3.PoolManager()
try:
#@app.route('/exec/<api_key>/<int:task_id>')
r = http.request('GET', 'http://127.0.0.1:1337/exec/{}/{}'.format(config_task.api_key, task_id))
#print('http://127.0.0.1:1337/exec/{}/{}'.format(task_id, pastafari2_api_key))
if r.status!=200:
self.error=True
self.txt_error="Cannot access to task server: Error "+str(r.status)
return False
else:
resp=json.loads(r.data.decode('utf-8'))
if resp['error']:
self.error=True
self.txt_error=resp['message']
logtask=LogTask(self.connection)
logtask.insert(resp)
return False
except urllib3.exceptions.MaxRetryError:
self.error=True
self.txt_error='Cannot connect to the task server, check if is up'
return False
return True
# Function used in rq worker for exec the ssh task in rq.
def task(task_id):

View file

@ -1,4 +1,4 @@
#!/usr/bin/python3 -u
#!/opt/pythonenv/bin/python3 -u
# A script for install pzoo user
@ -17,7 +17,7 @@ if linux_distro=='arch':
if call("sudo pacman -Syu --noconfirm", shell=True) > 0:
print('Error, cannot upgrade server...')
exit(1)
elif linux_distro=='debian':
elif linux_distro=='debian' or linux_distro=='ubuntu':
if call('sudo DEBIAN_FRONTEND="noninteractive" apt-get -y update', shell=True) > 0:
print('Error, cannot upgrade server...')
@ -30,7 +30,7 @@ elif linux_distro=='debian':
if call(update_command, shell=True) > 0:
print('Error, cannot upgrade server...')
exit(1)
elif linux_distro=='rocky' or linux_distro=='fedora':
elif linux_distro=='rocky' or linux_distro=='fedora' or linux_distro=='alma' or linux_distro=='centos':
if call("sudo dnf upgrade -y", shell=True) > 0:
print('Error, cannot upgrade server...')

225
servers/launcher.py Normal file
View file

@ -0,0 +1,225 @@
#!/usr/bin/env python3
import sys, os
sys.path.insert(0, os.path.realpath(os.path.dirname(__file__))+'/../../../')
os.chdir(os.path.realpath(os.path.dirname(__file__))+'/../../../')
import argparse
import json
from paramecio2.libraries.db.webmodel import WebModel
from modules.pastafari2.models import tasks
from modules.pastafari2.models.pastafari2 import ServerDbTask
from modules.pastafari2.libraries.task import Task
from settings import config
from modules.pastafari2.libraries.configtask import config_task
from multiprocessing.pool import Pool
import importlib
from paramecio2.libraries.db.sqlalchemy import SqlClass
SqlClass.disable_pool=True
num_tasks=10
if hasattr(config, 'pastafari_num_tasks'):
num_tasks=config.pastafari_num_tasks
def start(cli_args=None):
parser = argparse.ArgumentParser(description='A daemon used for make a task in a server.The results are saved in a sql database using task class')
parser.add_argument('--task_id', help='The task to execute', required=True)
args = parser.parse_args(cli_args)
task_id=int(args.task_id)
conn=WebModel.connection()
task_model=tasks.Task(conn)
logtask=tasks.LogTask(conn)
arr_task=task_model.select_a_row(task_id)
if arr_task:
if (arr_task['user']!='' or arr_task['password']!='') and arr_task['path']!='':
config_task.remote_user=arr_task['user']
config_task.remote_password=arr_task['password']
config_task.remote_path=arr_task['path']
task_model.create_forms()
task_model.reset_require()
task_model.set_conditions('WHERE id=%s', [task_id])
task_model.update({'password': ''})
"""
if not commands_to_execute:
print('Error: no task files')
exit(1)
"""
server_model=ServerDbTask(conn)
if not arr_task['is_parent']:
#task=generate_task(arr_task, conn, task_id)
remote_user=arr_task['user']
remote_password=arr_task['password']
private_key=arr_task['ssh_key_priv']
password_key=arr_task.get('ssh_key_password', '')
taskmod=importlib.import_module(arr_task.get('path', ''))
ssh_task=taskmod.ServerTask(arr_task['server'], conn, remote_user=remote_user, remote_password=remote_password, private_key=private_key, password_key=password_key, remote_path='pastafari2', task_id=task_id, data=json.loads(arr_task['data']))
ssh_task.exec()
else:
# Select the servers and make all tasks asynchronous
#server_model.set_conditions(arr_task['where_sql_server'], [])
#server_model.yes_reset_conditions=False
c=task_model.set_conditions('WHERE parent_id=%s', [task_id]).select_count()
#with conn.query('select count(*) from task where parent_id=%s', [task_id]) as cursor:
#Update task with number of servers
task_model.set_conditions('WHERE id=%s', [task_id])
task_model.reset_require()
task_model.valid_fields=['num_servers']
task_model.update({'num_servers': c})
z=0
while z<c:
# Set the num of pools
#server_model.set_limit([z, num_tasks])
arr_tasks=task_model.set_conditions('WHERE parent_id=%s', [task_id]).set_limit([z, num_tasks]).select_to_array()
num_pools=len(arr_tasks)
arr_task_exec=[]
with Pool(processes=num_pools) as pool:
for one_task in arr_tasks:
arr_task_exec.append(one_task)
pool.map(execute_task, arr_task_exec)
#for x in range(num_pools)
#pool.
pass
z+=num_tasks
pass
# Task done
task_model.set_conditions('WHERE id=%s', [task_id])
task_model.reset_require()
task_model.valid_fields=['status']
task_model.update({'num_servers': 1, 'status': 1})
conn.close()
exit(0)
def execute_task(arr_task):
conn=WebModel.connection()
remote_user=arr_task['user']
remote_password=arr_task['password']
private_key=arr_task['ssh_key_priv']
password_key=arr_task.get('ssh_key_password', '')
task_id=arr_task['id']
taskmod=importlib.import_module(arr_task.get('path', ''))
ssh_task=taskmod.ServerTask(arr_task['server'], conn, remote_user=remote_user, remote_password=remote_password, private_key=private_key, password_key=password_key, remote_path='pastafari2', task_id=task_id, data=json.loads(arr_task['data']))
ssh_task.exec()
conn.close()
del ssh_task
def execute_multitask(arr_task_exec=[]):
conn=WebModel.connection()
arr_task=arr_task_exec[0]
task_id=arr_task_exec[1]
server=arr_task_exec[2]
os_server=arr_task_exec[3]
task=generate_task(arr_task, conn, task_id)
task.server=server
task.os_server=os_server
task.exec()
conn.close()
del task
def generate_task(arr_task, conn, task_id):
#taskssh.config=taskset.get('config', taskssh.config)
#arr_task={k: v for k, v in arr_task.items() if v is not "" and v is not "[]"}
taskmod=importlib.import_module(arr_task.get('path', ''))
taskssh=taskmod.TaskServer(arr_task['server'], conn, task_id)
"""
if arr_task['name_task']!='':
taskssh.name_task=arr_task['name_task']
if arr_task['codename_task']!='':
taskssh.codename_task=arr_task['codename_task']
if arr_task['description_task']!='':
taskssh.description_task=arr_task['description_task']
if arr_task['files']!='[]':
taskssh.files=json.loads(arr_task['files'])
if arr_task['commands_to_execute']!='[]':
taskssh.commands_to_execute=json.loads(arr_task['commands_to_execute'])
if arr_task['delete_files']!='[]':
taskssh.delete_files=json.loads(arr_task['delete_files'])
if arr_task['delete_directories']!='[]':
taskssh.delete_directories=json.loads(arr_task['delete_directories'])
"""
taskssh.data=json.loads(arr_task['data'])
return taskssh
if __name__=='__main__':
start()

145
servers/scheduler.py Normal file
View file

@ -0,0 +1,145 @@
import sys, os
sys.path.insert(0, os.path.realpath(os.path.dirname(__file__))+'/../../../')
from gevent import monkey; monkey.patch_all()
from gevent.subprocess import Popen, PIPE
#from multiprocessing import Process
import argparse
import uuid
import gevent, traceback, sys, time
#from bottle import route, run, default_app
from gevent.pywsgi import WSGIServer
from importlib import import_module
from os.path import isfile
from settings import config
from paramecio2.libraries.i18n import I18n
from paramecio2.libraries.db.webmodel import WebModel
from modules.pastafari2.models import tasks
from modules.pastafari2.libraries.task import Task
from modules.pastafari2.libraries.configtask import config_task
from flask import Flask
# For deploy with uwsgi; uwsgi --gevent 100 --http-socket :9090 --wsgi-file scheduler.py
pastafari_scripts='./scripts'
pastafari_host='127.0.0.1'
pastafari_port=1337
pastafari_py_command='python3'
#parser.add_argument('--port', help='The port where the task server is executed', required=True)
if hasattr(config, 'pastafari_scripts'):
pastafari_scripts=config.pastafari_scripts
if hasattr(config, 'pastafari_port'):
pastafari_port=config.pastafari_port
if hasattr(config, 'pastafari_host'):
pastafari_host=config.pastafari_host
if hasattr(config, 'pastafari_py_command'):
pastafari_py_command=config.pastafari_py_command
def execute_script(task_id, executable='launcher.py'):
args=['%s %s --task_id=%i' % (pastafari_py_command, executable, task_id)]
with Popen(args, bufsize=None, executable=None, stdin=None, stdout=PIPE, stderr=PIPE, preexec_fn=None, shell=True, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, threadpool=None) as proc:
proc.wait()
return_value=proc.returncode
if return_value>0:
connection=WebModel.connection()
if executable=='launcher.py':
logtask=tasks.LogTask(connection)
task=tasks.Task(connection)
"""
else:
logtask=tasks.HostLogTask(connection)
task=tasks.HostTask(connection)
"""
logtask.create_forms()
task.create_forms()
line=proc.stdout.read().decode('utf-8')
line_error=proc.stderr.read().decode('utf-8')
logtask.insert({'task_id': task_id, 'progress': 100, 'message': 'Error executing '+executable+': '+str(line)+"\n"+str(line_error), 'error': 1, 'status': 1})
#Status die
task.set_conditions('where id=%s', [task_id])
task.reset_require()
task.update({'status': 1, 'error': 1})
connection.close()
app=Flask(__name__)
@app.route('/')
def home():
response={'error': 1, 'code_error': 1, 'message': 'Nothing to see here...', 'progress' : 100}
return response
@app.route('/exec/<api_key>/<int:task_id>')
def index(api_key, task_id):
connection=WebModel.connection()
# Get the task to execute.
executable='launcher.py'
if api_key==config_task.api_key:
task=tasks.Task(connection)
logtask=tasks.LogTask(connection)
logtask.create_forms()
arr_task=task.select_a_row(task_id)
if arr_task:
# Add to queue
greenlet = gevent.spawn( execute_script, task_id, executable)
# Close the connection
response={'error': 0, 'code_error': 0, 'message': 'Begin task with id '+str(task_id), 'progress' : 0}
connection.close()
return response
else:
response={'error': 1, 'code_error': 1, 'message': 'Doesnt exists task with id '+str(task_id), 'progress' : 100, 'status': 1}
connection.close()
return response
else:
response={'error': 1, 'code_error': 1, 'message': 'No permission for make tasks', 'progress' : 100, 'status': 1}
connection.close()
return response
#logtask.insert({})
#app = application = default_app()
# Init the task servers
def run_app(app):
#run(app=app, host=pastafari_host, port=pastafari_port, debug=config.debug, server='gevent', reloader=config.reloader)
print('Init task server...')
http_server=WSGIServer((pastafari_host, pastafari_port), app)
http_server.serve_forever()
if __name__=='__main__':
run_app(app)