diff options
| author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2018-07-09 15:20:34 +0000 |
|---|---|---|
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2018-07-18 10:18:41 +0100 |
| commit | ebd97e728ae1bd9442299c871a07a1b3b9f9efdf (patch) | |
| tree | a76fec3b71434a64d9a9b8310434b78bceafdc89 /meta/lib/oeqa/core/utils | |
| parent | 05c32d2de1ee4681cc78cb107a158e9ab22c9619 (diff) | |
| download | poky-ebd97e728ae1bd9442299c871a07a1b3b9f9efdf.tar.gz | |
oeqa: Add selftest parallelisation support
This allows oe-selftest to take a -j option which specifies how much test
parallelisation to use. Currently this is "module" based with each module
being split and run in a separate build directory. Further splitting could
be done but this seems a good compromise between test setup and parallelism.
You need python-testtools and python-subunit installed to use this but only
when the -j option is specified.
See notes posted to the openedmbedded-architecture list for more details
about the design choices here.
Some of this functionality may make more sense in the oeqa core ultimately.
(From OE-Core rev: 326ababfd620ae5ea29bf486b9d68ba3d60cad30)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta/lib/oeqa/core/utils')
| -rw-r--r-- | meta/lib/oeqa/core/utils/concurrencytest.py | 254 |
1 files changed, 254 insertions, 0 deletions
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py new file mode 100644 index 0000000000..850586516a --- /dev/null +++ b/meta/lib/oeqa/core/utils/concurrencytest.py | |||
| @@ -0,0 +1,254 @@ | |||
| 1 | #!/usr/bin/env python3 | ||
| 2 | # | ||
| 3 | # Modified for use in OE by Richard Purdie, 2018 | ||
| 4 | # | ||
| 5 | # Modified by: Corey Goldberg, 2013 | ||
| 6 | # License: GPLv2+ | ||
| 7 | # | ||
| 8 | # Original code from: | ||
| 9 | # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) | ||
| 10 | # Copyright (C) 2005-2011 Canonical Ltd | ||
| 11 | # License: GPLv2+ | ||
| 12 | |||
| 13 | import os | ||
| 14 | import sys | ||
| 15 | import traceback | ||
| 16 | import unittest | ||
| 17 | import subprocess | ||
| 18 | import testtools | ||
| 19 | import threading | ||
| 20 | import time | ||
| 21 | import io | ||
| 22 | |||
| 23 | from queue import Queue | ||
| 24 | from itertools import cycle | ||
| 25 | from subunit import ProtocolTestCase, TestProtocolClient | ||
| 26 | from subunit.test_results import AutoTimingTestResultDecorator | ||
| 27 | from testtools import ThreadsafeForwardingResult, iterate_tests | ||
| 28 | |||
| 29 | import bb.utils | ||
| 30 | import oe.path | ||
| 31 | |||
| 32 | _all__ = [ | ||
| 33 | 'ConcurrentTestSuite', | ||
| 34 | 'fork_for_tests', | ||
| 35 | 'partition_tests', | ||
| 36 | ] | ||
| 37 | |||
| 38 | # | ||
| 39 | # Patch the version from testtools to allow access to _test_start and allow | ||
| 40 | # computation of timing information and threading progress | ||
| 41 | # | ||
| 42 | class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): | ||
| 43 | |||
| 44 | def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): | ||
| 45 | super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) | ||
| 46 | self.threadnum = threadnum | ||
| 47 | self.totalinprocess = totalinprocess | ||
| 48 | self.totaltests = totaltests | ||
| 49 | |||
| 50 | def _add_result_with_semaphore(self, method, test, *args, **kwargs): | ||
| 51 | self.semaphore.acquire() | ||
| 52 | try: | ||
| 53 | self.result.starttime[test.id()] = self._test_start.timestamp() | ||
| 54 | self.result.threadprogress[self.threadnum].append(test.id()) | ||
| 55 | totalprogress = sum(len(x) for x in self.result.threadprogress.values()) | ||
| 56 | self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( | ||
| 57 | self.threadnum, | ||
| 58 | len(self.result.threadprogress[self.threadnum]), | ||
| 59 | self.totalinprocess, | ||
| 60 | totalprogress, | ||
| 61 | self.totaltests, | ||
| 62 | "{0:.2f}".format(time.time()-self._test_start.timestamp()), | ||
| 63 | test.id()) | ||
| 64 | finally: | ||
| 65 | self.semaphore.release() | ||
| 66 | super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) | ||
| 67 | |||
| 68 | # | ||
| 69 | # A dummy structure to add to io.StringIO so that the .buffer object | ||
| 70 | # is available and accepts writes. This allows unittest with buffer=True | ||
| 71 | # to interact ok with subunit which wants to access sys.stdout.buffer. | ||
| 72 | # | ||
| 73 | class dummybuf(object): | ||
| 74 | def __init__(self, parent): | ||
| 75 | self.p = parent | ||
| 76 | def write(self, data): | ||
| 77 | self.p.write(data.decode("utf-8")) | ||
| 78 | |||
| 79 | # | ||
| 80 | # Taken from testtools.ConncurrencyTestSuite but modified for OE use | ||
| 81 | # | ||
| 82 | class ConcurrentTestSuite(unittest.TestSuite): | ||
| 83 | |||
| 84 | def __init__(self, suite, processes): | ||
| 85 | super(ConcurrentTestSuite, self).__init__([suite]) | ||
| 86 | self.processes = processes | ||
| 87 | |||
| 88 | def run(self, result): | ||
| 89 | tests, totaltests = fork_for_tests(self.processes, self) | ||
| 90 | try: | ||
| 91 | threads = {} | ||
| 92 | queue = Queue() | ||
| 93 | semaphore = threading.Semaphore(1) | ||
| 94 | result.threadprogress = {} | ||
| 95 | for i, (test, testnum) in enumerate(tests): | ||
| 96 | result.threadprogress[i] = [] | ||
| 97 | process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests) | ||
| 98 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output | ||
| 99 | # as per default in parent code | ||
| 100 | process_result.buffer = True | ||
| 101 | # We have to add a buffer object to stdout to keep subunit happy | ||
| 102 | process_result._stderr_buffer = io.StringIO() | ||
| 103 | process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) | ||
| 104 | process_result._stdout_buffer = io.StringIO() | ||
| 105 | process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) | ||
| 106 | reader_thread = threading.Thread( | ||
| 107 | target=self._run_test, args=(test, process_result, queue)) | ||
| 108 | threads[test] = reader_thread, process_result | ||
| 109 | reader_thread.start() | ||
| 110 | while threads: | ||
| 111 | finished_test = queue.get() | ||
| 112 | threads[finished_test][0].join() | ||
| 113 | del threads[finished_test] | ||
| 114 | except: | ||
| 115 | for thread, process_result in threads.values(): | ||
| 116 | process_result.stop() | ||
| 117 | raise | ||
| 118 | |||
| 119 | def _run_test(self, test, process_result, queue): | ||
| 120 | try: | ||
| 121 | try: | ||
| 122 | test.run(process_result) | ||
| 123 | except Exception: | ||
| 124 | # The run logic itself failed | ||
| 125 | case = testtools.ErrorHolder( | ||
| 126 | "broken-runner", | ||
| 127 | error=sys.exc_info()) | ||
| 128 | case.run(process_result) | ||
| 129 | finally: | ||
| 130 | queue.put(test) | ||
| 131 | |||
| 132 | def removebuilddir(d): | ||
| 133 | delay = 5 | ||
| 134 | while delay and os.path.exists(d + "/bitbake.lock"): | ||
| 135 | time.sleep(1) | ||
| 136 | delay = delay - 1 | ||
| 137 | bb.utils.prunedir(d) | ||
| 138 | |||
| 139 | def fork_for_tests(concurrency_num, suite): | ||
| 140 | result = [] | ||
| 141 | test_blocks = partition_tests(suite, concurrency_num) | ||
| 142 | # Clear the tests from the original suite so it doesn't keep them alive | ||
| 143 | suite._tests[:] = [] | ||
| 144 | totaltests = sum(len(x) for x in test_blocks) | ||
| 145 | for process_tests in test_blocks: | ||
| 146 | numtests = len(process_tests) | ||
| 147 | process_suite = unittest.TestSuite(process_tests) | ||
| 148 | # Also clear each split list so new suite has only reference | ||
| 149 | process_tests[:] = [] | ||
| 150 | c2pread, c2pwrite = os.pipe() | ||
| 151 | # Clear buffers before fork to avoid duplicate output | ||
| 152 | sys.stdout.flush() | ||
| 153 | sys.stderr.flush() | ||
| 154 | pid = os.fork() | ||
| 155 | if pid == 0: | ||
| 156 | ourpid = os.getpid() | ||
| 157 | try: | ||
| 158 | newbuilddir = None | ||
| 159 | stream = os.fdopen(c2pwrite, 'wb', 1) | ||
| 160 | os.close(c2pread) | ||
| 161 | |||
| 162 | # Create a new separate BUILDDIR for each group of tests | ||
| 163 | if 'BUILDDIR' in os.environ: | ||
| 164 | builddir = os.environ['BUILDDIR'] | ||
| 165 | newbuilddir = builddir + "-st-" + str(ourpid) | ||
| 166 | selftestdir = os.path.abspath(builddir + "/../meta-selftest") | ||
| 167 | newselftestdir = newbuilddir + "/meta-selftest" | ||
| 168 | |||
| 169 | bb.utils.mkdirhier(newbuilddir) | ||
| 170 | oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") | ||
| 171 | oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") | ||
| 172 | oe.path.copytree(selftestdir, newselftestdir) | ||
| 173 | |||
| 174 | for e in os.environ: | ||
| 175 | if builddir in os.environ[e]: | ||
| 176 | os.environ[e] = os.environ[e].replace(builddir, newbuilddir) | ||
| 177 | |||
| 178 | subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) | ||
| 179 | |||
| 180 | # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow | ||
| 181 | subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) | ||
| 182 | |||
| 183 | os.chdir(newbuilddir) | ||
| 184 | |||
| 185 | for t in process_suite: | ||
| 186 | if not hasattr(t, "tc"): | ||
| 187 | continue | ||
| 188 | cp = t.tc.config_paths | ||
| 189 | for p in cp: | ||
| 190 | if selftestdir in cp[p] and newselftestdir not in cp[p]: | ||
| 191 | cp[p] = cp[p].replace(selftestdir, newselftestdir) | ||
| 192 | if builddir in cp[p] and newbuilddir not in cp[p]: | ||
| 193 | cp[p] = cp[p].replace(builddir, newbuilddir) | ||
| 194 | |||
| 195 | # Leave stderr and stdout open so we can see test noise | ||
| 196 | # Close stdin so that the child goes away if it decides to | ||
| 197 | # read from stdin (otherwise its a roulette to see what | ||
| 198 | # child actually gets keystrokes for pdb etc). | ||
| 199 | newsi = os.open(os.devnull, os.O_RDWR) | ||
| 200 | os.dup2(newsi, sys.stdin.fileno()) | ||
| 201 | |||
| 202 | subunit_client = TestProtocolClient(stream) | ||
| 203 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output | ||
| 204 | # as per default in parent code | ||
| 205 | subunit_client.buffer = True | ||
| 206 | subunit_result = AutoTimingTestResultDecorator(subunit_client) | ||
| 207 | process_suite.run(subunit_result) | ||
| 208 | if ourpid != os.getpid(): | ||
| 209 | os._exit(0) | ||
| 210 | if newbuilddir: | ||
| 211 | removebuilddir(newbuilddir) | ||
| 212 | except: | ||
| 213 | # Don't do anything with process children | ||
| 214 | if ourpid != os.getpid(): | ||
| 215 | os._exit(1) | ||
| 216 | # Try and report traceback on stream, but exit with error | ||
| 217 | # even if stream couldn't be created or something else | ||
| 218 | # goes wrong. The traceback is formatted to a string and | ||
| 219 | # written in one go to avoid interleaving lines from | ||
| 220 | # multiple failing children. | ||
| 221 | try: | ||
| 222 | stream.write(traceback.format_exc().encode('utf-8')) | ||
| 223 | except: | ||
| 224 | sys.stderr.write(traceback.format_exc()) | ||
| 225 | finally: | ||
| 226 | if newbuilddir: | ||
| 227 | removebuilddir(newbuilddir) | ||
| 228 | os._exit(1) | ||
| 229 | os._exit(0) | ||
| 230 | else: | ||
| 231 | os.close(c2pwrite) | ||
| 232 | stream = os.fdopen(c2pread, 'rb', 1) | ||
| 233 | test = ProtocolTestCase(stream) | ||
| 234 | result.append((test, numtests)) | ||
| 235 | return result, totaltests | ||
| 236 | |||
| 237 | def partition_tests(suite, count): | ||
| 238 | # Keep tests from the same class together but allow tests from modules | ||
| 239 | # to go to different processes to aid parallelisation. | ||
| 240 | modules = {} | ||
| 241 | for test in iterate_tests(suite): | ||
| 242 | m = test.__module__ + "." + test.__class__.__name__ | ||
| 243 | if m not in modules: | ||
| 244 | modules[m] = [] | ||
| 245 | modules[m].append(test) | ||
| 246 | |||
| 247 | # Simply divide the test blocks between the available processes | ||
| 248 | partitions = [list() for _ in range(count)] | ||
| 249 | for partition, m in zip(cycle(partitions), modules): | ||
| 250 | partition.extend(modules[m]) | ||
| 251 | |||
| 252 | # No point in empty threads so drop them | ||
| 253 | return [p for p in partitions if p] | ||
| 254 | |||
