Source code for rpyc_docker.manager
import Queue,threading,time,os,traceback,subprocess,rpyc,logging
from docker.utils import create_host_config
from subprocess import check_output,CalledProcessError
logger = logging.getLogger("rpyc_docker")
logger.setLevel(logging.INFO)
[docs]class Manager(threading.Thread):
def __init__(self,argQueue,numWorkers,maxTime = 300):
"""
argQueue cls [args] {kwArgs}
"""
threading.Thread.__init__(self)
self.running = True
self._traceback = None
self.argQueue = argQueue
self.numWorkers = numWorkers
self.maxTime = maxTime
self.workers = []
self._results = []
self._errors = []
[docs] def run(self):
try:
self.__run()
except Exception:
self._traceback = traceback.format_exc()
self.running = False
[docs] def stop(self):
"""
stops the manager, there might be a delay before it loops throught the workers
"""
self.running = False
def __run(self):
while self.running:
runningWorkers = []
doneWorkers = []
for worker,workerCls,args,kwArgs in self.workers :
if worker.upTime > self.maxTime :
self._errors.append(["timeout",workerCls,args,kwArgs])
doneWorkers.append(worker)
elif worker.status == "running" :
runningWorkers.append([worker,workerCls,args,kwArgs])
elif worker.status == "done" :
self._results.append([worker.result,workerCls,args,kwArgs])
doneWorkers.append(worker)
elif worker.status == "error" :
self._errors.append([worker.traceback,workerCls,args,kwArgs])
doneWorkers.append(worker)
else :
pass
for worker in doneWorkers :
worker.teardown()
self.workers = runningWorkers
for n in range(self.numWorkers - len(self.workers)) :
try:
workerCls,args,kwArgs = self.argQueue.get_nowait()
logger.info("%s,%s,%s" % (workerCls,args,kwArgs))
worker = workerCls(*args,**kwArgs)
self.workers.append([worker,workerCls,args,kwArgs])
worker.start()
except Queue.Empty:
break
if len(self.workers) == 0 :
self.running = False
time.sleep(1)
[docs] def report(self):
"""
generates report of the status of the manager
:return: report
:rtype: string
"""
result = [
"WorkerManager Report",
"Running workers %d " % len(self.workers),
"Results %d " % len(self._results),
"Errors %d " % len(self._errors),
"Queue Size %d" % self.argQueue.qsize()
]
if self._traceback:
result.append("TRACEBACK ERROR")
result.append(self._traceback)
return "\n".join(result)
@property
def managerTraceback(self):
"""
shows traceback of manager if it has crashed
:return: traceback of manager
:rtype: string
"""
[docs] def get_error(self,n):
"""
returns the traceback of a worker if it has crashed
:param n: worker number
:type n: int
:return: traceback
:rtype: str:
"""
return self._errors[n][0]
[docs] def get_result(self,n):
"""
returns the result of a worker if it has finished
:param n: worker number
:type n: int
:return: result
:rtype: object:
"""
return self._results[n][0]