diff options
Diffstat (limited to 'meta/lib/oeqa/core/threaded.py')
-rw-r--r-- | meta/lib/oeqa/core/threaded.py | 275 |
1 files changed, 0 insertions, 275 deletions
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py deleted file mode 100644 index 2cafe03a21..0000000000 --- a/meta/lib/oeqa/core/threaded.py +++ /dev/null | |||
@@ -1,275 +0,0 @@ | |||
1 | # Copyright (C) 2017 Intel Corporation | ||
2 | # Released under the MIT license (see COPYING.MIT) | ||
3 | |||
4 | import threading | ||
5 | import multiprocessing | ||
6 | import queue | ||
7 | import time | ||
8 | |||
9 | from unittest.suite import TestSuite | ||
10 | |||
11 | from oeqa.core.loader import OETestLoader | ||
12 | from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner | ||
13 | from oeqa.core.context import OETestContext | ||
14 | |||
15 | class OETestLoaderThreaded(OETestLoader): | ||
16 | def __init__(self, tc, module_paths, modules, tests, modules_required, | ||
17 | filters, process_num=0, *args, **kwargs): | ||
18 | super(OETestLoaderThreaded, self).__init__(tc, module_paths, modules, | ||
19 | tests, modules_required, filters, *args, **kwargs) | ||
20 | |||
21 | self.process_num = process_num | ||
22 | |||
23 | def discover(self): | ||
24 | suite = super(OETestLoaderThreaded, self).discover() | ||
25 | |||
26 | if self.process_num <= 0: | ||
27 | self.process_num = min(multiprocessing.cpu_count(), | ||
28 | len(suite._tests)) | ||
29 | |||
30 | suites = [] | ||
31 | for _ in range(self.process_num): | ||
32 | suites.append(self.suiteClass()) | ||
33 | |||
34 | def _search_for_module_idx(suites, case): | ||
35 | """ | ||
36 | Cases in the same module needs to be run | ||
37 | in the same thread because PyUnit keeps track | ||
38 | of setUp{Module, Class,} and tearDown{Module, Class,}. | ||
39 | """ | ||
40 | |||
41 | for idx in range(self.process_num): | ||
42 | suite = suites[idx] | ||
43 | for c in suite._tests: | ||
44 | if case.__module__ == c.__module__: | ||
45 | return idx | ||
46 | |||
47 | return -1 | ||
48 | |||
49 | def _search_for_depend_idx(suites, depends): | ||
50 | """ | ||
51 | Dependency cases needs to be run in the same | ||
52 | thread, because OEQA framework look at the state | ||
53 | of dependant test to figure out if skip or not. | ||
54 | """ | ||
55 | |||
56 | for idx in range(self.process_num): | ||
57 | suite = suites[idx] | ||
58 | |||
59 | for case in suite._tests: | ||
60 | if case.id() in depends: | ||
61 | return idx | ||
62 | return -1 | ||
63 | |||
64 | def _get_best_idx(suites): | ||
65 | sizes = [len(suite._tests) for suite in suites] | ||
66 | return sizes.index(min(sizes)) | ||
67 | |||
68 | def _fill_suites(suite): | ||
69 | idx = -1 | ||
70 | for case in suite: | ||
71 | if isinstance(case, TestSuite): | ||
72 | _fill_suites(case) | ||
73 | else: | ||
74 | idx = _search_for_module_idx(suites, case) | ||
75 | |||
76 | depends = {} | ||
77 | if 'depends' in self.tc._registry: | ||
78 | depends = self.tc._registry['depends'] | ||
79 | |||
80 | if idx == -1 and case.id() in depends: | ||
81 | case_depends = depends[case.id()] | ||
82 | idx = _search_for_depend_idx(suites, case_depends) | ||
83 | |||
84 | if idx == -1: | ||
85 | idx = _get_best_idx(suites) | ||
86 | |||
87 | suites[idx].addTest(case) | ||
88 | _fill_suites(suite) | ||
89 | |||
90 | suites_tmp = suites | ||
91 | suites = [] | ||
92 | for suite in suites_tmp: | ||
93 | if len(suite._tests) > 0: | ||
94 | suites.append(suite) | ||
95 | |||
96 | return suites | ||
97 | |||
98 | class OEStreamLoggerThreaded(OEStreamLogger): | ||
99 | _lock = threading.Lock() | ||
100 | buffers = {} | ||
101 | |||
102 | def write(self, msg): | ||
103 | tid = threading.get_ident() | ||
104 | |||
105 | if not tid in self.buffers: | ||
106 | self.buffers[tid] = "" | ||
107 | |||
108 | if msg: | ||
109 | self.buffers[tid] += msg | ||
110 | |||
111 | def finish(self): | ||
112 | tid = threading.get_ident() | ||
113 | |||
114 | self._lock.acquire() | ||
115 | self.logger.info('THREAD: %d' % tid) | ||
116 | self.logger.info('-' * 70) | ||
117 | for line in self.buffers[tid].split('\n'): | ||
118 | self.logger.info(line) | ||
119 | self._lock.release() | ||
120 | |||
121 | class OETestResultThreadedInternal(OETestResult): | ||
122 | def _tc_map_results(self): | ||
123 | tid = threading.get_ident() | ||
124 | |||
125 | # PyUnit generates a result for every test module run, test | ||
126 | # if the thread already has an entry to avoid lose the previous | ||
127 | # test module results. | ||
128 | if not tid in self.tc._results: | ||
129 | self.tc._results[tid] = {} | ||
130 | self.tc._results[tid]['failures'] = self.failures | ||
131 | self.tc._results[tid]['errors'] = self.errors | ||
132 | self.tc._results[tid]['skipped'] = self.skipped | ||
133 | self.tc._results[tid]['expectedFailures'] = self.expectedFailures | ||
134 | |||
135 | class OETestResultThreaded(object): | ||
136 | _results = {} | ||
137 | _lock = threading.Lock() | ||
138 | |||
139 | def __init__(self, tc): | ||
140 | self.tc = tc | ||
141 | |||
142 | def _fill_tc_results(self): | ||
143 | tids = list(self.tc._results.keys()) | ||
144 | fields = ['failures', 'errors', 'skipped', 'expectedFailures'] | ||
145 | |||
146 | for tid in tids: | ||
147 | result = self.tc._results[tid] | ||
148 | for field in fields: | ||
149 | if not field in self.tc._results: | ||
150 | self.tc._results[field] = [] | ||
151 | self.tc._results[field].extend(result[field]) | ||
152 | |||
153 | def addResult(self, result, run_start_time, run_end_time): | ||
154 | tid = threading.get_ident() | ||
155 | |||
156 | self._lock.acquire() | ||
157 | self._results[tid] = {} | ||
158 | self._results[tid]['result'] = result | ||
159 | self._results[tid]['run_start_time'] = run_start_time | ||
160 | self._results[tid]['run_end_time'] = run_end_time | ||
161 | self._results[tid]['result'] = result | ||
162 | self._lock.release() | ||
163 | |||
164 | def wasSuccessful(self): | ||
165 | wasSuccessful = True | ||
166 | for tid in self._results.keys(): | ||
167 | wasSuccessful = wasSuccessful and \ | ||
168 | self._results[tid]['result'].wasSuccessful() | ||
169 | return wasSuccessful | ||
170 | |||
171 | def stop(self): | ||
172 | for tid in self._results.keys(): | ||
173 | self._results[tid]['result'].stop() | ||
174 | |||
175 | def logSummary(self, component, context_msg=''): | ||
176 | elapsed_time = (self.tc._run_end_time - self.tc._run_start_time) | ||
177 | |||
178 | self.tc.logger.info("SUMMARY:") | ||
179 | self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component, | ||
180 | context_msg, len(self.tc._registry['cases']), elapsed_time)) | ||
181 | if self.wasSuccessful(): | ||
182 | msg = "%s - OK - All required tests passed" % component | ||
183 | else: | ||
184 | msg = "%s - FAIL - Required tests failed" % component | ||
185 | self.tc.logger.info(msg) | ||
186 | |||
187 | def logDetails(self): | ||
188 | if list(self._results): | ||
189 | tid = list(self._results)[0] | ||
190 | result = self._results[tid]['result'] | ||
191 | result.logDetails() | ||
192 | |||
193 | class _Worker(threading.Thread): | ||
194 | """Thread executing tasks from a given tasks queue""" | ||
195 | def __init__(self, tasks, result, stream): | ||
196 | threading.Thread.__init__(self) | ||
197 | self.tasks = tasks | ||
198 | |||
199 | self.result = result | ||
200 | self.stream = stream | ||
201 | |||
202 | def run(self): | ||
203 | while True: | ||
204 | try: | ||
205 | func, args, kargs = self.tasks.get(block=False) | ||
206 | except queue.Empty: | ||
207 | break | ||
208 | |||
209 | try: | ||
210 | run_start_time = time.time() | ||
211 | rc = func(*args, **kargs) | ||
212 | run_end_time = time.time() | ||
213 | self.result.addResult(rc, run_start_time, run_end_time) | ||
214 | self.stream.finish() | ||
215 | except Exception as e: | ||
216 | print(e) | ||
217 | finally: | ||
218 | self.tasks.task_done() | ||
219 | |||
220 | class _ThreadedPool: | ||
221 | """Pool of threads consuming tasks from a queue""" | ||
222 | def __init__(self, num_workers, num_tasks, stream=None, result=None): | ||
223 | self.tasks = queue.Queue(num_tasks) | ||
224 | self.workers = [] | ||
225 | |||
226 | for _ in range(num_workers): | ||
227 | worker = _Worker(self.tasks, result, stream) | ||
228 | self.workers.append(worker) | ||
229 | |||
230 | def start(self): | ||
231 | for worker in self.workers: | ||
232 | worker.start() | ||
233 | |||
234 | def add_task(self, func, *args, **kargs): | ||
235 | """Add a task to the queue""" | ||
236 | self.tasks.put((func, args, kargs)) | ||
237 | |||
238 | def wait_completion(self): | ||
239 | """Wait for completion of all the tasks in the queue""" | ||
240 | self.tasks.join() | ||
241 | for worker in self.workers: | ||
242 | worker.join() | ||
243 | |||
244 | class OETestRunnerThreaded(OETestRunner): | ||
245 | streamLoggerClass = OEStreamLoggerThreaded | ||
246 | |||
247 | def __init__(self, tc, *args, **kwargs): | ||
248 | super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs) | ||
249 | self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__ | ||
250 | |||
251 | def run(self, suites): | ||
252 | result = OETestResultThreaded(self.tc) | ||
253 | |||
254 | pool = _ThreadedPool(len(suites), len(suites), stream=self.stream, | ||
255 | result=result) | ||
256 | for s in suites: | ||
257 | pool.add_task(super(OETestRunnerThreaded, self).run, s) | ||
258 | pool.start() | ||
259 | pool.wait_completion() | ||
260 | result._fill_tc_results() | ||
261 | |||
262 | return result | ||
263 | |||
264 | class OETestContextThreaded(OETestContext): | ||
265 | loaderClass = OETestLoaderThreaded | ||
266 | runnerClass = OETestRunnerThreaded | ||
267 | |||
268 | def loadTests(self, module_paths, modules=[], tests=[], | ||
269 | modules_manifest="", modules_required=[], filters={}, process_num=0): | ||
270 | if modules_manifest: | ||
271 | modules = self._read_modules_from_manifest(modules_manifest) | ||
272 | |||
273 | self.loader = self.loaderClass(self, module_paths, modules, tests, | ||
274 | modules_required, filters, process_num) | ||
275 | self.suites = self.loader.discover() | ||