diff options
| -rw-r--r-- | meta/lib/oeqa/core/threaded.py | 75 |
1 files changed, 74 insertions, 1 deletions
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py index f216685f46..81df340366 100644 --- a/meta/lib/oeqa/core/threaded.py +++ b/meta/lib/oeqa/core/threaded.py | |||
| @@ -3,11 +3,13 @@ | |||
| 3 | 3 | ||
| 4 | import threading | 4 | import threading |
| 5 | import multiprocessing | 5 | import multiprocessing |
| 6 | import queue | ||
| 7 | import time | ||
| 6 | 8 | ||
| 7 | from unittest.suite import TestSuite | 9 | from unittest.suite import TestSuite |
| 8 | 10 | ||
| 9 | from oeqa.core.loader import OETestLoader | 11 | from oeqa.core.loader import OETestLoader |
| 10 | from oeqa.core.runner import OEStreamLogger, OETestResult | 12 | from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner |
| 11 | 13 | ||
| 12 | class OETestLoaderThreaded(OETestLoader): | 14 | class OETestLoaderThreaded(OETestLoader): |
| 13 | def __init__(self, tc, module_paths, modules, tests, modules_required, | 15 | def __init__(self, tc, module_paths, modules, tests, modules_required, |
| @@ -185,3 +187,74 @@ class OETestResultThreaded(object): | |||
| 185 | tid = list(self._results)[0] | 187 | tid = list(self._results)[0] |
| 186 | result = self._results[tid]['result'] | 188 | result = self._results[tid]['result'] |
| 187 | result.logDetails() | 189 | result.logDetails() |
| 190 | |||
| 191 | class _Worker(threading.Thread): | ||
| 192 | """Thread executing tasks from a given tasks queue""" | ||
| 193 | def __init__(self, tasks, result, stream): | ||
| 194 | threading.Thread.__init__(self) | ||
| 195 | self.tasks = tasks | ||
| 196 | |||
| 197 | self.result = result | ||
| 198 | self.stream = stream | ||
| 199 | |||
| 200 | def run(self): | ||
| 201 | while True: | ||
| 202 | try: | ||
| 203 | func, args, kargs = self.tasks.get(block=False) | ||
| 204 | except queue.Empty: | ||
| 205 | break | ||
| 206 | |||
| 207 | try: | ||
| 208 | run_start_time = time.time() | ||
| 209 | rc = func(*args, **kargs) | ||
| 210 | run_end_time = time.time() | ||
| 211 | self.result.addResult(rc, run_start_time, run_end_time) | ||
| 212 | self.stream.finish() | ||
| 213 | except Exception as e: | ||
| 214 | print(e) | ||
| 215 | finally: | ||
| 216 | self.tasks.task_done() | ||
| 217 | |||
| 218 | class _ThreadedPool: | ||
| 219 | """Pool of threads consuming tasks from a queue""" | ||
| 220 | def __init__(self, num_workers, num_tasks, stream=None, result=None): | ||
| 221 | self.tasks = queue.Queue(num_tasks) | ||
| 222 | self.workers = [] | ||
| 223 | |||
| 224 | for _ in range(num_workers): | ||
| 225 | worker = _Worker(self.tasks, result, stream) | ||
| 226 | self.workers.append(worker) | ||
| 227 | |||
| 228 | def start(self): | ||
| 229 | for worker in self.workers: | ||
| 230 | worker.start() | ||
| 231 | |||
| 232 | def add_task(self, func, *args, **kargs): | ||
| 233 | """Add a task to the queue""" | ||
| 234 | self.tasks.put((func, args, kargs)) | ||
| 235 | |||
| 236 | def wait_completion(self): | ||
| 237 | """Wait for completion of all the tasks in the queue""" | ||
| 238 | self.tasks.join() | ||
| 239 | for worker in self.workers: | ||
| 240 | worker.join() | ||
| 241 | |||
| 242 | class OETestRunnerThreaded(OETestRunner): | ||
| 243 | streamLoggerClass = OEStreamLoggerThreaded | ||
| 244 | |||
| 245 | def __init__(self, tc, *args, **kwargs): | ||
| 246 | super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs) | ||
| 247 | self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__ | ||
| 248 | |||
| 249 | def run(self, suites): | ||
| 250 | result = OETestResultThreaded(self.tc) | ||
| 251 | |||
| 252 | pool = _ThreadedPool(len(suites), len(suites), stream=self.stream, | ||
| 253 | result=result) | ||
| 254 | for s in suites: | ||
| 255 | pool.add_task(super(OETestRunnerThreaded, self).run, s) | ||
| 256 | pool.start() | ||
| 257 | pool.wait_completion() | ||
| 258 | result._fill_tc_results() | ||
| 259 | |||
| 260 | return result | ||
