summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-09 15:20:34 +0000
committerRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-18 10:18:41 +0100
commitebd97e728ae1bd9442299c871a07a1b3b9f9efdf (patch)
treea76fec3b71434a64d9a9b8310434b78bceafdc89
parent05c32d2de1ee4681cc78cb107a158e9ab22c9619 (diff)
downloadpoky-ebd97e728ae1bd9442299c871a07a1b3b9f9efdf.tar.gz
oeqa: Add selftest parallelisation support
This allows oe-selftest to take a -j option which specifies how much test parallelisation to use. Currently this is "module" based with each module being split and run in a separate build directory. Further splitting could be done but this seems a good compromise between test setup and parallelism. You need python-testtools and python-subunit installed to use this but only when the -j option is specified. See notes posted to the openedmbedded-architecture list for more details about the design choices here. Some of this functionality may make more sense in the oeqa core ultimately. (From OE-Core rev: 326ababfd620ae5ea29bf486b9d68ba3d60cad30) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--meta/lib/oeqa/core/context.py10
-rw-r--r--meta/lib/oeqa/core/runner.py24
-rw-r--r--meta/lib/oeqa/core/utils/concurrencytest.py254
-rw-r--r--meta/lib/oeqa/selftest/context.py8
4 files changed, 288 insertions, 8 deletions
diff --git a/meta/lib/oeqa/core/context.py b/meta/lib/oeqa/core/context.py
index 10481b44b6..8cdfbf834f 100644
--- a/meta/lib/oeqa/core/context.py
+++ b/meta/lib/oeqa/core/context.py
@@ -58,14 +58,20 @@ class OETestContext(object):
58 modules_required, filters) 58 modules_required, filters)
59 self.suites = self.loader.discover() 59 self.suites = self.loader.discover()
60 60
61 def runTests(self, skips=[]): 61 def runTests(self, processes=None, skips=[]):
62 self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True) 62 self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True)
63 63
64 # Dinamically skip those tests specified though arguments 64 # Dinamically skip those tests specified though arguments
65 self.skipTests(skips) 65 self.skipTests(skips)
66 66
67 self._run_start_time = time.time() 67 self._run_start_time = time.time()
68 result = self.runner.run(self.suites) 68 if processes:
69 from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
70
71 concurrent_suite = ConcurrentTestSuite(self.suites, processes)
72 result = self.runner.run(concurrent_suite)
73 else:
74 result = self.runner.run(self.suites)
69 self._run_end_time = time.time() 75 self._run_end_time = time.time()
70 76
71 return result 77 return result
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
index 219102c6b0..6adbe3827b 100644
--- a/meta/lib/oeqa/core/runner.py
+++ b/meta/lib/oeqa/core/runner.py
@@ -43,11 +43,17 @@ class OETestResult(_TestResult):
43 super(OETestResult, self).__init__(*args, **kwargs) 43 super(OETestResult, self).__init__(*args, **kwargs)
44 44
45 self.successes = [] 45 self.successes = []
46 self.starttime = {}
47 self.endtime = {}
48 self.progressinfo = {}
46 49
47 self.tc = tc 50 self.tc = tc
48 self._tc_map_results() 51 self._tc_map_results()
49 52
50 def startTest(self, test): 53 def startTest(self, test):
54 # May have been set by concurrencytest
55 if test.id() not in self.starttime:
56 self.starttime[test.id()] = time.time()
51 super(OETestResult, self).startTest(test) 57 super(OETestResult, self).startTest(test)
52 58
53 def _tc_map_results(self): 59 def _tc_map_results(self):
@@ -57,6 +63,12 @@ class OETestResult(_TestResult):
57 self.tc._results['expectedFailures'] = self.expectedFailures 63 self.tc._results['expectedFailures'] = self.expectedFailures
58 self.tc._results['successes'] = self.successes 64 self.tc._results['successes'] = self.successes
59 65
66 def stopTest(self, test):
67 self.endtime[test.id()] = time.time()
68 super(OETestResult, self).stopTest(test)
69 if test.id() in self.progressinfo:
70 print(self.progressinfo[test.id()])
71
60 def logSummary(self, component, context_msg=''): 72 def logSummary(self, component, context_msg=''):
61 elapsed_time = self.tc._run_end_time - self.tc._run_start_time 73 elapsed_time = self.tc._run_end_time - self.tc._run_start_time
62 self.tc.logger.info("SUMMARY:") 74 self.tc.logger.info("SUMMARY:")
@@ -141,12 +153,16 @@ class OETestResult(_TestResult):
141 if hasattr(d, 'oeid'): 153 if hasattr(d, 'oeid'):
142 oeid = d.oeid 154 oeid = d.oeid
143 155
156 t = ""
157 if case.id() in self.starttime and case.id() in self.endtime:
158 t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
159
144 if fail: 160 if fail:
145 self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), 161 self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
146 oeid, desc)) 162 oeid, desc, t))
147 else: 163 else:
148 self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), 164 self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
149 oeid, 'UNKNOWN')) 165 oeid, 'UNKNOWN', t))
150 166
151class OEListTestsResult(object): 167class OEListTestsResult(object):
152 def wasSuccessful(self): 168 def wasSuccessful(self):
diff --git a/meta/lib/oeqa/core/utils/concurrencytest.py b/meta/lib/oeqa/core/utils/concurrencytest.py
new file mode 100644
index 0000000000..850586516a
--- /dev/null
+++ b/meta/lib/oeqa/core/utils/concurrencytest.py
@@ -0,0 +1,254 @@
1#!/usr/bin/env python3
2#
3# Modified for use in OE by Richard Purdie, 2018
4#
5# Modified by: Corey Goldberg, 2013
6# License: GPLv2+
7#
8# Original code from:
9# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
10# Copyright (C) 2005-2011 Canonical Ltd
11# License: GPLv2+
12
13import os
14import sys
15import traceback
16import unittest
17import subprocess
18import testtools
19import threading
20import time
21import io
22
23from queue import Queue
24from itertools import cycle
25from subunit import ProtocolTestCase, TestProtocolClient
26from subunit.test_results import AutoTimingTestResultDecorator
27from testtools import ThreadsafeForwardingResult, iterate_tests
28
29import bb.utils
30import oe.path
31
32_all__ = [
33 'ConcurrentTestSuite',
34 'fork_for_tests',
35 'partition_tests',
36]
37
38#
39# Patch the version from testtools to allow access to _test_start and allow
40# computation of timing information and threading progress
41#
42class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
43
44 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
45 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
46 self.threadnum = threadnum
47 self.totalinprocess = totalinprocess
48 self.totaltests = totaltests
49
50 def _add_result_with_semaphore(self, method, test, *args, **kwargs):
51 self.semaphore.acquire()
52 try:
53 self.result.starttime[test.id()] = self._test_start.timestamp()
54 self.result.threadprogress[self.threadnum].append(test.id())
55 totalprogress = sum(len(x) for x in self.result.threadprogress.values())
56 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % (
57 self.threadnum,
58 len(self.result.threadprogress[self.threadnum]),
59 self.totalinprocess,
60 totalprogress,
61 self.totaltests,
62 "{0:.2f}".format(time.time()-self._test_start.timestamp()),
63 test.id())
64 finally:
65 self.semaphore.release()
66 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
67
68#
69# A dummy structure to add to io.StringIO so that the .buffer object
70# is available and accepts writes. This allows unittest with buffer=True
71# to interact ok with subunit which wants to access sys.stdout.buffer.
72#
73class dummybuf(object):
74 def __init__(self, parent):
75 self.p = parent
76 def write(self, data):
77 self.p.write(data.decode("utf-8"))
78
79#
80# Taken from testtools.ConncurrencyTestSuite but modified for OE use
81#
82class ConcurrentTestSuite(unittest.TestSuite):
83
84 def __init__(self, suite, processes):
85 super(ConcurrentTestSuite, self).__init__([suite])
86 self.processes = processes
87
88 def run(self, result):
89 tests, totaltests = fork_for_tests(self.processes, self)
90 try:
91 threads = {}
92 queue = Queue()
93 semaphore = threading.Semaphore(1)
94 result.threadprogress = {}
95 for i, (test, testnum) in enumerate(tests):
96 result.threadprogress[i] = []
97 process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests)
98 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
99 # as per default in parent code
100 process_result.buffer = True
101 # We have to add a buffer object to stdout to keep subunit happy
102 process_result._stderr_buffer = io.StringIO()
103 process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
104 process_result._stdout_buffer = io.StringIO()
105 process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
106 reader_thread = threading.Thread(
107 target=self._run_test, args=(test, process_result, queue))
108 threads[test] = reader_thread, process_result
109 reader_thread.start()
110 while threads:
111 finished_test = queue.get()
112 threads[finished_test][0].join()
113 del threads[finished_test]
114 except:
115 for thread, process_result in threads.values():
116 process_result.stop()
117 raise
118
119 def _run_test(self, test, process_result, queue):
120 try:
121 try:
122 test.run(process_result)
123 except Exception:
124 # The run logic itself failed
125 case = testtools.ErrorHolder(
126 "broken-runner",
127 error=sys.exc_info())
128 case.run(process_result)
129 finally:
130 queue.put(test)
131
132def removebuilddir(d):
133 delay = 5
134 while delay and os.path.exists(d + "/bitbake.lock"):
135 time.sleep(1)
136 delay = delay - 1
137 bb.utils.prunedir(d)
138
139def fork_for_tests(concurrency_num, suite):
140 result = []
141 test_blocks = partition_tests(suite, concurrency_num)
142 # Clear the tests from the original suite so it doesn't keep them alive
143 suite._tests[:] = []
144 totaltests = sum(len(x) for x in test_blocks)
145 for process_tests in test_blocks:
146 numtests = len(process_tests)
147 process_suite = unittest.TestSuite(process_tests)
148 # Also clear each split list so new suite has only reference
149 process_tests[:] = []
150 c2pread, c2pwrite = os.pipe()
151 # Clear buffers before fork to avoid duplicate output
152 sys.stdout.flush()
153 sys.stderr.flush()
154 pid = os.fork()
155 if pid == 0:
156 ourpid = os.getpid()
157 try:
158 newbuilddir = None
159 stream = os.fdopen(c2pwrite, 'wb', 1)
160 os.close(c2pread)
161
162 # Create a new separate BUILDDIR for each group of tests
163 if 'BUILDDIR' in os.environ:
164 builddir = os.environ['BUILDDIR']
165 newbuilddir = builddir + "-st-" + str(ourpid)
166 selftestdir = os.path.abspath(builddir + "/../meta-selftest")
167 newselftestdir = newbuilddir + "/meta-selftest"
168
169 bb.utils.mkdirhier(newbuilddir)
170 oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
171 oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
172 oe.path.copytree(selftestdir, newselftestdir)
173
174 for e in os.environ:
175 if builddir in os.environ[e]:
176 os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
177
178 subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
179
180 # Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
181 subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
182
183 os.chdir(newbuilddir)
184
185 for t in process_suite:
186 if not hasattr(t, "tc"):
187 continue
188 cp = t.tc.config_paths
189 for p in cp:
190 if selftestdir in cp[p] and newselftestdir not in cp[p]:
191 cp[p] = cp[p].replace(selftestdir, newselftestdir)
192 if builddir in cp[p] and newbuilddir not in cp[p]:
193 cp[p] = cp[p].replace(builddir, newbuilddir)
194
195 # Leave stderr and stdout open so we can see test noise
196 # Close stdin so that the child goes away if it decides to
197 # read from stdin (otherwise its a roulette to see what
198 # child actually gets keystrokes for pdb etc).
199 newsi = os.open(os.devnull, os.O_RDWR)
200 os.dup2(newsi, sys.stdin.fileno())
201
202 subunit_client = TestProtocolClient(stream)
203 # Force buffering of stdout/stderr so the console doesn't get corrupted by test output
204 # as per default in parent code
205 subunit_client.buffer = True
206 subunit_result = AutoTimingTestResultDecorator(subunit_client)
207 process_suite.run(subunit_result)
208 if ourpid != os.getpid():
209 os._exit(0)
210 if newbuilddir:
211 removebuilddir(newbuilddir)
212 except:
213 # Don't do anything with process children
214 if ourpid != os.getpid():
215 os._exit(1)
216 # Try and report traceback on stream, but exit with error
217 # even if stream couldn't be created or something else
218 # goes wrong. The traceback is formatted to a string and
219 # written in one go to avoid interleaving lines from
220 # multiple failing children.
221 try:
222 stream.write(traceback.format_exc().encode('utf-8'))
223 except:
224 sys.stderr.write(traceback.format_exc())
225 finally:
226 if newbuilddir:
227 removebuilddir(newbuilddir)
228 os._exit(1)
229 os._exit(0)
230 else:
231 os.close(c2pwrite)
232 stream = os.fdopen(c2pread, 'rb', 1)
233 test = ProtocolTestCase(stream)
234 result.append((test, numtests))
235 return result, totaltests
236
237def partition_tests(suite, count):
238 # Keep tests from the same class together but allow tests from modules
239 # to go to different processes to aid parallelisation.
240 modules = {}
241 for test in iterate_tests(suite):
242 m = test.__module__ + "." + test.__class__.__name__
243 if m not in modules:
244 modules[m] = []
245 modules[m].append(test)
246
247 # Simply divide the test blocks between the available processes
248 partitions = [list() for _ in range(count)]
249 for partition, m in zip(cycle(partitions), modules):
250 partition.extend(modules[m])
251
252 # No point in empty threads so drop them
253 return [p for p in partitions if p]
254
diff --git a/meta/lib/oeqa/selftest/context.py b/meta/lib/oeqa/selftest/context.py
index 9e90d3c256..c937b8171c 100644
--- a/meta/lib/oeqa/selftest/context.py
+++ b/meta/lib/oeqa/selftest/context.py
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
25 self.custommachine = None 25 self.custommachine = None
26 self.config_paths = config_paths 26 self.config_paths = config_paths
27 27
28 def runTests(self, machine=None, skips=[]): 28 def runTests(self, processes=None, machine=None, skips=[]):
29 if machine: 29 if machine:
30 self.custommachine = machine 30 self.custommachine = machine
31 if machine == 'random': 31 if machine == 'random':
32 self.custommachine = choice(self.machines) 32 self.custommachine = choice(self.machines)
33 self.logger.info('Run tests with custom MACHINE set to: %s' % \ 33 self.logger.info('Run tests with custom MACHINE set to: %s' % \
34 self.custommachine) 34 self.custommachine)
35 return super(OESelftestTestContext, self).runTests(skips) 35 return super(OESelftestTestContext, self).runTests(processes, skips)
36 36
37 def listTests(self, display_type, machine=None): 37 def listTests(self, display_type, machine=None):
38 return super(OESelftestTestContext, self).listTests(display_type) 38 return super(OESelftestTestContext, self).listTests(display_type)
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
68 action="store_true", default=False, 68 action="store_true", default=False,
69 help='List all available tests.') 69 help='List all available tests.')
70 70
71 parser.add_argument('-j', '--num-processes', dest='processes', action='store',
72 type=int, help="number of processes to execute in parallel with")
73
71 parser.add_argument('--machine', required=False, choices=['random', 'all'], 74 parser.add_argument('--machine', required=False, choices=['random', 'all'],
72 help='Run tests on different machines (random/all).') 75 help='Run tests on different machines (random/all).')
73 76
@@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
137 self.tc_kwargs['init']['config_paths']['bblayers_backup']) 140 self.tc_kwargs['init']['config_paths']['bblayers_backup'])
138 141
139 self.tc_kwargs['run']['skips'] = args.skips 142 self.tc_kwargs['run']['skips'] = args.skips
143 self.tc_kwargs['run']['processes'] = args.processes
140 144
141 def _pre_run(self): 145 def _pre_run(self):
142 def _check_required_env_variables(vars): 146 def _check_required_env_variables(vars):