Skip to content

Commit

Permalink
#124 + worker api and shared object changes
Browse files Browse the repository at this point in the history
  • Loading branch information
z1pti3 committed Jun 27, 2021
1 parent d8f41f8 commit 8c66748
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 13 deletions.
12 changes: 6 additions & 6 deletions core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

import jimi

# Initialize
dbCollectionName = "clusterMembers"
systemIndexes = []

class _clusterMember(jimi.db._document):
systemID = int()
Expand All @@ -22,15 +21,16 @@ class _clusterMember(jimi.db._document):
lastSyncTime = int()
checksum = str()

_dbCollection = jimi.db.db[dbCollectionName]
_dbCollection = jimi.db.db["clusterMembers"]

def new(self,systemID):
self.systemID = systemID
self.master = False
self.acl = { "ids":[ { "accessID":"0","delete": True,"read": True,"write": True } ] }
return super(_clusterMember, self).new()

def sync(self,systemIndexes):
def sync(self):
global systemIndexes
now = time.time()
self.syncCount+=1
self.lastSyncTime = int(now)
Expand Down Expand Up @@ -247,14 +247,14 @@ class _cluster:
lastHandle = 0
clusterMember = None

def handler(self,systemIndexes):
def handler(self):
self.startTime = int(time.time())
self.clusterMember = loadClusterMember()
while not self.stopped:
jimi.audit._audit().add("cluster","poll",{ "systemID" : self.clusterMember.systemID, "master" : self.clusterMember.master, "systemUID" : self.clusterMember.systemUID })
now = int(time.time())
self.lastHandle = now
if not self.clusterMember.sync(systemIndexes):
if not self.clusterMember.sync():
self.stopped = True
# pause
time.sleep(clusterSettings["loopP"])
Expand Down
76 changes: 76 additions & 0 deletions core/workers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
from multiprocessing import Process, Queue
import multiprocessing
from re import L
import threading
import time
import uuid
import ctypes
import traceback
import json

import requests

import jimi

workers = None

class _threading(threading.Thread):
def __init__(self, *args, **keywords):
threading.Thread.__init__(self, *args, **keywords)
Expand Down Expand Up @@ -319,3 +325,73 @@ def multiprocessingThreadStart(Q,threadCall,args):
error = e
rc = 1
Q.put((rc,error))

######### --------- API --------- #########
if jimi.api.webServer:
if not jimi.api.webServer.got_first_request:
if jimi.api.webServer.name == "jimi_core":
@jimi.api.webServer.route(jimi.api.base+"worker/", methods=["GET"])
@jimi.auth.adminEndpoint
def getWorkers():
results = []
global workers
workersData = workers.getAll()
for workerData in workersData:
results.append({
"system" : "Cluster System {0}".format(jimi.cluster.getSystemId()),
"name" : workerData.name,
"call" : workerData.call.__name__,
"id" : workerData.id,
"createdTime" : workerData.createdTime,
"startTime" : workerData.startTime,
"endTime" : workerData.endTime,
"duration" : workerData.duration,
"running" : workerData.running
})
apiToken = jimi.auth.generateSystemSession()
headers = { "X-api-token" : apiToken }
for systemIndex in jimi.cluster.systemIndexes:
url = systemIndex["apiAddress"]
apiEndpoint = "worker/"
response = requests.get("{0}{1}{2}".format(url,jimi.api.base,apiEndpoint),headers=headers, timeout=10)
if response.status_code == 200:
jsonResponse = json.loads(response.text)["results"]
for jsonResult in jsonResponse:
jsonResult["system"] = "Cluster System {0}, index {1}".format(jimi.cluster.getSystemId(),systemIndex["systemIndex"])
results += jsonResponse
return { "results" : results }, 200

if jimi.api.webServer.name == "jimi_worker":
@jimi.api.webServer.route(jimi.api.base+"worker/", methods=["GET"])
@jimi.auth.systemEndpoint
def getWorkers():
results = []
global workers
workersData = workers.getAll()
for workerData in workersData:
results.append({
"name" : workerData.name,
"call" : workerData.call.__name__,
"id" : workerData.id,
"createdTime" : workerData.createdTime,
"startTime" : workerData.startTime,
"endTime" : workerData.endTime,
"duration" : workerData.duration,
"running" : workerData.running
})
return { "results" : results }, 200

if jimi.api.webServer.name == "jimi_web":
from flask import Flask, request, render_template

