pastafari2/models/tasks.py

306 lines
13 KiB
Python

from paramecio2.libraries.db.webmodel import WebModel
from paramecio2.libraries.db import corefields
from paramecio2.libraries.db.extrafields.ipfield import IpField
from paramecio2.libraries.db.extrafields.arrayfield import ArrayField
from paramecio2.libraries.db.extrafields.parentfield import ParentField
from paramecio2.libraries.db.extrafields.datefield import DateField
from paramecio2.libraries.db.extrafields.dictfield import DictField
from paramecio2.libraries.i18n import I18n
import urllib3
try:
import ujson as json
except:
import json
from redis import Redis
from rq import Queue
import importlib
import traceback
from modules.monit.models.monit import LonelyIpField
class Task(WebModel):
def __init__(self, connection):
super().__init__(connection)
self.register(corefields.CharField('name_task'), True)
self.register(corefields.CharField('description_task'), True)
self.register(corefields.CharField('codename_task'), True)
self.register(corefields.CharField('path'))
self.register(IpField('server'))
self.register(corefields.CharField('hostname'))
self.register(corefields.CharField('where_sql_server'), 2000)
self.register(corefields.CharField('user'))
self.register(corefields.CharField('password'), 2000)
self.register(corefields.CharField('user_path'))
self.register(corefields.CharField('os_codename'))
self.register(corefields.CharField('url_return'))
self.register(corefields.CharField('ssh_user'))
self.register(corefields.CharField('ssh_key_pub'))
self.register(corefields.CharField('ssh_key_priv'))
self.register(corefields.CharField('ssh_key_password'))
self.register(DictField('data', corefields.CharField('')))
self.register(corefields.IntegerField('num_servers'))
self.register(corefields.BooleanField('is_parent'))
self.register(ParentField('parent_id', size=11, required=False, field_name='name_task'))
self.fields['where_sql_server'].escape=True
self.txt_error=''
def run_task(self, server, path, name_task, codename_task, description_task, data={}, user='', password='', where_sql_server='', url='', ssh_key_priv='', ssh_key_password='', send_task=True):
logtask=LogTask(self.connection)
#servers=Server(self.connection)
self.safe_query()
logtask.safe_query()
parent_id=None
yes_parent=False
"""
if type(server).__name__=='list':
server_list=server
server=None
yes_parent=True
"""
if self.insert({'name_task': name_task,'description_task': description_task, 'url_return': url, 'server': server, 'where_sql_server': where_sql_server, 'data': data , 'user': user, 'password': password, 'path': path, 'where_sql_server' : where_sql_server, 'ssh_key_priv': ssh_key_priv, 'ssh_key_password': ssh_key_password}):
task_id=self.insert_id()
parent_id=task_id
self.task_id=task_id
arr_ids=[]
if where_sql_server!='':
yes_parent=True
"""
if yes_parent:
for server in server_list:
self.insert({'name_task': name_task,'description_task': description_task, 'url_return': url, 'server': server, 'where_sql_server': '', 'data': data , 'user': user, 'password': password, 'path': path, 'where_sql_server' : where_sql_server, 'ssh_key_priv': ssh_key_priv, 'ssh_key_password': ssh_key_password, 'parent_id': parent_id})
arr_ids.append(self.insert_id())
"""
if send_task:
"""
http = urllib3.PoolManager()
try:
r = http.request('GET', 'http://127.0.0.1:1337', fields={'task_id': task_id, 'api_key': api_key})
if r.status!=200:
self.error=True
self.txt_error="Cannot access to task server: Error "+str(r.status)
return False
else:
resp=json.loads(r.data.decode('utf-8'))
if resp['error']:
self.error=True
self.txt_error=resp['message']
logtask.insert(resp)
return False
except urllib3.exceptions.MaxRetryError:
self.txt_error='Cannot connect to the task server, check if is up'
return False
"""
return self.send_task(task_id, yes_parent, where_sql_server)
"""
q = Queue(connection=Redis(host=redis_host, port=redis_port, password=redis_password))
try:
if where_sql_server=='':
result = q.enqueue('modules.pastafari.servers.server_rq.send_pastafari_task', task_id, server, job_timeout=rq_timeout)
else:
for arr_server in servers.set_conditions(where_sql_server, []).select_to_array(['ip']):
result=q.enqueue('modules.pastafari.servers.server_rq.send_pastafari_task', task_id, arr_server['ip'], job_timeout=rq_timeout)
except:
self.error=True
self.txt_error=traceback.format_exc()
return False
"""
return True
else:
self.error=True
self.txt_error="Cannot insert the task"
return False
def send_task(self, task_id, yes_parent=False, where_sql_server=''):
"""
http = urllib3.PoolManager()
try:
r = http.request('GET', 'http://127.0.0.1:1337', fields={'task_id': task_id, 'api_key': api_key})
if r.status!=200:
self.error=True
self.txt_error="Cannot access to task server: Error "+str(r.status)
return False
else:
resp=json.loads(r.data.decode('utf-8'))
if resp['error']:
self.error=True
self.txt_error=resp['message']
logtask.insert(resp)
return False
except urllib3.exceptions.MaxRetryError:
self.error=True
self.txt_error='Cannot connect to the task server, check if is up'
return False
"""
q = Queue(connection=Redis())
if yes_parent:
arr_task=self.select_a_row(task_id)
#If where sql, send multiple tasks.
sql_insert_values=[]
sql_insert='insert into task (`name_task`, `description_task`, `url_return`, `server`, `hostname`, `data`, `user`, `password`, `path`, `ssh_key_priv`, `ssh_key_password`, `parent_id`) VALUES '
with self.query('select id, hostname from serverdbtask '+where_sql_server) as cursor:
for row in cursor:
arr_task['server']=row['hostname']
sql_insert_values.append("('"+arr_task['name_task']+"', '"+arr_task['description_task']+"', '"+arr_task['url_return']+"', '"+arr_task['server']+"', '"+arr_task['server']+"', '"+arr_task['data']+"', '"+arr_task['user']+"', '"+arr_task['password']+"', '"+arr_task['path']+"', '"+arr_task['ssh_key_priv']+"', '"+arr_task['ssh_key_password']+"', '"+str(task_id)+"')")
#self.insert({'name_task': name_task,'description_task': description_task, 'url_return': url, 'server': server, 'where_sql_server': where_sql_server, 'data': data , 'user': user, 'password': password, 'path': path, 'where_sql_server' : where_sql_server, 'ssh_key_priv': ssh_key_priv, 'ssh_key_password': ssh_key_password})
pass
final_sql=sql_insert+", ".join(sql_insert_values)
self.query(final_sql)
with self.query('select id from task WHERE parent_id=%s', [task_id]) as cursor:
for row in cursor:
#print(row)
result = q.enqueue(task, row['id'], job_timeout=3600)
else:
#Enqueue task function.
result = q.enqueue(task, task_id, job_timeout=3600)
return True
# Function used in rq worker for exec the ssh task.
def task(task_id):
conn=WebModel.connection()
task=Task(conn)
logtask=LogTask(conn)
logtask.safe_query()
arr_task=task.select_a_row(task_id)
if not arr_task:
self.logtask.insert({'task_id': task_id, 'progress': 100, 'message': I18n.lang('pastafari', 'error_task_not_exists', 'Error: task not exists'), 'status': 1, 'error': 1, 'server': ''})
conn.close()
return False
server=arr_task['server']
try:
ssh_task=importlib.import_module(arr_task['path'])
"""
| name_task | varchar(255) | NO | | | |
| description_task | varchar(255) | NO | | | |
| codename_task | varchar(255) | NO | | | |
| path | varchar(255) | NO | | | |
| server | varchar(255) | NO | | | |
| hostname | varchar(255) | NO | | | |
| where_sql_server | varchar(255) | NO | | | |
| user | varchar(255) | NO | | | |
| password | varchar(255) | NO | | | |
| user_path | varchar(255) | NO | | | |
| os_codename | varchar(255) | NO | | | |
| url_return | varchar(255) | NO | | | |
| data | text | NO | | NULL | |
| ssh_key_pub | varchar(255) | NO | | | |
| ssh_key_priv | varchar(255) | NO | | | |
| ssh_user | varchar(255) | NO | | | |
| num_servers | int(11) | NO | | 0 | |
+------------------+--------------+------+-----+---------+----------------+
"""
remote_user=arr_task['user']
remote_password=arr_task['password']
private_key=arr_task['ssh_key_priv']
password_key=arr_task.get('ssh_key_password', '')
final_task=ssh_task.ServerTask(server, conn, remote_user=remote_user, remote_password=remote_password, private_key=private_key, password_key=password_key, remote_path='pastafari2', task_id=task_id, data=json.loads(arr_task['data']))
final_task.exec()
except:
if conn==None:
conn=WebModel.connection()
logtask.sqlclass=conn
logtask.insert({'task_id': task_id, 'progress': 100, 'message': I18n.lang('pastafari', 'error_in_task', 'Error: error in task ')+traceback.format_exc(), 'error': 1, 'status': 1, 'server': server})
conn.close()
return False
conn.close()
return True
class LogTask(WebModel):
def __init__(self, connection):
super().__init__(connection)
self.register(DateField('path'))
self.register(corefields.ForeignKeyField('task_id', Task(connection), size=11, required=False, identifier_field='id', named_field="name_task"))
self.register(IpField('server'))
self.register(corefields.DoubleField('progress'))
self.register(corefields.BooleanField('no_progress'))
self.register(corefields.TextField('message'))
self.register(corefields.BooleanField('error'))
self.register(corefields.BooleanField('status'))
self.register(ArrayField('data', corefields.CharField('data')))
self.register(corefields.BooleanField('code_error'))
# For grouping
class TaskDone(WebModel):
def __init__(self, connection):
super().__init__(connection)
self.register(corefields.CharField('name_task'), True)
self.register(LonelyIpField('ip'), True)