diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2018-07-09 15:20:34 +0000 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2018-07-18 10:18:41 +0100 |
commit | ebd97e728ae1bd9442299c871a07a1b3b9f9efdf (patch) | |
tree | a76fec3b71434a64d9a9b8310434b78bceafdc89 /meta | |
parent | 05c32d2de1ee4681cc78cb107a158e9ab22c9619 (diff) | |
download | poky-ebd97e728ae1bd9442299c871a07a1b3b9f9efdf.tar.gz |
oeqa: Add selftest parallelisation support
This allows oe-selftest to take a -j option which specifies how much test
parallelisation to use. Currently this is "module" based with each module
being split and run in a separate build directory. Further splitting could
be done but this seems a good compromise between test setup and parallelism.
You need python-testtools and python-subunit installed to use this but only
when the -j option is specified.
See notes posted to the openedmbedded-architecture list for more details
about the design choices here.
Some of this functionality may make more sense in the oeqa core ultimately.
(From OE-Core rev: 326ababfd620ae5ea29bf486b9d68ba3d60cad30)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta')
-rw-r--r-- | meta/lib/oeqa/core/context.py | 10 | ||||
-rw-r--r-- | meta/lib/oeqa/core/runner.py | 24 | ||||
-rw-r--r-- | meta/lib/oeqa/core/utils/concurrencytest.py | 254 | ||||
-rw-r--r-- | meta/lib/oeqa/selftest/context.py | 8 |
4 files changed, 288 insertions, 8 deletions
diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py index 10481b44b6..8cdfbf834f 100644 --- a/meta/lib/oeqa/core/context.py +++ b/meta/lib/oeqa/core/context.py | |||
@@ -58,14 +58,20 @@ class OETestContext(object): | |||
58 | modules_required, filters) | 58 | modules_required, filters) |
59 | self.suites = self.loader.discover() | 59 | self.suites = self.loader.discover() |
60 | 60 | ||
61 | def runTests(self, skips=[]): | 61 | def runTests(self, processes=None, skips=[]): |
62 | self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True) | 62 | self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True) |
63 | 63 | ||
64 | # Dinamically skip those tests specified though arguments | 64 | # Dinamically skip those tests specified though arguments |
65 | self.skipTests(skips) | 65 | self.skipTests(skips) |
66 | 66 | ||
67 | self._run_start_time = time.time() | 67 | self._run_start_time = time.time() |
68 | result = self.runner.run(self.suites) | 68 | if processes: |
69 | from oeqa.core.utils.concurrencytest import ConcurrentTestSuite | ||
70 | |||
71 | concurrent_suite = ConcurrentTestSuite(self.suites, processes) | ||
72 | result = self.runner.run(concurrent_suite) | ||
73 | else: | ||
74 | result = self.runner.run(self.suites) | ||
69 | self._run_end_time = time.time() | 75 | self._run_end_time = time.time() |
70 | 76 | ||
71 | return result | 77 | return result |
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py index 219102c6b0..6adbe3827b 100644 --- a/meta/lib/oeqa/core/runner.py +++ b/meta/lib/oeqa/core/runner.py | |||
@@ -43,11 +43,17 @@ class OETestResult(_TestResult): | |||
43 | super(OETestResult, self).__init__(*args, **kwargs) | 43 | super(OETestResult, self).__init__(*args, **kwargs) |
44 | 44 | ||
45 | self.successes = [] | 45 | self.successes = [] |
46 | self.starttime = {} | ||
47 | self.endtime = {} | ||
48 | self.progressinfo = {} | ||
46 | 49 | ||
47 | self.tc = tc | 50 | self.tc = tc |
48 | self._tc_map_results() | 51 | self._tc_map_results() |
49 | 52 | ||
50 | def startTest(self, test): | 53 | def startTest(self, test): |
54 | # May have been set by concurrencytest | ||
55 | if test.id() not in self.starttime: | ||
56 | self.starttime[test.id()] = time.time() | ||
51 | super(OETestResult, self).startTest(test) | 57 | super(OETestResult, self).startTest(test) |
52 | 58 | ||
53 | def _tc_map_results(self): | 59 | def _tc_map_results(self): |
@@ -57,6 +63,12 @@ class OETestResult(_TestResult): | |||
57 | self.tc._results['expectedFailures'] = self.expectedFailures | 63 | self.tc._results['expectedFailures'] = self.expectedFailures |
58 | self.tc._results['successes'] = self.successes | 64 | self.tc._results['successes'] = self.successes |
59 | 65 | ||
66 | def stopTest(self, test): | ||
67 | self.endtime[test.id()] = time.time() | ||
68 | super(OETestResult, self).stopTest(test) | ||
69 | if test.id() in self.progressinfo: | ||
70 | print(self.progressinfo[test.id()]) | ||
71 | |||
60 | def logSummary(self, component, context_msg=''): | 72 | def logSummary(self, component, context_msg=''): |
61 | elapsed_time = self.tc._run_end_time - self.tc._run_start_time | 73 | elapsed_time = self.tc._run_end_time - self.tc._run_start_time |
62 | self.tc.logger.info("SUMMARY:") | 74 | self.tc.logger.info("SUMMARY:") |
@@ -141,12 +153,16 @@ class OETestResult(_TestResult): | |||
141 | if hasattr(d, 'oeid'): | 153 | if hasattr(d, 'oeid'): |
142 | oeid = d.oeid | 154 | oeid = d.oeid |
143 | 155 | ||
156 | t = "" | ||
157 | if case.id() in self.starttime and case.id() in self.endtime: | ||
158 | t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)" | ||
159 | |||
144 | if fail: | 160 | if fail: |
145 | self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), | 161 | self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(), |
146 | oeid, desc)) | 162 | oeid, desc, t)) |
147 | else: | 163 | else: |
148 | self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), | 164 | self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(), |
149 | oeid, 'UNKNOWN')) | 165 | oeid, 'UNKNOWN', t)) |
150 | 166 | ||
151 | class OEListTestsResult(object): | 167 | class OEListTestsResult(object): |
152 | def wasSuccessful(self): | 168 | def wasSuccessful(self): |
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py new file mode 100644 index 0000000000..850586516a --- /dev/null +++ b/meta/lib/oeqa/core/utils/concurrencytest.py | |||
@@ -0,0 +1,254 @@ | |||
1 | #!/usr/bin/env python3 | ||
2 | # | ||
3 | # Modified for use in OE by Richard Purdie, 2018 | ||
4 | # | ||
5 | # Modified by: Corey Goldberg, 2013 | ||
6 | # License: GPLv2+ | ||
7 | # | ||
8 | # Original code from: | ||
9 | # Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) | ||
10 | # Copyright (C) 2005-2011 Canonical Ltd | ||
11 | # License: GPLv2+ | ||
12 | |||
13 | import os | ||
14 | import sys | ||
15 | import traceback | ||
16 | import unittest | ||
17 | import subprocess | ||
18 | import testtools | ||
19 | import threading | ||
20 | import time | ||
21 | import io | ||
22 | |||
23 | from queue import Queue | ||
24 | from itertools import cycle | ||
25 | from subunit import ProtocolTestCase, TestProtocolClient | ||
26 | from subunit.test_results import AutoTimingTestResultDecorator | ||
27 | from testtools import ThreadsafeForwardingResult, iterate_tests | ||
28 | |||
29 | import bb.utils | ||
30 | import oe.path | ||
31 | |||
32 | _all__ = [ | ||
33 | 'ConcurrentTestSuite', | ||
34 | 'fork_for_tests', | ||
35 | 'partition_tests', | ||
36 | ] | ||
37 | |||
38 | # | ||
39 | # Patch the version from testtools to allow access to _test_start and allow | ||
40 | # computation of timing information and threading progress | ||
41 | # | ||
42 | class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): | ||
43 | |||
44 | def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests): | ||
45 | super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) | ||
46 | self.threadnum = threadnum | ||
47 | self.totalinprocess = totalinprocess | ||
48 | self.totaltests = totaltests | ||
49 | |||
50 | def _add_result_with_semaphore(self, method, test, *args, **kwargs): | ||
51 | self.semaphore.acquire() | ||
52 | try: | ||
53 | self.result.starttime[test.id()] = self._test_start.timestamp() | ||
54 | self.result.threadprogress[self.threadnum].append(test.id()) | ||
55 | totalprogress = sum(len(x) for x in self.result.threadprogress.values()) | ||
56 | self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % ( | ||
57 | self.threadnum, | ||
58 | len(self.result.threadprogress[self.threadnum]), | ||
59 | self.totalinprocess, | ||
60 | totalprogress, | ||
61 | self.totaltests, | ||
62 | "{0:.2f}".format(time.time()-self._test_start.timestamp()), | ||
63 | test.id()) | ||
64 | finally: | ||
65 | self.semaphore.release() | ||
66 | super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) | ||
67 | |||
68 | # | ||
69 | # A dummy structure to add to io.StringIO so that the .buffer object | ||
70 | # is available and accepts writes. This allows unittest with buffer=True | ||
71 | # to interact ok with subunit which wants to access sys.stdout.buffer. | ||
72 | # | ||
73 | class dummybuf(object): | ||
74 | def __init__(self, parent): | ||
75 | self.p = parent | ||
76 | def write(self, data): | ||
77 | self.p.write(data.decode("utf-8")) | ||
78 | |||
79 | # | ||
80 | # Taken from testtools.ConncurrencyTestSuite but modified for OE use | ||
81 | # | ||
82 | class ConcurrentTestSuite(unittest.TestSuite): | ||
83 | |||
84 | def __init__(self, suite, processes): | ||
85 | super(ConcurrentTestSuite, self).__init__([suite]) | ||
86 | self.processes = processes | ||
87 | |||
88 | def run(self, result): | ||
89 | tests, totaltests = fork_for_tests(self.processes, self) | ||
90 | try: | ||
91 | threads = {} | ||
92 | queue = Queue() | ||
93 | semaphore = threading.Semaphore(1) | ||
94 | result.threadprogress = {} | ||
95 | for i, (test, testnum) in enumerate(tests): | ||
96 | result.threadprogress[i] = [] | ||
97 | process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests) | ||
98 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output | ||
99 | # as per default in parent code | ||
100 | process_result.buffer = True | ||
101 | # We have to add a buffer object to stdout to keep subunit happy | ||
102 | process_result._stderr_buffer = io.StringIO() | ||
103 | process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer) | ||
104 | process_result._stdout_buffer = io.StringIO() | ||
105 | process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer) | ||
106 | reader_thread = threading.Thread( | ||
107 | target=self._run_test, args=(test, process_result, queue)) | ||
108 | threads[test] = reader_thread, process_result | ||
109 | reader_thread.start() | ||
110 | while threads: | ||
111 | finished_test = queue.get() | ||
112 | threads[finished_test][0].join() | ||
113 | del threads[finished_test] | ||
114 | except: | ||
115 | for thread, process_result in threads.values(): | ||
116 | process_result.stop() | ||
117 | raise | ||
118 | |||
119 | def _run_test(self, test, process_result, queue): | ||
120 | try: | ||
121 | try: | ||
122 | test.run(process_result) | ||
123 | except Exception: | ||
124 | # The run logic itself failed | ||
125 | case = testtools.ErrorHolder( | ||
126 | "broken-runner", | ||
127 | error=sys.exc_info()) | ||
128 | case.run(process_result) | ||
129 | finally: | ||
130 | queue.put(test) | ||
131 | |||
132 | def removebuilddir(d): | ||
133 | delay = 5 | ||
134 | while delay and os.path.exists(d + "/bitbake.lock"): | ||
135 | time.sleep(1) | ||
136 | delay = delay - 1 | ||
137 | bb.utils.prunedir(d) | ||
138 | |||
139 | def fork_for_tests(concurrency_num, suite): | ||
140 | result = [] | ||
141 | test_blocks = partition_tests(suite, concurrency_num) | ||
142 | # Clear the tests from the original suite so it doesn't keep them alive | ||
143 | suite._tests[:] = [] | ||
144 | totaltests = sum(len(x) for x in test_blocks) | ||
145 | for process_tests in test_blocks: | ||
146 | numtests = len(process_tests) | ||
147 | process_suite = unittest.TestSuite(process_tests) | ||
148 | # Also clear each split list so new suite has only reference | ||
149 | process_tests[:] = [] | ||
150 | c2pread, c2pwrite = os.pipe() | ||
151 | # Clear buffers before fork to avoid duplicate output | ||
152 | sys.stdout.flush() | ||
153 | sys.stderr.flush() | ||
154 | pid = os.fork() | ||
155 | if pid == 0: | ||
156 | ourpid = os.getpid() | ||
157 | try: | ||
158 | newbuilddir = None | ||
159 | stream = os.fdopen(c2pwrite, 'wb', 1) | ||
160 | os.close(c2pread) | ||
161 | |||
162 | # Create a new separate BUILDDIR for each group of tests | ||
163 | if 'BUILDDIR' in os.environ: | ||
164 | builddir = os.environ['BUILDDIR'] | ||
165 | newbuilddir = builddir + "-st-" + str(ourpid) | ||
166 | selftestdir = os.path.abspath(builddir + "/../meta-selftest") | ||
167 | newselftestdir = newbuilddir + "/meta-selftest" | ||
168 | |||
169 | bb.utils.mkdirhier(newbuilddir) | ||
170 | oe.path.copytree(builddir + "/conf", newbuilddir + "/conf") | ||
171 | oe.path.copytree(builddir + "/cache", newbuilddir + "/cache") | ||
172 | oe.path.copytree(selftestdir, newselftestdir) | ||
173 | |||
174 | for e in os.environ: | ||
175 | if builddir in os.environ[e]: | ||
176 | os.environ[e] = os.environ[e].replace(builddir, newbuilddir) | ||
177 | |||
178 | subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True) | ||
179 | |||
180 | # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow | ||
181 | subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True) | ||
182 | |||
183 | os.chdir(newbuilddir) | ||
184 | |||
185 | for t in process_suite: | ||
186 | if not hasattr(t, "tc"): | ||
187 | continue | ||
188 | cp = t.tc.config_paths | ||
189 | for p in cp: | ||
190 | if selftestdir in cp[p] and newselftestdir not in cp[p]: | ||
191 | cp[p] = cp[p].replace(selftestdir, newselftestdir) | ||
192 | if builddir in cp[p] and newbuilddir not in cp[p]: | ||
193 | cp[p] = cp[p].replace(builddir, newbuilddir) | ||
194 | |||
195 | # Leave stderr and stdout open so we can see test noise | ||
196 | # Close stdin so that the child goes away if it decides to | ||
197 | # read from stdin (otherwise its a roulette to see what | ||
198 | # child actually gets keystrokes for pdb etc). | ||
199 | newsi = os.open(os.devnull, os.O_RDWR) | ||
200 | os.dup2(newsi, sys.stdin.fileno()) | ||
201 | |||
202 | subunit_client = TestProtocolClient(stream) | ||
203 | # Force buffering of stdout/stderr so the console doesn't get corrupted by test output | ||
204 | # as per default in parent code | ||
205 | subunit_client.buffer = True | ||
206 | subunit_result = AutoTimingTestResultDecorator(subunit_client) | ||
207 | process_suite.run(subunit_result) | ||
208 | if ourpid != os.getpid(): | ||
209 | os._exit(0) | ||
210 | if newbuilddir: | ||
211 | removebuilddir(newbuilddir) | ||
212 | except: | ||
213 | # Don't do anything with process children | ||
214 | if ourpid != os.getpid(): | ||
215 | os._exit(1) | ||
216 | # Try and report traceback on stream, but exit with error | ||
217 | # even if stream couldn't be created or something else | ||
218 | # goes wrong. The traceback is formatted to a string and | ||
219 | # written in one go to avoid interleaving lines from | ||
220 | # multiple failing children. | ||
221 | try: | ||
222 | stream.write(traceback.format_exc().encode('utf-8')) | ||
223 | except: | ||
224 | sys.stderr.write(traceback.format_exc()) | ||
225 | finally: | ||
226 | if newbuilddir: | ||
227 | removebuilddir(newbuilddir) | ||
228 | os._exit(1) | ||
229 | os._exit(0) | ||
230 | else: | ||
231 | os.close(c2pwrite) | ||
232 | stream = os.fdopen(c2pread, 'rb', 1) | ||
233 | test = ProtocolTestCase(stream) | ||
234 | result.append((test, numtests)) | ||
235 | return result, totaltests | ||
236 | |||
237 | def partition_tests(suite, count): | ||
238 | # Keep tests from the same class together but allow tests from modules | ||
239 | # to go to different processes to aid parallelisation. | ||
240 | modules = {} | ||
241 | for test in iterate_tests(suite): | ||
242 | m = test.__module__ + "." + test.__class__.__name__ | ||
243 | if m not in modules: | ||
244 | modules[m] = [] | ||
245 | modules[m].append(test) | ||
246 | |||
247 | # Simply divide the test blocks between the available processes | ||
248 | partitions = [list() for _ in range(count)] | ||
249 | for partition, m in zip(cycle(partitions), modules): | ||
250 | partition.extend(modules[m]) | ||
251 | |||
252 | # No point in empty threads so drop them | ||
253 | return [p for p in partitions if p] | ||
254 | |||
diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py index 9e90d3c256..c937b8171c 100644 --- a/meta/lib/oeqa/selftest/context.py +++ b/meta/lib/oeqa/selftest/context.py | |||
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext): | |||
25 | self.custommachine = None | 25 | self.custommachine = None |
26 | self.config_paths = config_paths | 26 | self.config_paths = config_paths |
27 | 27 | ||
28 | def runTests(self, machine=None, skips=[]): | 28 | def runTests(self, processes=None, machine=None, skips=[]): |
29 | if machine: | 29 | if machine: |
30 | self.custommachine = machine | 30 | self.custommachine = machine |
31 | if machine == 'random': | 31 | if machine == 'random': |
32 | self.custommachine = choice(self.machines) | 32 | self.custommachine = choice(self.machines) |
33 | self.logger.info('Run tests with custom MACHINE set to: %s' % \ | 33 | self.logger.info('Run tests with custom MACHINE set to: %s' % \ |
34 | self.custommachine) | 34 | self.custommachine) |
35 | return super(OESelftestTestContext, self).runTests(skips) | 35 | return super(OESelftestTestContext, self).runTests(processes, skips) |
36 | 36 | ||
37 | def listTests(self, display_type, machine=None): | 37 | def listTests(self, display_type, machine=None): |
38 | return super(OESelftestTestContext, self).listTests(display_type) | 38 | return super(OESelftestTestContext, self).listTests(display_type) |
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor): | |||
68 | action="store_true", default=False, | 68 | action="store_true", default=False, |
69 | help='List all available tests.') | 69 | help='List all available tests.') |
70 | 70 | ||
71 | parser.add_argument('-j', '--num-processes', dest='processes', action='store', | ||
72 | type=int, help="number of processes to execute in parallel with") | ||
73 | |||
71 | parser.add_argument('--machine', required=False, choices=['random', 'all'], | 74 | parser.add_argument('--machine', required=False, choices=['random', 'all'], |
72 | help='Run tests on different machines (random/all).') | 75 | help='Run tests on different machines (random/all).') |
73 | 76 | ||
@@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor): | |||
137 | self.tc_kwargs['init']['config_paths']['bblayers_backup']) | 140 | self.tc_kwargs['init']['config_paths']['bblayers_backup']) |
138 | 141 | ||
139 | self.tc_kwargs['run']['skips'] = args.skips | 142 | self.tc_kwargs['run']['skips'] = args.skips |
143 | self.tc_kwargs['run']['processes'] = args.processes | ||
140 | 144 | ||
141 | def _pre_run(self): | 145 | def _pre_run(self): |
142 | def _check_required_env_variables(vars): | 146 | def _check_required_env_variables(vars): |