@jimi.api.webServer.route("/taskManager/", methods=["GET"])
def taskManagerPage():
workers = []
apiEndpoint = "worker/"
servers = jimi.cluster.getAll()
for url in servers:
response = jimi.helpers.apiCall("GET",apiEndpoint,token=jimi.api.g.sessionToken,overrideURL=url)
if response.status_code == 200:
workers += json.loads(response.text)["results"]
return render_template("taskManager.html",CSRF=jimi.api.g.sessionData["CSRF"], workers=workers)

15 changes: 8 additions & 7 deletions jimi_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def healthChecker(scheduler):
workerAPISettings = settings.config["api"]["worker"]
api.createServer("jimi_worker")
import jimi
api.startServer(True,host=workerAPISettings["bind"], port=workerAPISettings["startPort"]+systemIndex)
api.startServer(True, host=workerAPISettings["bind"], port=workerAPISettings["startPort"]+systemIndex, threads=1)
logging.info("Index %i booting on system %i",systemIndex,systemId)
logging.info("Starting worker handler")
jimi.workers.workers = jimi.workers.workerHandler()
Expand All @@ -51,6 +51,7 @@ def startProcess(systemIndex):
p.start()
systemIndex["process"] = p
systemIndex["pid"] = p.pid
systemIndex["apiAddress"] = "http://{0}:{1}".format(workerAPISettings["bind"],workerAPISettings["startPort"]+systemIndex["systemIndex"])
logging.debug("Started index %i, PID=%i API=%s:%i",systemIndex["systemIndex"],p.pid,workerAPISettings["bind"],workerAPISettings["startPort"]+systemIndex["systemIndex"])

def healthChecker(cluster,systemIndexes):
Expand Down Expand Up @@ -122,19 +123,19 @@ def healthChecker(cluster,systemIndexes):

# Starting workers
cpuCount = os.cpu_count()
systemIndexes = []
jimi.cluster.systemIndexes = []
logging.debug("Detected %i CPU",cpuCount)
if cpuCount == 1:
logging.info("Selected single cluster mode")
systemIndexes.append({ "systemIndex" : 0 })
jimi.cluster.systemIndexes.append({ "systemIndex" : 0 })
else:
logging.info("Selected multi cluster mode")
for index in range(0,cpuCount):
systemIndexes.append({ "systemIndex" : index })
for systemIndex in systemIndexes:
jimi.cluster.systemIndexes.append({ "systemIndex" : index })
for systemIndex in jimi.cluster.systemIndexes:
startProcess(systemIndex)

cluster = jimi.cluster._cluster()
jimi.workers.workers.new("healthChecker",healthChecker,(cluster,systemIndexes),True,0)
jimi.workers.workers.new("healthChecker",healthChecker,(cluster,jimi.cluster.systemIndexes),True,0)
logging.info("Starting cluster processing")
cluster.handler(systemIndexes)
cluster.handler()
56 changes: 56 additions & 0 deletions web/build/taskManager.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{% extends "main.html" %}

{% block head %}
{{ jimi.jquery() }}
{{ jimi.tables() }}
{% endblock %}

{% block main %}
<table id="orderTableTaskManager" class="table table-sm theme-table" cellspacing="0" width="100%">
<thead class="theme-tableHeader">
<tr>
<th class="th-sm">system</th>
<th class="th-sm">name</th>
<th class="th-sm">call</th>
<th class="th-sm">_id</th>
<th class="th-sm">createdTime</th>
<th class="th-sm">startTime</th>
<th class="th-sm">endTime</th>
<th class="th-sm">duration</th>
<th class="th-sm">running</th>
</tr>
</thead>
<tbody class="theme-tableBody">
{% for worker in workers %}
<tr>
<td>{{ worker['system'] }}</td>
<td>{{ worker['name'] }}</td>
<td>{{ worker['call'] }}</td>
<td>{{ worker['_id'] }}</td>
<td>{{ worker['createdTime'] }}</td>
<td>{{ worker['startTime'] }}</td>
<td>{{ worker['endTime'] }}</td>
<td>{{ worker['duration'] }}</td>
<td>{{ worker['running'] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<script>
$(document).ready(function () {
$('#orderTableTaskManager').DataTable({
"stripeClasses" : [ 'theme-tableRowOdd', 'theme-tableRowEven' ],
fixedHeader: {
header: true,
footer: true
},
"paging" : false,
"order" : [[ 0, "desc" ]],
"autoWidth": false,
});
$('.dataTables_length').addClass('bs-select');
$('.dataTables_filter input[type="search"]').css({'width':'200px'});
$('.dataTables_filter input[type="search"]').addClass("theme-panelTextbox")
});
</script>
{% endblock %}

0 comments on commit 8c66748

Please sign in to comment.