summaryrefslogtreecommitdiffstats
path: root/meta/lib/oeqa/core/utils/concurrencytest.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oeqa/core/utils/concurrencytest.py')
-rw-r--r--meta/lib/oeqa/core/utils/concurrencytest.py68
1 files changed, 37 insertions, 31 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
index b2eb68fb02..d10f8f7f04 100644
--- a/meta/lib/oeqa/core/utils/concurrencytest.py
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -1,5 +1,7 @@
1#!/usr/bin/env python3 1#!/usr/bin/env python3
2# 2#
3# Copyright OpenEmbedded Contributors
4#
3# SPDX-License-Identifier: GPL-2.0-or-later 5# SPDX-License-Identifier: GPL-2.0-or-later
4# 6#
5# Modified for use in OE by Richard Purdie, 2018 7# Modified for use in OE by Richard Purdie, 2018
@@ -48,11 +50,16 @@ _all__ = [
48# 50#
49class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): 51class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
50 52
51 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): 53 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult):
52 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) 54 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
53 self.threadnum = threadnum 55 self.threadnum = threadnum
54 self.totalinprocess = totalinprocess 56 self.totalinprocess = totalinprocess
55 self.totaltests = totaltests 57 self.totaltests = totaltests
58 self.buffer = True
59 self.outputbuf = output
60 self.finalresult = finalresult
61 self.finalresult.buffer = True
62 self.target = target
56 63
57 def _add_result_with_semaphore(self, method, test, *args, **kwargs): 64 def _add_result_with_semaphore(self, method, test, *args, **kwargs):
58 self.semaphore.acquire() 65 self.semaphore.acquire()
@@ -61,16 +68,19 @@ class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
61 self.result.starttime[test.id()] = self._test_start.timestamp() 68 self.result.starttime[test.id()] = self._test_start.timestamp()
62 self.result.threadprogress[self.threadnum].append(test.id()) 69 self.result.threadprogress[self.threadnum].append(test.id())
63 totalprogress = sum(len(x) for x in self.result.threadprogress.values()) 70 totalprogress = sum(len(x) for x in self.result.threadprogress.values())
64 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( 71 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % (
65 self.threadnum, 72 self.threadnum,
66 len(self.result.threadprogress[self.threadnum]), 73 len(self.result.threadprogress[self.threadnum]),
67 self.totalinprocess, 74 self.totalinprocess,
68 totalprogress, 75 totalprogress,
69 self.totaltests, 76 self.totaltests,
70 "{0:.2f}".format(time.time()-self._test_start.timestamp()), 77 "{0:.2f}".format(time.time()-self._test_start.timestamp()),
78 self.target.failed_tests,
71 test.id()) 79 test.id())
72 finally: 80 finally:
73 self.semaphore.release() 81 self.semaphore.release()
82 self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8"))
83 self.finalresult._stdout_buffer = io.StringIO()
74 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) 84 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
75 85
76class ProxyTestResult: 86class ProxyTestResult:
@@ -183,35 +193,28 @@ class dummybuf(object):
183# 193#
184class ConcurrentTestSuite(unittest.TestSuite): 194class ConcurrentTestSuite(unittest.TestSuite):
185 195
186 def __init__(self, suite, processes, setupfunc, removefunc): 196 def __init__(self, suite, processes, setupfunc, removefunc, bb_vars):
187 super(ConcurrentTestSuite, self).__init__([suite]) 197 super(ConcurrentTestSuite, self).__init__([suite])
188 self.processes = processes 198 self.processes = processes
189 self.setupfunc = setupfunc 199 self.setupfunc = setupfunc
190 self.removefunc = removefunc 200 self.removefunc = removefunc
201 self.bb_vars = bb_vars
191 202
192 def run(self, result): 203 def run(self, result):
193 tests, totaltests = fork_for_tests(self.processes, self) 204 testservers, totaltests = fork_for_tests(self.processes, self)
194 try: 205 try:
195 threads = {} 206 threads = {}
196 queue = Queue() 207 queue = Queue()
197 semaphore = threading.Semaphore(1) 208 semaphore = threading.Semaphore(1)
198 result.threadprogress = {} 209 result.threadprogress = {}
199 for i, (test, testnum) in enumerate(tests): 210 for i, (testserver, testnum, output) in enumerate(testservers):
200 result.threadprogress[i] = [] 211 result.threadprogress[i] = []
201 process_result = BBThreadsafeForwardingResult( 212 process_result = BBThreadsafeForwardingResult(
202 ExtraResultsDecoderTestResult(result), 213 ExtraResultsDecoderTestResult(result),
203 semaphore, i, testnum, totaltests) 214 semaphore, i, testnum, totaltests, output, result)
204 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
205 # as per default in parent code
206 process_result.buffer = True
207 # We have to add a buffer object to stdout to keep subunit happy
208 process_result._stderr_buffer = io.StringIO()
209 process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
210 process_result._stdout_buffer = io.StringIO()
211 process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
212 reader_thread = threading.Thread( 215 reader_thread = threading.Thread(
213 target=self._run_test, args=(test, process_result, queue)) 216 target=self._run_test, args=(testserver, process_result, queue))
214 threads[test] = reader_thread, process_result 217 threads[testserver] = reader_thread, process_result
215 reader_thread.start() 218 reader_thread.start()
216 while threads: 219 while threads:
217 finished_test = queue.get() 220 finished_test = queue.get()
@@ -222,13 +225,13 @@ class ConcurrentTestSuite(unittest.TestSuite):
222 process_result.stop() 225 process_result.stop()
223 raise 226 raise
224 finally: 227 finally:
225 for test in tests: 228 for testserver in testservers:
226 test[0]._stream.close() 229 testserver[0]._stream.close()
227 230
228 def _run_test(self, test, process_result, queue): 231 def _run_test(self, testserver, process_result, queue):
229 try: 232 try:
230 try: 233 try:
231 test.run(process_result) 234 testserver.run(process_result)
232 except Exception: 235 except Exception:
233 # The run logic itself failed 236 # The run logic itself failed
234 case = testtools.ErrorHolder( 237 case = testtools.ErrorHolder(
@@ -236,12 +239,12 @@ class ConcurrentTestSuite(unittest.TestSuite):
236 error=sys.exc_info()) 239 error=sys.exc_info())
237 case.run(process_result) 240 case.run(process_result)
238 finally: 241 finally:
239 queue.put(test) 242 queue.put(testserver)
240 243
241def fork_for_tests(concurrency_num, suite): 244def fork_for_tests(concurrency_num, suite):
242 result = [] 245 testservers = []
243 if 'BUILDDIR' in os.environ: 246 if 'BUILDDIR' in os.environ:
244 selftestdir = get_test_layer() 247 selftestdir = get_test_layer(suite.bb_vars['BBLAYERS'])
245 248
246 test_blocks = partition_tests(suite, concurrency_num) 249 test_blocks = partition_tests(suite, concurrency_num)
247 # Clear the tests from the original suite so it doesn't keep them alive 250 # Clear the tests from the original suite so it doesn't keep them alive
@@ -261,7 +264,7 @@ def fork_for_tests(concurrency_num, suite):
261 ourpid = os.getpid() 264 ourpid = os.getpid()
262 try: 265 try:
263 newbuilddir = None 266 newbuilddir = None
264 stream = os.fdopen(c2pwrite, 'wb', 1) 267 stream = os.fdopen(c2pwrite, 'wb')
265 os.close(c2pread) 268 os.close(c2pread)
266 269
267 (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite) 270 (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite)
@@ -273,10 +276,11 @@ def fork_for_tests(concurrency_num, suite):
273 newsi = os.open(os.devnull, os.O_RDWR) 276 newsi = os.open(os.devnull, os.O_RDWR)
274 os.dup2(newsi, sys.stdin.fileno()) 277 os.dup2(newsi, sys.stdin.fileno())
275 278
279 # Send stdout/stderr over the stream
280 os.dup2(c2pwrite, sys.stdout.fileno())
281 os.dup2(c2pwrite, sys.stderr.fileno())
282
276 subunit_client = TestProtocolClient(stream) 283 subunit_client = TestProtocolClient(stream)
277 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
278 # as per default in parent code
279 subunit_client.buffer = True
280 subunit_result = AutoTimingTestResultDecorator(subunit_client) 284 subunit_result = AutoTimingTestResultDecorator(subunit_client)
281 unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) 285 unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
282 if ourpid != os.getpid(): 286 if ourpid != os.getpid():
@@ -305,10 +309,12 @@ def fork_for_tests(concurrency_num, suite):
305 os._exit(0) 309 os._exit(0)
306 else: 310 else:
307 os.close(c2pwrite) 311 os.close(c2pwrite)
308 stream = os.fdopen(c2pread, 'rb', 1) 312 stream = os.fdopen(c2pread, 'rb')
309 test = ProtocolTestCase(stream) 313 # Collect stdout/stderr into an io buffer
310 result.append((test, numtests)) 314 output = io.BytesIO()
311 return result, totaltests 315 testserver = ProtocolTestCase(stream, passthrough=output)
316 testservers.append((testserver, numtests, output))
317 return testservers, totaltests
312 318
313def partition_tests(suite, count): 319def partition_tests(suite, count):
314 # Keep tests from the same class together but allow tests from modules 320 # Keep tests from the same class together but allow tests from modules