summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-12 11:10:38 +0000
committerRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-18 10:18:41 +0100
commit4e4958cba2e083eac20e2770e04bfc7d56d9db42 (patch)
treeb8fb57f9b3a3e37b3a5f095c70c127af9bce82fa /meta/lib/oeqa/core
parentebd97e728ae1bd9442299c871a07a1b3b9f9efdf (diff)
downloadpoky-4e4958cba2e083eac20e2770e04bfc7d56d9db42.tar.gz
oeqa/core/threaded: Remove in favour of using concurrenttests
We have several options for parallel processing in oeqa, parallel execution of modules, threading and mulitple processes for the runners. After much experimentation is appears the most scalable and least invasive approach is multiple processes using concurrenttestsuite from testtools. This means we can drop the current threading code which is only used by the sdk test execution. oeqa/decorator/depends: Remove threading code Revert "oeqa/sdk: Enable usage of OEQA thread mode" This reverts commit adc434c0636b7dea2ef70c8d2c8e61cdb5c703b1. Revert "oeqa/core/tests: Add tests of OEQA Threaded mode" This reverts commit a4eef558c9933eb32413b61ff80a11b999951b40. Revert "oeqa/core/decorator/oetimeout: Add support for OEQA threaded mode" This reverts commit d3d4ba902dee8b19fa1054330cffdf73f9b81fe7. (From OE-Core rev: a98ab5e560e73b6988512fbae5cefe9e42ceed53) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta/lib/oeqa/core')
-rw-r--r--meta/lib/oeqa/core/decorator/depends.py7
-rw-r--r--meta/lib/oeqa/core/decorator/oetimeout.py40
-rw-r--r--meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py12
-rw-r--r--meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py8
-rw-r--r--meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py10
-rw-r--r--meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py12
-rw-r--r--meta/lib/oeqa/core/tests/common.py10
-rwxr-xr-xmeta/lib/oeqa/core/tests/test_decorators.py12
-rwxr-xr-xmeta/lib/oeqa/core/tests/test_loader.py30
-rw-r--r--meta/lib/oeqa/core/threaded.py275
10 files changed, 12 insertions, 404 deletions
diff --git a/meta/lib/oeqa/core/decorator/depends.py b/meta/lib/oeqa/core/decorator/depends.py
index baa04341c7..99eccc1268 100644
--- a/meta/lib/oeqa/core/decorator/depends.py
+++ b/meta/lib/oeqa/core/decorator/depends.py
@@ -3,7 +3,6 @@
3 3
4from unittest import SkipTest 4from unittest import SkipTest
5 5
6from oeqa.core.threaded import OETestRunnerThreaded
7from oeqa.core.exception import OEQADependency 6from oeqa.core.exception import OEQADependency
8 7
9from . import OETestDiscover, registerDecorator 8from . import OETestDiscover, registerDecorator
@@ -64,11 +63,7 @@ def _order_test_case_by_depends(cases, depends):
64 return [cases[case_id] for case_id in cases_ordered] 63 return [cases[case_id] for case_id in cases_ordered]
65 64
66def _skipTestDependency(case, depends): 65def _skipTestDependency(case, depends):
67 if isinstance(case.tc.runner, OETestRunnerThreaded): 66 results = case.tc._results
68 import threading
69 results = case.tc._results[threading.get_ident()]
70 else:
71 results = case.tc._results
72 67
73 skipReasons = ['errors', 'failures', 'skipped'] 68 skipReasons = ['errors', 'failures', 'skipped']
74 69
diff --git a/meta/lib/oeqa/core/decorator/oetimeout.py b/meta/lib/oeqa/core/decorator/oetimeout.py
index f85e7d9792..a247583f7f 100644
--- a/meta/lib/oeqa/core/decorator/oetimeout.py
+++ b/meta/lib/oeqa/core/decorator/oetimeout.py
@@ -1,12 +1,8 @@
1# Copyright (C) 2016 Intel Corporation 1# Copyright (C) 2016 Intel Corporation
2# Released under the MIT license (see COPYING.MIT) 2# Released under the MIT license (see COPYING.MIT)
3 3
4from . import OETestDecorator, registerDecorator
5
6import signal 4import signal
7from threading import Timer 5from . import OETestDecorator, registerDecorator
8
9from oeqa.core.threaded import OETestRunnerThreaded
10from oeqa.core.exception import OEQATimeoutError 6from oeqa.core.exception import OEQATimeoutError
11 7
12@registerDecorator 8@registerDecorator
@@ -14,32 +10,16 @@ class OETimeout(OETestDecorator):
14 attrs = ('oetimeout',) 10 attrs = ('oetimeout',)
15 11
16 def setUpDecorator(self): 12 def setUpDecorator(self):
17 self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout) 13 timeout = self.oetimeout
18 14 def _timeoutHandler(signum, frame):
19 if isinstance(self.case.tc.runner, OETestRunnerThreaded): 15 raise OEQATimeoutError("Timed out after %s "
20 self.timeouted = False
21 def _timeoutHandler():
22 self.timeouted = True
23
24 self.timer = Timer(self.oetimeout, _timeoutHandler)
25 self.timer.start()
26 else:
27 timeout = self.oetimeout
28 def _timeoutHandler(signum, frame):
29 raise OEQATimeoutError("Timed out after %s "
30 "seconds of execution" % timeout) 16 "seconds of execution" % timeout)
31 17
32 self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler) 18 self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout)
33 signal.alarm(self.oetimeout) 19 self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler)
20 signal.alarm(self.oetimeout)
34 21
35 def tearDownDecorator(self): 22 def tearDownDecorator(self):
36 if isinstance(self.case.tc.runner, OETestRunnerThreaded): 23 signal.alarm(0)
37 self.timer.cancel() 24 signal.signal(signal.SIGALRM, self.alarmSignal)
38 self.logger.debug("Removed Timer handler") 25 self.logger.debug("Removed SIGALRM handler")
39 if self.timeouted:
40 raise OEQATimeoutError("Timed out after %s "
41 "seconds of execution" % self.oetimeout)
42 else:
43 signal.alarm(0)
44 signal.signal(signal.SIGALRM, self.alarmSignal)
45 self.logger.debug("Removed SIGALRM handler")
diff --git a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py b/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py
deleted file mode 100644
index 0fe4cb3f11..0000000000
--- a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded.py
+++ /dev/null
@@ -1,12 +0,0 @@
1# Copyright (C) 2017 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4from oeqa.core.case import OETestCase
5
6class ThreadedTest(OETestCase):
7 def test_threaded_no_depends(self):
8 self.assertTrue(True, msg='How is this possible?')
9
10class ThreadedTest2(OETestCase):
11 def test_threaded_same_module(self):
12 self.assertTrue(True, msg='How is this possible?')
diff --git a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py b/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py
deleted file mode 100644
index 905f397846..0000000000
--- a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_alone.py
+++ /dev/null
@@ -1,8 +0,0 @@
1# Copyright (C) 2017 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4from oeqa.core.case import OETestCase
5
6class ThreadedTestAlone(OETestCase):
7 def test_threaded_alone(self):
8 self.assertTrue(True, msg='How is this possible?')
diff --git a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py b/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py
deleted file mode 100644
index 0c158d3bac..0000000000
--- a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_depends.py
+++ /dev/null
@@ -1,10 +0,0 @@
1# Copyright (C) 2017 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4from oeqa.core.case import OETestCase
5from oeqa.core.decorator.depends import OETestDepends
6
7class ThreadedTest3(OETestCase):
8 @OETestDepends(['threaded.ThreadedTest.test_threaded_no_depends'])
9 def test_threaded_depends(self):
10 self.assertTrue(True, msg='How is this possible?')
diff --git a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py b/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py
deleted file mode 100644
index 63d17e0401..0000000000
--- a/meta/lib/oeqa/core/tests/cases/loader/threaded/threaded_module.py
+++ /dev/null
@@ -1,12 +0,0 @@
1# Copyright (C) 2017 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4from oeqa.core.case import OETestCase
5
6class ThreadedTestModule(OETestCase):
7 def test_threaded_module(self):
8 self.assertTrue(True, msg='How is this possible?')
9
10class ThreadedTestModule2(OETestCase):
11 def test_threaded_module2(self):
12 self.assertTrue(True, msg='How is this possible?')
diff --git a/meta/lib/oeqa/core/tests/common.py b/meta/lib/oeqa/core/tests/common.py
index 1932323409..52b18a1c3e 100644
--- a/meta/lib/oeqa/core/tests/common.py
+++ b/meta/lib/oeqa/core/tests/common.py
@@ -33,13 +33,3 @@ class TestBase(unittest.TestCase):
33 tc.loadTests(self.cases_path, modules=modules, tests=tests, 33 tc.loadTests(self.cases_path, modules=modules, tests=tests,
34 filters=filters) 34 filters=filters)
35 return tc 35 return tc
36
37 def _testLoaderThreaded(self, d={}, modules=[],
38 tests=[], filters={}):
39 from oeqa.core.threaded import OETestContextThreaded
40
41 tc = OETestContextThreaded(d, self.logger)
42 tc.loadTests(self.cases_path, modules=modules, tests=tests,
43 filters=filters)
44
45 return tc
diff --git a/meta/lib/oeqa/core/tests/test_decorators.py b/meta/lib/oeqa/core/tests/test_decorators.py
index cf99e0d72d..f7d11e885a 100755
--- a/meta/lib/oeqa/core/tests/test_decorators.py
+++ b/meta/lib/oeqa/core/tests/test_decorators.py
@@ -131,17 +131,5 @@ class TestTimeoutDecorator(TestBase):
131 msg = "OETestTimeout didn't restore SIGALRM" 131 msg = "OETestTimeout didn't restore SIGALRM"
132 self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg) 132 self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg)
133 133
134 def test_timeout_thread(self):
135 tests = ['timeout.TimeoutTest.testTimeoutPass']
136 msg = 'Failed to run test using OETestTimeout'
137 tc = self._testLoaderThreaded(modules=self.modules, tests=tests)
138 self.assertTrue(tc.runTests().wasSuccessful(), msg=msg)
139
140 def test_timeout_threaded_fail(self):
141 tests = ['timeout.TimeoutTest.testTimeoutFail']
142 msg = "OETestTimeout test didn't timeout as expected"
143 tc = self._testLoaderThreaded(modules=self.modules, tests=tests)
144 self.assertFalse(tc.runTests().wasSuccessful(), msg=msg)
145
146if __name__ == '__main__': 134if __name__ == '__main__':
147 unittest.main() 135 unittest.main()
diff --git a/meta/lib/oeqa/core/tests/test_loader.py b/meta/lib/oeqa/core/tests/test_loader.py
index e0d917d317..b79b8bad4d 100755
--- a/meta/lib/oeqa/core/tests/test_loader.py
+++ b/meta/lib/oeqa/core/tests/test_loader.py
@@ -1,6 +1,6 @@
1#!/usr/bin/env python3 1#!/usr/bin/env python3
2 2
3# Copyright (C) 2016-2017 Intel Corporation 3# Copyright (C) 2016 Intel Corporation
4# Released under the MIT license (see COPYING.MIT) 4# Released under the MIT license (see COPYING.MIT)
5 5
6import os 6import os
@@ -82,33 +82,5 @@ class TestLoader(TestBase):
82 msg = 'Expected modules from two different paths' 82 msg = 'Expected modules from two different paths'
83 self.assertEqual(modules, expected_modules, msg=msg) 83 self.assertEqual(modules, expected_modules, msg=msg)
84 84
85 def test_loader_threaded(self):
86 cases_path = self.cases_path
87
88 self.cases_path = [os.path.join(self.cases_path, 'loader', 'threaded')]
89
90 tc = self._testLoaderThreaded()
91 self.assertEqual(len(tc.suites), 3, "Expected to be 3 suites")
92
93 case_ids = ['threaded.ThreadedTest.test_threaded_no_depends',
94 'threaded.ThreadedTest2.test_threaded_same_module',
95 'threaded_depends.ThreadedTest3.test_threaded_depends']
96 for case in tc.suites[0]._tests:
97 self.assertEqual(case.id(),
98 case_ids[tc.suites[0]._tests.index(case)])
99
100 case_ids = ['threaded_alone.ThreadedTestAlone.test_threaded_alone']
101 for case in tc.suites[1]._tests:
102 self.assertEqual(case.id(),
103 case_ids[tc.suites[1]._tests.index(case)])
104
105 case_ids = ['threaded_module.ThreadedTestModule.test_threaded_module',
106 'threaded_module.ThreadedTestModule2.test_threaded_module2']
107 for case in tc.suites[2]._tests:
108 self.assertEqual(case.id(),
109 case_ids[tc.suites[2]._tests.index(case)])
110
111 self.cases_path = cases_path
112
113if __name__ == '__main__': 85if __name__ == '__main__':
114 unittest.main() 86 unittest.main()
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
deleted file mode 100644
index 2cafe03a21..0000000000
--- a/meta/lib/oeqa/core/threaded.py
+++ /dev/null
@@ -1,275 +0,0 @@
1# Copyright (C) 2017 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4import threading
5import multiprocessing
6import queue
7import time
8
9from unittest.suite import TestSuite
10
11from oeqa.core.loader import OETestLoader
12from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
13from oeqa.core.context import OETestContext
14
15class OETestLoaderThreaded(OETestLoader):
16 def __init__(self, tc, module_paths, modules, tests, modules_required,
17 filters, process_num=0, *args, **kwargs):
18 super(OETestLoaderThreaded, self).__init__(tc, module_paths, modules,
19 tests, modules_required, filters, *args, **kwargs)
20
21 self.process_num = process_num
22
23 def discover(self):
24 suite = super(OETestLoaderThreaded, self).discover()
25
26 if self.process_num <= 0:
27 self.process_num = min(multiprocessing.cpu_count(),
28 len(suite._tests))
29
30 suites = []
31 for _ in range(self.process_num):
32 suites.append(self.suiteClass())
33
34 def _search_for_module_idx(suites, case):
35 """
36 Cases in the same module needs to be run
37 in the same thread because PyUnit keeps track
38 of setUp{Module, Class,} and tearDown{Module, Class,}.
39 """
40
41 for idx in range(self.process_num):
42 suite = suites[idx]
43 for c in suite._tests:
44 if case.__module__ == c.__module__:
45 return idx
46
47 return -1
48
49 def _search_for_depend_idx(suites, depends):
50 """
51 Dependency cases needs to be run in the same
52 thread, because OEQA framework look at the state
53 of dependant test to figure out if skip or not.
54 """
55
56 for idx in range(self.process_num):
57 suite = suites[idx]
58
59 for case in suite._tests:
60 if case.id() in depends:
61 return idx
62 return -1
63
64 def _get_best_idx(suites):
65 sizes = [len(suite._tests) for suite in suites]
66 return sizes.index(min(sizes))
67
68 def _fill_suites(suite):
69 idx = -1
70 for case in suite:
71 if isinstance(case, TestSuite):
72 _fill_suites(case)
73 else:
74 idx = _search_for_module_idx(suites, case)
75
76 depends = {}
77 if 'depends' in self.tc._registry:
78 depends = self.tc._registry['depends']
79
80 if idx == -1 and case.id() in depends:
81 case_depends = depends[case.id()]
82 idx = _search_for_depend_idx(suites, case_depends)
83
84 if idx == -1:
85 idx = _get_best_idx(suites)
86
87 suites[idx].addTest(case)
88 _fill_suites(suite)
89
90 suites_tmp = suites
91 suites = []
92 for suite in suites_tmp:
93 if len(suite._tests) > 0:
94 suites.append(suite)
95
96 return suites
97
98class OEStreamLoggerThreaded(OEStreamLogger):
99 _lock = threading.Lock()
100 buffers = {}
101
102 def write(self, msg):
103 tid = threading.get_ident()
104
105 if not tid in self.buffers:
106 self.buffers[tid] = ""
107
108 if msg:
109 self.buffers[tid] += msg
110
111 def finish(self):
112 tid = threading.get_ident()
113
114 self._lock.acquire()
115 self.logger.info('THREAD: %d' % tid)
116 self.logger.info('-' * 70)
117 for line in self.buffers[tid].split('\n'):
118 self.logger.info(line)
119 self._lock.release()
120
121class OETestResultThreadedInternal(OETestResult):
122 def _tc_map_results(self):
123 tid = threading.get_ident()
124
125 # PyUnit generates a result for every test module run, test
126 # if the thread already has an entry to avoid lose the previous
127 # test module results.
128 if not tid in self.tc._results:
129 self.tc._results[tid] = {}
130 self.tc._results[tid]['failures'] = self.failures
131 self.tc._results[tid]['errors'] = self.errors
132 self.tc._results[tid]['skipped'] = self.skipped
133 self.tc._results[tid]['expectedFailures'] = self.expectedFailures
134
135class OETestResultThreaded(object):
136 _results = {}
137 _lock = threading.Lock()
138
139 def __init__(self, tc):
140 self.tc = tc
141
142 def _fill_tc_results(self):
143 tids = list(self.tc._results.keys())
144 fields = ['failures', 'errors', 'skipped', 'expectedFailures']
145
146 for tid in tids:
147 result = self.tc._results[tid]
148 for field in fields:
149 if not field in self.tc._results:
150 self.tc._results[field] = []
151 self.tc._results[field].extend(result[field])
152
153 def addResult(self, result, run_start_time, run_end_time):
154 tid = threading.get_ident()
155
156 self._lock.acquire()
157 self._results[tid] = {}
158 self._results[tid]['result'] = result
159 self._results[tid]['run_start_time'] = run_start_time
160 self._results[tid]['run_end_time'] = run_end_time
161 self._results[tid]['result'] = result
162 self._lock.release()
163
164 def wasSuccessful(self):
165 wasSuccessful = True
166 for tid in self._results.keys():
167 wasSuccessful = wasSuccessful and \
168 self._results[tid]['result'].wasSuccessful()
169 return wasSuccessful
170
171 def stop(self):
172 for tid in self._results.keys():
173 self._results[tid]['result'].stop()
174
175 def logSummary(self, component, context_msg=''):
176 elapsed_time = (self.tc._run_end_time - self.tc._run_start_time)
177
178 self.tc.logger.info("SUMMARY:")
179 self.tc.logger.info("%s (%s) - Ran %d tests in %.3fs" % (component,
180 context_msg, len(self.tc._registry['cases']), elapsed_time))
181 if self.wasSuccessful():
182 msg = "%s - OK - All required tests passed" % component
183 else:
184 msg = "%s - FAIL - Required tests failed" % component
185 self.tc.logger.info(msg)
186
187 def logDetails(self):
188 if list(self._results):
189 tid = list(self._results)[0]
190 result = self._results[tid]['result']
191 result.logDetails()
192
193class _Worker(threading.Thread):
194 """Thread executing tasks from a given tasks queue"""
195 def __init__(self, tasks, result, stream):
196 threading.Thread.__init__(self)
197 self.tasks = tasks
198
199 self.result = result
200 self.stream = stream
201
202 def run(self):
203 while True:
204 try:
205 func, args, kargs = self.tasks.get(block=False)
206 except queue.Empty:
207 break
208
209 try:
210 run_start_time = time.time()
211 rc = func(*args, **kargs)
212 run_end_time = time.time()
213 self.result.addResult(rc, run_start_time, run_end_time)
214 self.stream.finish()
215 except Exception as e:
216 print(e)
217 finally:
218 self.tasks.task_done()
219
220class _ThreadedPool:
221 """Pool of threads consuming tasks from a queue"""
222 def __init__(self, num_workers, num_tasks, stream=None, result=None):
223 self.tasks = queue.Queue(num_tasks)
224 self.workers = []
225
226 for _ in range(num_workers):
227 worker = _Worker(self.tasks, result, stream)
228 self.workers.append(worker)
229
230 def start(self):
231 for worker in self.workers:
232 worker.start()
233
234 def add_task(self, func, *args, **kargs):
235 """Add a task to the queue"""
236 self.tasks.put((func, args, kargs))
237
238 def wait_completion(self):
239 """Wait for completion of all the tasks in the queue"""
240 self.tasks.join()
241 for worker in self.workers:
242 worker.join()
243
244class OETestRunnerThreaded(OETestRunner):
245 streamLoggerClass = OEStreamLoggerThreaded
246
247 def __init__(self, tc, *args, **kwargs):
248 super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
249 self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
250
251 def run(self, suites):
252 result = OETestResultThreaded(self.tc)
253
254 pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
255 result=result)
256 for s in suites:
257 pool.add_task(super(OETestRunnerThreaded, self).run, s)
258 pool.start()
259 pool.wait_completion()
260 result._fill_tc_results()
261
262 return result
263
264class OETestContextThreaded(OETestContext):
265 loaderClass = OETestLoaderThreaded
266 runnerClass = OETestRunnerThreaded
267
268 def loadTests(self, module_paths, modules=[], tests=[],
269 modules_manifest="", modules_required=[], filters={}, process_num=0):
270 if modules_manifest:
271 modules = self._read_modules_from_manifest(modules_manifest)
272
273 self.loader = self.loaderClass(self, module_paths, modules, tests,
274 modules_required, filters, process_num)
275 self.suites = self.loader.discover()