diff --git a/admin/dashboard.py b/admin/dashboard.py index 887d424..d24aad1 100644 --- a/admin/dashboard.py +++ b/admin/dashboard.py @@ -260,7 +260,7 @@ def pastafari2_add_server_task(): if not task.run_task(ip, path_task, 'Add new server', 'add_new_server', 'Task for add a new server', user=server_username, password=server_password, where_sql_server='', url='', data=data, send_task=True): error=1 - error_form['#server_host_error']=I18n.lang('pastafari2', 'error_exec_task', 'Error: cannot execute the task') + error_form['#server_host_error']=I18n.lang('pastafari2', 'error_exec_task', 'Error: cannot execute the task '+task.txt_error) task_id=task.task_id diff --git a/models/tasks.py b/models/tasks.py index bc558f4..2c78334 100644 --- a/models/tasks.py +++ b/models/tasks.py @@ -18,6 +18,7 @@ from rq import Queue import importlib import traceback from modules.monit.models.monit import LonelyIpField +from modules.pastafari2.libraries.configtask import config_task class Task(WebModel): @@ -49,6 +50,7 @@ class Task(WebModel): self.register(DictField('data', data_field)) self.register(corefields.IntegerField('num_servers')) self.register(corefields.BooleanField('is_parent')) + self.register(corefields.BooleanField('status')) self.register(ParentField('parent_id', size=11, required=False, field_name='name_task')) self.fields['where_sql_server'].escape=True @@ -176,7 +178,7 @@ class Task(WebModel): return False """ - q = Queue(connection=Redis()) + #q = Queue(connection=Redis()) if yes_parent: arr_task=self.select_a_row(task_id) @@ -196,22 +198,91 @@ class Task(WebModel): final_sql=sql_insert+", ".join(sql_insert_values) self.query(final_sql) - + """ with self.query('select id from task WHERE parent_id=%s', [task_id]) as cursor: for row in cursor: #print(row) result = q.enqueue(task, row['id'], job_timeout=3600) + """ + + return send_task_to_server(task_id) else: #Enqueue task function. - result = q.enqueue(task, task_id, job_timeout=3600) + #result = q.enqueue(task, task_id, job_timeout=3600) + + http = urllib3.PoolManager() + + try: + + #@app.route('/exec//') + + r = http.request('GET', 'http://127.0.0.1:1337/exec/{}/{}'.format(config_task.api_key, task_id)) + + #print('http://127.0.0.1:1337/exec/{}/{}'.format(task_id, pastafari2_api_key)) + + if r.status!=200: + self.error=True + self.txt_error="Cannot access to task server: Error "+str(r.status) + return False + + else: + resp=json.loads(r.data.decode('utf-8')) + + if resp['error']: + self.error=True + self.txt_error=resp['message'] + logtask=LogTask(self.connection) + logtask.insert(resp) + return False + + except urllib3.exceptions.MaxRetryError: + self.error=True + self.txt_error='Cannot connect to the task server, check if is up' + return False return True -# Function used in rq worker for exec the ssh task. +# Function for send task to task server + +def send_task_to_server(task_id): + + http = urllib3.PoolManager() + + try: + + #@app.route('/exec//') + + r = http.request('GET', 'http://127.0.0.1:1337/exec/{}/{}'.format(config_task.api_key, task_id)) + + #print('http://127.0.0.1:1337/exec/{}/{}'.format(task_id, pastafari2_api_key)) + + if r.status!=200: + self.error=True + self.txt_error="Cannot access to task server: Error "+str(r.status) + return False + + else: + resp=json.loads(r.data.decode('utf-8')) + + if resp['error']: + self.error=True + self.txt_error=resp['message'] + logtask=LogTask(self.connection) + logtask.insert(resp) + return False + + except urllib3.exceptions.MaxRetryError: + self.error=True + self.txt_error='Cannot connect to the task server, check if is up' + return False + + return True + +# Function used in rq worker for exec the ssh task in rq. def task(task_id): diff --git a/scripts/system/updates.py b/scripts/system/updates.py index 6ce905f..1e6de0a 100644 --- a/scripts/system/updates.py +++ b/scripts/system/updates.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 -u +#!/opt/pythonenv/bin/python3 -u # A script for install pzoo user @@ -17,7 +17,7 @@ if linux_distro=='arch': if call("sudo pacman -Syu --noconfirm", shell=True) > 0: print('Error, cannot upgrade server...') exit(1) -elif linux_distro=='debian': +elif linux_distro=='debian' or linux_distro=='ubuntu': if call('sudo DEBIAN_FRONTEND="noninteractive" apt-get -y update', shell=True) > 0: print('Error, cannot upgrade server...') @@ -30,7 +30,7 @@ elif linux_distro=='debian': if call(update_command, shell=True) > 0: print('Error, cannot upgrade server...') exit(1) -elif linux_distro=='rocky' or linux_distro=='fedora': +elif linux_distro=='rocky' or linux_distro=='fedora' or linux_distro=='alma' or linux_distro=='centos': if call("sudo dnf upgrade -y", shell=True) > 0: print('Error, cannot upgrade server...') diff --git a/servers/launcher.py b/servers/launcher.py new file mode 100644 index 0000000..542daff --- /dev/null +++ b/servers/launcher.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 + +import sys, os + +sys.path.insert(0, os.path.realpath(os.path.dirname(__file__))+'/../../../') +os.chdir(os.path.realpath(os.path.dirname(__file__))+'/../../../') + +import argparse +import json +from paramecio2.libraries.db.webmodel import WebModel +from modules.pastafari2.models import tasks +from modules.pastafari2.models.pastafari2 import ServerDbTask +from modules.pastafari2.libraries.task import Task +from settings import config +from modules.pastafari2.libraries.configtask import config_task +from multiprocessing.pool import Pool +import importlib +from paramecio2.libraries.db.sqlalchemy import SqlClass +SqlClass.disable_pool=True + +num_tasks=10 + +if hasattr(config, 'pastafari_num_tasks'): + num_tasks=config.pastafari_num_tasks + +def start(cli_args=None): + + parser = argparse.ArgumentParser(description='A daemon used for make a task in a server.The results are saved in a sql database using task class') + parser.add_argument('--task_id', help='The task to execute', required=True) + + args = parser.parse_args(cli_args) + + task_id=int(args.task_id) + + conn=WebModel.connection() + + task_model=tasks.Task(conn) + + logtask=tasks.LogTask(conn) + + arr_task=task_model.select_a_row(task_id) + + if arr_task: + + if (arr_task['user']!='' or arr_task['password']!='') and arr_task['path']!='': + + config_task.remote_user=arr_task['user'] + config_task.remote_password=arr_task['password'] + config_task.remote_path=arr_task['path'] + + task_model.create_forms() + task_model.reset_require() + + task_model.set_conditions('WHERE id=%s', [task_id]) + + task_model.update({'password': ''}) + + """ + if not commands_to_execute: + print('Error: no task files') + exit(1) + """ + server_model=ServerDbTask(conn) + + if not arr_task['is_parent']: + + #task=generate_task(arr_task, conn, task_id) + + remote_user=arr_task['user'] + remote_password=arr_task['password'] + private_key=arr_task['ssh_key_priv'] + password_key=arr_task.get('ssh_key_password', '') + + taskmod=importlib.import_module(arr_task.get('path', '')) + + ssh_task=taskmod.ServerTask(arr_task['server'], conn, remote_user=remote_user, remote_password=remote_password, private_key=private_key, password_key=password_key, remote_path='pastafari2', task_id=task_id, data=json.loads(arr_task['data'])) + + ssh_task.exec() + + + else: + + # Select the servers and make all tasks asynchronous + + #server_model.set_conditions(arr_task['where_sql_server'], []) + + #server_model.yes_reset_conditions=False + + c=task_model.set_conditions('WHERE parent_id=%s', [task_id]).select_count() + #with conn.query('select count(*) from task where parent_id=%s', [task_id]) as cursor: + + + #Update task with number of servers + + task_model.set_conditions('WHERE id=%s', [task_id]) + + task_model.reset_require() + + task_model.valid_fields=['num_servers'] + + task_model.update({'num_servers': c}) + + z=0 + + while z0: + connection=WebModel.connection() + + if executable=='launcher.py': + + logtask=tasks.LogTask(connection) + task=tasks.Task(connection) + """ + else: + logtask=tasks.HostLogTask(connection) + task=tasks.HostTask(connection) + """ + + logtask.create_forms() + task.create_forms() + + + line=proc.stdout.read().decode('utf-8') + line_error=proc.stderr.read().decode('utf-8') + logtask.insert({'task_id': task_id, 'progress': 100, 'message': 'Error executing '+executable+': '+str(line)+"\n"+str(line_error), 'error': 1, 'status': 1}) + #Status die + task.set_conditions('where id=%s', [task_id]) + task.reset_require() + task.update({'status': 1, 'error': 1}) + + connection.close() + +app=Flask(__name__) + +@app.route('/') +def home(): + + response={'error': 1, 'code_error': 1, 'message': 'Nothing to see here...', 'progress' : 100} + + return response + +@app.route('/exec//') +def index(api_key, task_id): + + connection=WebModel.connection() + + # Get the task to execute. + executable='launcher.py' + + if api_key==config_task.api_key: + + + task=tasks.Task(connection) + logtask=tasks.LogTask(connection) + + logtask.create_forms() + + arr_task=task.select_a_row(task_id) + + if arr_task: + + # Add to queue + + greenlet = gevent.spawn( execute_script, task_id, executable) + + # Close the connection + + response={'error': 0, 'code_error': 0, 'message': 'Begin task with id '+str(task_id), 'progress' : 0} + connection.close() + return response + else: + + response={'error': 1, 'code_error': 1, 'message': 'Doesnt exists task with id '+str(task_id), 'progress' : 100, 'status': 1} + connection.close() + return response + + else: + + response={'error': 1, 'code_error': 1, 'message': 'No permission for make tasks', 'progress' : 100, 'status': 1} + connection.close() + return response + #logtask.insert({}) + +#app = application = default_app() + +# Init the task servers + +def run_app(app): + #run(app=app, host=pastafari_host, port=pastafari_port, debug=config.debug, server='gevent', reloader=config.reloader) + print('Init task server...') + http_server=WSGIServer((pastafari_host, pastafari_port), app) + http_server.serve_forever() + +if __name__=='__main__': + + run_app(app) +