diff options
| -rw-r--r-- | meta/lib/oe/utils.py | 70 | ||||
| -rw-r--r-- | meta/lib/oeqa/selftest/cases/oelib/utils.py | 46 |
2 files changed, 115 insertions, 1 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 6aed6dc993..753b577555 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py | |||
| @@ -1,4 +1,6 @@ | |||
| 1 | import subprocess | 1 | import subprocess |
| 2 | import multiprocessing | ||
| 3 | import traceback | ||
| 2 | 4 | ||
| 3 | def read_file(filename): | 5 | def read_file(filename): |
| 4 | try: | 6 | try: |
| @@ -280,6 +282,74 @@ def multiprocess_exec(commands, function): | |||
| 280 | 282 | ||
| 281 | return results | 283 | return results |
| 282 | 284 | ||
| 285 | # For each item in items, call the function 'target' with item as the first | ||
| 286 | # argument, extraargs as the other arguments and handle any exceptions in the | ||
| 287 | # parent thread | ||
| 288 | def multiprocess_launch(target, items, d, extraargs=None): | ||
| 289 | |||
| 290 | class ProcessLaunch(multiprocessing.Process): | ||
| 291 | def __init__(self, *args, **kwargs): | ||
| 292 | multiprocessing.Process.__init__(self, *args, **kwargs) | ||
| 293 | self._pconn, self._cconn = multiprocessing.Pipe() | ||
| 294 | self._exception = None | ||
| 295 | self._result = None | ||
| 296 | |||
| 297 | def run(self): | ||
| 298 | try: | ||
| 299 | ret = self._target(*self._args, **self._kwargs) | ||
| 300 | self._cconn.send((None, ret)) | ||
| 301 | except Exception as e: | ||
| 302 | tb = traceback.format_exc() | ||
| 303 | self._cconn.send((e, tb)) | ||
| 304 | |||
| 305 | def update(self): | ||
| 306 | if self._pconn.poll(): | ||
| 307 | (e, tb) = self._pconn.recv() | ||
| 308 | if e is not None: | ||
| 309 | self._exception = (e, tb) | ||
| 310 | else: | ||
| 311 | self._result = tb | ||
| 312 | |||
| 313 | @property | ||
| 314 | def exception(self): | ||
| 315 | self.update() | ||
| 316 | return self._exception | ||
| 317 | |||
| 318 | @property | ||
| 319 | def result(self): | ||
| 320 | self.update() | ||
| 321 | return self._result | ||
| 322 | |||
| 323 | max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1) | ||
| 324 | launched = [] | ||
| 325 | errors = [] | ||
| 326 | results = [] | ||
| 327 | items = list(items) | ||
| 328 | while (items and not errors) or launched: | ||
| 329 | if not errors and items and len(launched) < max_process: | ||
| 330 | args = (items.pop(),) | ||
| 331 | if extraargs is not None: | ||
| 332 | args = args + extraargs | ||
| 333 | p = ProcessLaunch(target=target, args=args) | ||
| 334 | p.start() | ||
| 335 | launched.append(p) | ||
| 336 | for q in launched: | ||
| 337 | # The finished processes are joined when calling is_alive() | ||
| 338 | if not q.is_alive(): | ||
| 339 | if q.exception: | ||
| 340 | errors.append(q.exception) | ||
| 341 | if q.result: | ||
| 342 | results.append(q.result) | ||
| 343 | launched.remove(q) | ||
| 344 | # Paranoia doesn't hurt | ||
| 345 | for p in launched: | ||
| 346 | p.join() | ||
| 347 | if errors: | ||
| 348 | for (e, tb) in errors: | ||
| 349 | bb.error(str(tb)) | ||
| 350 | bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above") | ||
| 351 | return results | ||
| 352 | |||
| 283 | def squashspaces(string): | 353 | def squashspaces(string): |
| 284 | import re | 354 | import re |
| 285 | return re.sub("\s+", " ", string).strip() | 355 | return re.sub("\s+", " ", string).strip() |
diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py index 9fb6c1576e..275aeda74e 100644 --- a/meta/lib/oeqa/selftest/cases/oelib/utils.py +++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py | |||
| @@ -1,5 +1,8 @@ | |||
| 1 | import sys | ||
| 1 | from unittest.case import TestCase | 2 | from unittest.case import TestCase |
| 2 | from oe.utils import packages_filter_out_system, trim_version | 3 | from contextlib import contextmanager |
| 4 | from io import StringIO | ||
| 5 | from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch | ||
| 3 | 6 | ||
| 4 | class TestPackagesFilterOutSystem(TestCase): | 7 | class TestPackagesFilterOutSystem(TestCase): |
| 5 | def test_filter(self): | 8 | def test_filter(self): |
| @@ -49,3 +52,44 @@ class TestTrimVersion(TestCase): | |||
| 49 | self.assertEqual(trim_version("1.2.3", 2), "1.2") | 52 | self.assertEqual(trim_version("1.2.3", 2), "1.2") |
| 50 | self.assertEqual(trim_version("1.2.3", 3), "1.2.3") | 53 | self.assertEqual(trim_version("1.2.3", 3), "1.2.3") |
| 51 | self.assertEqual(trim_version("1.2.3", 4), "1.2.3") | 54 | self.assertEqual(trim_version("1.2.3", 4), "1.2.3") |
| 55 | |||
| 56 | |||
| 57 | class TestMultiprocessLaunch(TestCase): | ||
| 58 | |||
| 59 | def test_multiprocesslaunch(self): | ||
| 60 | import bb | ||
| 61 | |||
| 62 | def testfunction(item, d): | ||
| 63 | if item == "2" or item == "1": | ||
| 64 | raise KeyError("Invalid number %s" % item) | ||
| 65 | return "Found %s" % item | ||
| 66 | |||
| 67 | def dummyerror(msg): | ||
| 68 | print("ERROR: %s" % msg) | ||
| 69 | |||
| 70 | @contextmanager | ||
| 71 | def captured_output(): | ||
| 72 | new_out, new_err = StringIO(), StringIO() | ||
| 73 | old_out, old_err = sys.stdout, sys.stderr | ||
| 74 | try: | ||
| 75 | sys.stdout, sys.stderr = new_out, new_err | ||
| 76 | yield sys.stdout, sys.stderr | ||
| 77 | finally: | ||
| 78 | sys.stdout, sys.stderr = old_out, old_err | ||
| 79 | |||
| 80 | d = bb.data_smart.DataSmart() | ||
| 81 | bb.error = dummyerror | ||
| 82 | |||
| 83 | # Assert the function returns the right results | ||
| 84 | result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,)) | ||
| 85 | self.assertIn("Found 3", result) | ||
| 86 | self.assertIn("Found 4", result) | ||
| 87 | self.assertIn("Found 5", result) | ||
| 88 | self.assertIn("Found 6", result) | ||
| 89 | self.assertEqual(len(result), 4) | ||
| 90 | |||
| 91 | # Assert the function prints exceptions | ||
| 92 | with captured_output() as (out, err): | ||
| 93 | self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,)) | ||
| 94 | self.assertIn("KeyError: 'Invalid number 1'", out.getvalue()) | ||
| 95 | self.assertIn("KeyError: 'Invalid number 2'", out.getvalue()) | ||
