From 6632277986ad20ce0cbcb4432018156dfa3ba002 Mon Sep 17 00:00:00 2001 From: Aníbal Limón Date: Fri, 26 May 2017 15:37:37 -0500 Subject: oeqa/core/threaded: Add support of OETestRunnerThreaded MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OETestRunnerThreaded overrides the run method of OETestRunner it recieves a list of suites to be executed by a ThreadPool. The new run method handles the ThreadPool creation and the OETestResultThreaded fill. [YOCTO #11450] (From OE-Core rev: 48b7a407d692e6c49c41b16f2bd11e8c3f47a421) Signed-off-by: Aníbal Limón Signed-off-by: Richard Purdie --- meta/lib/oeqa/core/threaded.py | 75 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) (limited to 'meta') diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py index f216685f46..81df340366 100644 --- a/meta/lib/oeqa/core/threaded.py +++ b/meta/lib/oeqa/core/threaded.py @@ -3,11 +3,13 @@ import threading import multiprocessing +import queue +import time from unittest.suite import TestSuite from oeqa.core.loader import OETestLoader -from oeqa.core.runner import OEStreamLogger, OETestResult +from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner class OETestLoaderThreaded(OETestLoader): def __init__(self, tc, module_paths, modules, tests, modules_required, @@ -185,3 +187,74 @@ class OETestResultThreaded(object): tid = list(self._results)[0] result = self._results[tid]['result'] result.logDetails() + +class _Worker(threading.Thread): + """Thread executing tasks from a given tasks queue""" + def __init__(self, tasks, result, stream): + threading.Thread.__init__(self) + self.tasks = tasks + + self.result = result + self.stream = stream + + def run(self): + while True: + try: + func, args, kargs = self.tasks.get(block=False) + except queue.Empty: + break + + try: + run_start_time = time.time() + rc = func(*args, **kargs) + run_end_time = time.time() + self.result.addResult(rc, run_start_time, run_end_time) + self.stream.finish() + except Exception as e: + print(e) + finally: + self.tasks.task_done() + +class _ThreadedPool: + """Pool of threads consuming tasks from a queue""" + def __init__(self, num_workers, num_tasks, stream=None, result=None): + self.tasks = queue.Queue(num_tasks) + self.workers = [] + + for _ in range(num_workers): + worker = _Worker(self.tasks, result, stream) + self.workers.append(worker) + + def start(self): + for worker in self.workers: + worker.start() + + def add_task(self, func, *args, **kargs): + """Add a task to the queue""" + self.tasks.put((func, args, kargs)) + + def wait_completion(self): + """Wait for completion of all the tasks in the queue""" + self.tasks.join() + for worker in self.workers: + worker.join() + +class OETestRunnerThreaded(OETestRunner): + streamLoggerClass = OEStreamLoggerThreaded + + def __init__(self, tc, *args, **kwargs): + super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs) + self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__ + + def run(self, suites): + result = OETestResultThreaded(self.tc) + + pool = _ThreadedPool(len(suites), len(suites), stream=self.stream, + result=result) + for s in suites: + pool.add_task(super(OETestRunnerThreaded, self).run, s) + pool.start() + pool.wait_completion() + result._fill_tc_results() + + return result -- cgit v1.2.3-54-g00ecf