diff options
Diffstat (limited to 'meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r-- | meta/lib/oeqa/core/utils/concurrencytest.py | 68 |
1 files changed, 37 insertions, 31 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py index b2eb68fb02..d10f8f7f04 100644 --- a/meta/lib/oeqa/core/utils/concurrencytest.py +++ b/meta/lib/oeqa/core/utils/concurrencytest.py | |||
@@ -1,5 +1,7 @@ | |||
1 | #!/usr/bin/env python3 | 1 | #!/usr/bin/env python3 |
2 | # | 2 | # |
3 | # Copyright OpenEmbedded Contributors | ||
4 | # | ||
3 | # SPDX-License-Identifier: GPL-2.0-or-later | 5 | # SPDX-License-Identifier: GPL-2.0-or-later |
4 | # | 6 | # |
5 | # Modified for use in OE by Richard Purdie, 2018 | 7 | # Modified for use in OE by Richard Purdie, 2018 |
@@ -48,11 +50,16 @@ _all__ = [ | |||
48 | # | 50 | # |
49 | class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): | 51 | class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): |
50 | 52 | ||
51 | def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): | 53 | def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult): |
52 | super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) | 54 | super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) |
53 | self.threadnum = threadnum | 55 | self.threadnum = threadnum |
54 | self.totalinprocess = totalinprocess | 56 | self.totalinprocess = totalinprocess |
55 | self.totaltests = totaltests | 57 | self.totaltests = totaltests |
58 | self.buffer = True | ||
59 | self.outputbuf = output | ||
60 | self.finalresult = finalresult | ||
61 | self.finalresult.buffer = True | ||
62 | self.target = target | ||
56 | 63 | ||
57 | def _add_result_with_semaphore(self, method, test, *args, **kwargs): | 64 | def _add_result_with_semaphore(self, method, test, *args, **kwargs): |
58 | self.semaphore.acquire() | 65 | self.semaphore.acquire() |
@@ -61,16 +68,19 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): | |||
61 | self.result.starttime[test.id()] = self._test_start.timestamp() | 68 | self.result.starttime[test.id()] = self._test_start.timestamp() |
62 | self.result.threadprogress[self.threadnum].append(test.id()) | 69 | self.result.threadprogress[self.threadnum].append(test.id()) |
63 | totalprogress = sum(len(x) for x in self.result.threadprogress.values()) | 70 | totalprogress = sum(len(x) for x in self.result.threadprogress.values()) |
64 | self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( | 71 | self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % ( |
65 | self.threadnum, | 72 | self.threadnum, |
66 | len(self.result.threadprogress[self.threadnum]), | 73 | len(self.result.threadprogress[self.threadnum]), |
67 | self.totalinprocess, | 74 | self.totalinprocess, |
68 | totalprogress, | 75 | totalprogress, |
69 | self.totaltests, | 76 | self.totaltests, |
70 | "{0:.2f}".format(time.time()-self._test_start.timestamp()), | 77 | "{0:.2f}".format(time.time()-self._test_start.timestamp()), |
78 | self.target.failed_tests, | ||
71 | test.id()) | 79 | test.id()) |
72 | finally: | 80 | finally: |
73 | self.semaphore.release() | 81 | self.semaphore.release() |
82 | self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8")) | ||
83 | self.finalresult._stdout_buffer = io.StringIO() | ||
74 | super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) | 84 | super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) |
75 | 85 | ||
76 | class ProxyTestResult: | 86 | class ProxyTestResult: |
@@ -183,35 +193,28 @@ class dummybuf(object): | |||
183 | # | 193 | # |
184 | class ConcurrentTestSuite(unittest.TestSuite): | 194 | class ConcurrentTestSuite(unittest.TestSuite): |
185 | 195 | ||
186 | def __init__(self, suite, processes, setupfunc, removefunc): | 196 | def __init__(self, suite, processes, setupfunc, removefunc, bb_vars): |
187 | super(ConcurrentTestSuite, self).__init__([suite]) | 197 | super(ConcurrentTestSuite, self).__init__([suite]) |
188 | self.processes = processes | 198 | self.processes = processes |
189 | self.setupfunc = setupfunc | 199 | self.setupfunc = setupfunc |
190 | self.removefunc = removefunc | 200 | self.removefunc = removefunc |
201 | self.bb_vars = bb_vars | ||
191 | 202 | ||
192 | def run(self, result): | 203 | def run(self, result): |
193 | tests, totaltests = fork_for_tests(self.processes, self) | 204 | testservers, totaltests = fork_for_tests(self.processes, self) |
194 | try: | 205 | try: |
195 | threads = {} | 206 | threads = {} |
196 | queue = Queue() | 207 | queue = Queue() |
197 | semaphore = threading.Semaphore(1) | 208 | semaphore = threading.Semaphore(1) |
198 | result.threadprogress = {} | 209 | result.threadprogress = {} |
199 | for i, (test, testnum) in enumerate(tests): | 210 | for i, (testserver, testnum, output) in enumerate(testservers): |
200 | result.threadprogress[i] = [] | 211 | result.threadprogress[i] = [] |
201 | process_result = BBThreadsafeForwardingResult( | 212 | process_result = BBThreadsafeForwardingResult( |
202 | ExtraResultsDecoderTestResult(result), | 213 | ExtraResultsDecoderTestResult(result), |
203 | semaphore, i, testnum, totaltests) | 214 | semaphore, i, testnum, totaltests, output, result) |
204 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output | ||
205 | # as per default in parent code | ||
206 | process_result.buffer = True | ||
207 | # We have to add a buffer object to stdout to keep subunit happy | ||
208 | process_result._stderr_buffer = io.StringIO() | ||
209 | process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) | ||
210 | process_result._stdout_buffer = io.StringIO() | ||
211 | process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) | ||
212 | reader_thread = threading.Thread( | 215 | reader_thread = threading.Thread( |
213 | target=self._run_test, args=(test, process_result, queue)) | 216 | target=self._run_test, args=(testserver, process_result, queue)) |
214 | threads[test] = reader_thread, process_result | 217 | threads[testserver] = reader_thread, process_result |
215 | reader_thread.start() | 218 | reader_thread.start() |
216 | while threads: | 219 | while threads: |
217 | finished_test = queue.get() | 220 | finished_test = queue.get() |
@@ -222,13 +225,13 @@ class ConcurrentTestSuite(unittest.TestSuite): | |||
222 | process_result.stop() | 225 | process_result.stop() |
223 | raise | 226 | raise |
224 | finally: | 227 | finally: |
225 | for test in tests: | 228 | for testserver in testservers: |
226 | test[0]._stream.close() | 229 | testserver[0]._stream.close() |
227 | 230 | ||
228 | def _run_test(self, test, process_result, queue): | 231 | def _run_test(self, testserver, process_result, queue): |
229 | try: | 232 | try: |
230 | try: | 233 | try: |
231 | test.run(process_result) | 234 | testserver.run(process_result) |
232 | except Exception: | 235 | except Exception: |
233 | # The run logic itself failed | 236 | # The run logic itself failed |
234 | case = testtools.ErrorHolder( | 237 | case = testtools.ErrorHolder( |
@@ -236,12 +239,12 @@ class ConcurrentTestSuite(unittest.TestSuite): | |||
236 | error=sys.exc_info()) | 239 | error=sys.exc_info()) |
237 | case.run(process_result) | 240 | case.run(process_result) |
238 | finally: | 241 | finally: |
239 | queue.put(test) | 242 | queue.put(testserver) |
240 | 243 | ||
241 | def fork_for_tests(concurrency_num, suite): | 244 | def fork_for_tests(concurrency_num, suite): |
242 | result = [] | 245 | testservers = [] |
243 | if 'BUILDDIR' in os.environ: | 246 | if 'BUILDDIR' in os.environ: |
244 | selftestdir = get_test_layer() | 247 | selftestdir = get_test_layer(suite.bb_vars['BBLAYERS']) |
245 | 248 | ||
246 | test_blocks = partition_tests(suite, concurrency_num) | 249 | test_blocks = partition_tests(suite, concurrency_num) |
247 | # Clear the tests from the original suite so it doesn't keep them alive | 250 | # Clear the tests from the original suite so it doesn't keep them alive |
@@ -261,7 +264,7 @@ def fork_for_tests(concurrency_num, suite): | |||
261 | ourpid = os.getpid() | 264 | ourpid = os.getpid() |
262 | try: | 265 | try: |
263 | newbuilddir = None | 266 | newbuilddir = None |
264 | stream = os.fdopen(c2pwrite, 'wb', 1) | 267 | stream = os.fdopen(c2pwrite, 'wb') |
265 | os.close(c2pread) | 268 | os.close(c2pread) |
266 | 269 | ||
267 | (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite) | 270 | (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite) |
@@ -273,10 +276,11 @@ def fork_for_tests(concurrency_num, suite): | |||
273 | newsi = os.open(os.devnull, os.O_RDWR) | 276 | newsi = os.open(os.devnull, os.O_RDWR) |
274 | os.dup2(newsi, sys.stdin.fileno()) | 277 | os.dup2(newsi, sys.stdin.fileno()) |
275 | 278 | ||
279 | # Send stdout/stderr over the stream | ||
280 | os.dup2(c2pwrite, sys.stdout.fileno()) | ||
281 | os.dup2(c2pwrite, sys.stderr.fileno()) | ||
282 | |||
276 | subunit_client = TestProtocolClient(stream) | 283 | subunit_client = TestProtocolClient(stream) |
277 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output | ||
278 | # as per default in parent code | ||
279 | subunit_client.buffer = True | ||
280 | subunit_result = AutoTimingTestResultDecorator(subunit_client) | 284 | subunit_result = AutoTimingTestResultDecorator(subunit_client) |
281 | unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) | 285 | unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) |
282 | if ourpid != os.getpid(): | 286 | if ourpid != os.getpid(): |
@@ -305,10 +309,12 @@ def fork_for_tests(concurrency_num, suite): | |||
305 | os._exit(0) | 309 | os._exit(0) |
306 | else: | 310 | else: |
307 | os.close(c2pwrite) | 311 | os.close(c2pwrite) |
308 | stream = os.fdopen(c2pread, 'rb', 1) | 312 | stream = os.fdopen(c2pread, 'rb') |
309 | test = ProtocolTestCase(stream) | 313 | # Collect stdout/stderr into an io buffer |
310 | result.append((test, numtests)) | 314 | output = io.BytesIO() |
311 | return result, totaltests | 315 | testserver = ProtocolTestCase(stream, passthrough=output) |
316 | testservers.append((testserver, numtests, output)) | ||
317 | return testservers, totaltests | ||
312 | 318 | ||
313 | def partition_tests(suite, count): | 319 | def partition_tests(suite, count): |
314 | # Keep tests from the same class together but allow tests from modules | 320 | # Keep tests from the same class together but allow tests from modules |