summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core/threaded.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/core/threaded.py')
-rw-r--r--meta/lib/oeqa/core/threaded.py275
1 files changed, 0 insertions, 275 deletions
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
deleted file mode 100644
index 2cafe03a21..0000000000
--- a/meta/lib/oeqa/core/threaded.py
+++ /dev/null
@@ -1,275 +0,0 @@
1# Copyright (C) 2017 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4import threading
5import multiprocessing
6import queue
7import time
8
9from unittest.suite import TestSuite
10
11from oeqa.core.loader import OETestLoader
12from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
13from oeqa.core.context import OETestContext
14
15class OETestLoaderThreaded(OETestLoader):
16 def __init__(self, tc, module_paths, modules, tests, modules_required,
17 filters, process_num=0, *args, **kwargs):
18 super(OETestLoaderThreaded, self).__init__(tc, module_paths, modules,
19 tests, modules_required, filters, *args, **kwargs)
20
21 self.process_num = process_num
22
23 def discover(self):
24 suite = super(OETestLoaderThreaded, self).discover()
25
26 if self.process_num <= 0:
27 self.process_num = min(multiprocessing.cpu_count(),
28 len(suite._tests))
29
30 suites = []
31 for _ in range(self.process_num):
32 suites.append(self.suiteClass())
33
34 def _search_for_module_idx(suites, case):
35 """
36 Cases in the same module needs to be run
37 in the same thread because PyUnit keeps track
38 of setUp{Module, Class,} and tearDown{Module, Class,}.
39 """
40
41 for idx in range(self.process_num):
42 suite = suites[idx]
43 for c in suite._tests:
44 if case.__module__ == c.__module__:
45 return idx
46
47 return -1
48
49 def _search_for_depend_idx(suites, depends):
50 """
51 Dependency cases needs to be run in the same
52 thread, because OEQA framework look at the state
53 of dependant test to figure out if skip or not.
54 """
55
56 for idx in range(self.process_num):
57 suite = suites[idx]
58
59 for case in suite._tests:
60 if case.id() in depends:
61 return idx
62 return -1
63
64 def _get_best_idx(suites):
65 sizes = [len(suite._tests) for suite in suites]
66 return sizes.index(min(sizes))
67
68 def _fill_suites(suite):
69 idx = -1
70 for case in suite:
71 if isinstance(case, TestSuite):
72 _fill_suites(case)
73 else:
74 idx = _search_for_module_idx(suites, case)
75
76 depends = {}
77 if 'depends' in self.tc._registry:
78 depends = self.tc._registry['depends']
79
80 if idx == -1 and case.id() in depends:
81 case_depends = depends[case.id()]
82 idx = _search_for_depend_idx(suites, case_depends)
83
84 if idx == -1:
85 idx = _get_best_idx(suites)
86
87 suites[idx].addTest(case)
88 _fill_suites(suite)
89
90 suites_tmp = suites
91 suites = []
92 for suite in suites_tmp:
93 if len(suite._tests) > 0:
94 suites.append(suite)
95
96 return suites
97
98class OEStreamLoggerThreaded(OEStreamLogger):
99 _lock = threading.Lock()
100 buffers = {}
101
102 def write(self, msg):
103 tid = threading.get_ident()
104
105 if not tid in self.buffers:
106 self.buffers[tid] = ""
107
108 if msg:
109 self.buffers[tid] += msg
110
111 def finish(self):
112 tid = threading.get_ident()
113
114 self._lock.acquire()
115 self.logger.info('THREAD: %d' % tid)
116 self.logger.info('-' * 70)
117 for line in self.buffers[tid].split('\n'):
118 self.logger.info(line)
119 self._lock.release()
120
121class OETestResultThreadedInternal(OETestResult):
122 def _tc_map_results(self):
123 tid = threading.get_ident()
124
125 # PyUnit generates a result for every test module run, test
126 # if the thread already has an entry to avoid lose the previous
127 # test module results.
128 if not tid in self.tc._results:
129 self.tc._results[tid] = {}
130 self.tc._results[tid]['failures'] = self.failures
131 self.tc._results[tid]['errors'] = self.errors
132 self.tc._results[tid]['skipped'] = self.skipped
133 self.tc._results[tid]['expectedFailures'] = self.expectedFailures
134
135class OETestResultThreaded(object):
136 _results = {}
137 _lock = threading.Lock()
138
139 def __init__(self, tc):
140 self.tc = tc
141
142 def _fill_tc_results(self):
143 tids = list(self.tc._results.keys())
144 fields = ['failures', 'errors', 'skipped', 'expectedFailures']
145
146 for tid in tids:
147 result = self.tc._results[tid]
148 for field in fields:
149 if not field in self.tc._results:
150 self.tc._results[field] = []
151 self.tc._results[field].extend(result[field])
152
153 def addResult(self, result, run_start_time, run_end_time):
154 tid = threading.get_ident()
155
156 self._lock.acquire()
157 self._results[tid] = {}
158 self._results[tid]['result'] = result
159 self._results[tid]['run_start_time'] = run_start_time
160 self._results[tid]['run_end_time'] = run_end_time
161 self._results[tid]['result'] = result
162 self._lock.release()
163
164 def wasSuccessful(self):
165 wasSuccessful = True
166 for tid in self._results.keys():
167 wasSuccessful = wasSuccessful and \
168 self._results[tid]['result'].wasSuccessful()
169 return wasSuccessful
170
171 def stop(self):
172 for tid in self._results.keys():
173 self._results[tid]['result'].stop()
174
175 def logSummary(self, component, context_msg=''):
176 elapsed_time = (self.tc._run_end_time - self.tc._run_start_time)
177
178 self.tc.logger.info("SUMMARY:")
179 self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component,
180 context_msg, len(self.tc._registry['cases']), elapsed_time))
181 if self.wasSuccessful():
182 msg = "%s - OK - All required tests passed" % component
183 else:
184 msg = "%s - FAIL - Required tests failed" % component
185 self.tc.logger.info(msg)
186
187 def logDetails(self):
188 if list(self._results):
189 tid = list(self._results)[0]
190 result = self._results[tid]['result']
191 result.logDetails()
192
193class _Worker(threading.Thread):
194 """Thread executing tasks from a given tasks queue"""
195 def __init__(self, tasks, result, stream):
196 threading.Thread.__init__(self)
197 self.tasks = tasks
198
199 self.result = result
200 self.stream = stream
201
202 def run(self):
203 while True:
204 try:
205 func, args, kargs = self.tasks.get(block=False)
206 except queue.Empty:
207 break
208
209 try:
210 run_start_time = time.time()
211 rc = func(*args, **kargs)
212 run_end_time = time.time()
213 self.result.addResult(rc, run_start_time, run_end_time)
214 self.stream.finish()
215 except Exception as e:
216 print(e)
217 finally:
218 self.tasks.task_done()
219
220class _ThreadedPool:
221 """Pool of threads consuming tasks from a queue"""
222 def __init__(self, num_workers, num_tasks, stream=None, result=None):
223 self.tasks = queue.Queue(num_tasks)
224 self.workers = []
225
226 for _ in range(num_workers):
227 worker = _Worker(self.tasks, result, stream)
228 self.workers.append(worker)
229
230 def start(self):
231 for worker in self.workers:
232 worker.start()
233
234 def add_task(self, func, *args, **kargs):
235 """Add a task to the queue"""
236 self.tasks.put((func, args, kargs))
237
238 def wait_completion(self):
239 """Wait for completion of all the tasks in the queue"""
240 self.tasks.join()
241 for worker in self.workers:
242 worker.join()
243
244class OETestRunnerThreaded(OETestRunner):
245 streamLoggerClass = OEStreamLoggerThreaded
246
247 def __init__(self, tc, *args, **kwargs):
248 super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
249 self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
250
251 def run(self, suites):
252 result = OETestResultThreaded(self.tc)
253
254 pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
255 result=result)
256 for s in suites:
257 pool.add_task(super(OETestRunnerThreaded, self).run, s)
258 pool.start()
259 pool.wait_completion()
260 result._fill_tc_results()
261
262 return result
263
264class OETestContextThreaded(OETestContext):
265 loaderClass = OETestLoaderThreaded
266 runnerClass = OETestRunnerThreaded
267
268 def loadTests(self, module_paths, modules=[], tests=[],
269 modules_manifest="", modules_required=[], filters={}, process_num=0):
270 if modules_manifest:
271 modules = self._read_modules_from_manifest(modules_manifest)
272
273 self.loader = self.loaderClass(self, module_paths, modules, tests,
274 modules_required, filters, process_num)
275 self.suites = self.loader.discover()