diff options
-rw-r--r-- | meta/lib/oe/utils.py | 70 | ||||
-rw-r--r-- | meta/lib/oeqa/selftest/cases/oelib/utils.py | 46 |
2 files changed, 115 insertions, 1 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 6aed6dc993..753b577555 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py | |||
@@ -1,4 +1,6 @@ | |||
1 | import subprocess | 1 | import subprocess |
2 | import multiprocessing | ||
3 | import traceback | ||
2 | 4 | ||
3 | def read_file(filename): | 5 | def read_file(filename): |
4 | try: | 6 | try: |
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function): | |||
280 | 282 | ||
281 | return results | 283 | return results |
282 | 284 | ||
285 | # For each item in items, call the function 'target' with item as the first | ||
286 | # argument, extraargs as the other arguments and handle any exceptions in the | ||
287 | # parent thread | ||
288 | def multiprocess_launch(target, items, d, extraargs=None): | ||
289 | |||
290 | class ProcessLaunch(multiprocessing.Process): | ||
291 | def __init__(self, *args, **kwargs): | ||
292 | multiprocessing.Process.__init__(self, *args, **kwargs) | ||
293 | self._pconn, self._cconn = multiprocessing.Pipe() | ||
294 | self._exception = None | ||
295 | self._result = None | ||
296 | |||
297 | def run(self): | ||
298 | try: | ||
299 | ret = self._target(*self._args, **self._kwargs) | ||
300 | self._cconn.send((None, ret)) | ||
301 | except Exception as e: | ||
302 | tb = traceback.format_exc() | ||
303 | self._cconn.send((e, tb)) | ||
304 | |||
305 | def update(self): | ||
306 | if self._pconn.poll(): | ||
307 | (e, tb) = self._pconn.recv() | ||
308 | if e is not None: | ||
309 | self._exception = (e, tb) | ||
310 | else: | ||
311 | self._result = tb | ||
312 | |||
313 | @property | ||
314 | def exception(self): | ||
315 | self.update() | ||
316 | return self._exception | ||
317 | |||
318 | @property | ||
319 | def result(self): | ||
320 | self.update() | ||
321 | return self._result | ||
322 | |||
323 | max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1) | ||
324 | launched = [] | ||
325 | errors = [] | ||
326 | results = [] | ||
327 | items = list(items) | ||
328 | while (items and not errors) or launched: | ||
329 | if not errors and items and len(launched) < max_process: | ||
330 | args = (items.pop(),) | ||
331 | if extraargs is not None: | ||
332 | args = args + extraargs | ||
333 | p = ProcessLaunch(target=target, args=args) | ||
334 | p.start() | ||
335 | launched.append(p) | ||
336 | for q in launched: | ||
337 | # The finished processes are joined when calling is_alive() | ||
338 | if not q.is_alive(): | ||
339 | if q.exception: | ||
340 | errors.append(q.exception) | ||
341 | if q.result: | ||
342 | results.append(q.result) | ||
343 | launched.remove(q) | ||
344 | # Paranoia doesn't hurt | ||
345 | for p in launched: | ||
346 | p.join() | ||
347 | if errors: | ||
348 | for (e, tb) in errors: | ||
349 | bb.error(str(tb)) | ||
350 | bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above") | ||
351 | return results | ||
352 | |||
283 | def squashspaces(string): | 353 | def squashspaces(string): |
284 | import re | 354 | import re |
285 | return re.sub("\s+", " ", string).strip() | 355 | return re.sub("\s+", " ", string).strip() |
diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py index 9fb6c1576e..275aeda74e 100644 --- a/meta/lib/oeqa/selftest/cases/oelib/utils.py +++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py | |||
@@ -1,5 +1,8 @@ | |||
1 | import sys | ||
1 | from unittest.case import TestCase | 2 | from unittest.case import TestCase |
2 | from oe.utils import packages_filter_out_system, trim_version | 3 | from contextlib import contextmanager |
4 | from io import StringIO | ||
5 | from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch | ||
3 | 6 | ||
4 | class TestPackagesFilterOutSystem(TestCase): | 7 | class TestPackagesFilterOutSystem(TestCase): |
5 | def test_filter(self): | 8 | def test_filter(self): |
@@ -49,3 +52,44 @@ class TestTrimVersion(TestCase): | |||
49 | self.assertEqual(trim_version("1.2.3", 2), "1.2") | 52 | self.assertEqual(trim_version("1.2.3", 2), "1.2") |
50 | self.assertEqual(trim_version("1.2.3", 3), "1.2.3") | 53 | self.assertEqual(trim_version("1.2.3", 3), "1.2.3") |
51 | self.assertEqual(trim_version("1.2.3", 4), "1.2.3") | 54 | self.assertEqual(trim_version("1.2.3", 4), "1.2.3") |
55 | |||
56 | |||
57 | class TestMultiprocessLaunch(TestCase): | ||
58 | |||
59 | def test_multiprocesslaunch(self): | ||
60 | import bb | ||
61 | |||
62 | def testfunction(item, d): | ||
63 | if item == "2" or item == "1": | ||
64 | raise KeyError("Invalid number %s" % item) | ||
65 | return "Found %s" % item | ||
66 | |||
67 | def dummyerror(msg): | ||
68 | print("ERROR: %s" % msg) | ||
69 | |||
70 | @contextmanager | ||
71 | def captured_output(): | ||
72 | new_out, new_err = StringIO(), StringIO() | ||
73 | old_out, old_err = sys.stdout, sys.stderr | ||
74 | try: | ||
75 | sys.stdout, sys.stderr = new_out, new_err | ||
76 | yield sys.stdout, sys.stderr | ||
77 | finally: | ||
78 | sys.stdout, sys.stderr = old_out, old_err | ||
79 | |||
80 | d = bb.data_smart.DataSmart() | ||
81 | bb.error = dummyerror | ||
82 | |||
83 | # Assert the function returns the right results | ||
84 | result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,)) | ||
85 | self.assertIn("Found 3", result) | ||
86 | self.assertIn("Found 4", result) | ||
87 | self.assertIn("Found 5", result) | ||
88 | self.assertIn("Found 6", result) | ||
89 | self.assertEqual(len(result), 4) | ||
90 | |||
91 | # Assert the function prints exceptions | ||
92 | with captured_output() as (out, err): | ||
93 | self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,)) | ||
94 | self.assertIn("KeyError: 'Invalid number 1'", out.getvalue()) | ||
95 | self.assertIn("KeyError: 'Invalid number 2'", out.getvalue()) | ||