summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--meta/lib/oe/utils.py70
-rw-r--r--meta/lib/oeqa/selftest/cases/oelib/utils.py46
2 files changed, 115 insertions, 1 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 6aed6dc993..753b577555 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -1,4 +1,6 @@
1import subprocess 1import subprocess
2import multiprocessing
3import traceback
2 4
3def read_file(filename): 5def read_file(filename):
4 try: 6 try:
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
280 282
281 return results 283 return results
282 284
285# For each item in items, call the function 'target' with item as the first
286# argument, extraargs as the other arguments and handle any exceptions in the
287# parent thread
288def multiprocess_launch(target, items, d, extraargs=None):
289
290 class ProcessLaunch(multiprocessing.Process):
291 def __init__(self, *args, **kwargs):
292 multiprocessing.Process.__init__(self, *args, **kwargs)
293 self._pconn, self._cconn = multiprocessing.Pipe()
294 self._exception = None
295 self._result = None
296
297 def run(self):
298 try:
299 ret = self._target(*self._args, **self._kwargs)
300 self._cconn.send((None, ret))
301 except Exception as e:
302 tb = traceback.format_exc()
303 self._cconn.send((e, tb))
304
305 def update(self):
306 if self._pconn.poll():
307 (e, tb) = self._pconn.recv()
308 if e is not None:
309 self._exception = (e, tb)
310 else:
311 self._result = tb
312
313 @property
314 def exception(self):
315 self.update()
316 return self._exception
317
318 @property
319 def result(self):
320 self.update()
321 return self._result
322
323 max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1)
324 launched = []
325 errors = []
326 results = []
327 items = list(items)
328 while (items and not errors) or launched:
329 if not errors and items and len(launched) < max_process:
330 args = (items.pop(),)
331 if extraargs is not None:
332 args = args + extraargs
333 p = ProcessLaunch(target=target, args=args)
334 p.start()
335 launched.append(p)
336 for q in launched:
337 # The finished processes are joined when calling is_alive()
338 if not q.is_alive():
339 if q.exception:
340 errors.append(q.exception)
341 if q.result:
342 results.append(q.result)
343 launched.remove(q)
344 # Paranoia doesn't hurt
345 for p in launched:
346 p.join()
347 if errors:
348 for (e, tb) in errors:
349 bb.error(str(tb))
350 bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
351 return results
352
283def squashspaces(string): 353def squashspaces(string):
284 import re 354 import re
285 return re.sub("\s+", " ", string).strip() 355 return re.sub("\s+", " ", string).strip()
diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py
index 9fb6c1576e..275aeda74e 100644
--- a/meta/lib/oeqa/selftest/cases/oelib/utils.py
+++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py
@@ -1,5 +1,8 @@
1import sys
1from unittest.case import TestCase 2from unittest.case import TestCase
2from oe.utils import packages_filter_out_system, trim_version 3from contextlib import contextmanager
4from io import StringIO
5from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
3 6
4class TestPackagesFilterOutSystem(TestCase): 7class TestPackagesFilterOutSystem(TestCase):
5 def test_filter(self): 8 def test_filter(self):
@@ -49,3 +52,44 @@ class TestTrimVersion(TestCase):
49 self.assertEqual(trim_version("1.2.3", 2), "1.2") 52 self.assertEqual(trim_version("1.2.3", 2), "1.2")
50 self.assertEqual(trim_version("1.2.3", 3), "1.2.3") 53 self.assertEqual(trim_version("1.2.3", 3), "1.2.3")
51 self.assertEqual(trim_version("1.2.3", 4), "1.2.3") 54 self.assertEqual(trim_version("1.2.3", 4), "1.2.3")
55
56
57class TestMultiprocessLaunch(TestCase):
58
59 def test_multiprocesslaunch(self):
60 import bb
61
62 def testfunction(item, d):
63 if item == "2" or item == "1":
64 raise KeyError("Invalid number %s" % item)
65 return "Found %s" % item
66
67 def dummyerror(msg):
68 print("ERROR: %s" % msg)
69
70 @contextmanager
71 def captured_output():
72 new_out, new_err = StringIO(), StringIO()
73 old_out, old_err = sys.stdout, sys.stderr
74 try:
75 sys.stdout, sys.stderr = new_out, new_err
76 yield sys.stdout, sys.stderr
77 finally:
78 sys.stdout, sys.stderr = old_out, old_err
79
80 d = bb.data_smart.DataSmart()
81 bb.error = dummyerror
82
83 # Assert the function returns the right results
84 result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
85 self.assertIn("Found 3", result)
86 self.assertIn("Found 4", result)
87 self.assertIn("Found 5", result)
88 self.assertIn("Found 6", result)
89 self.assertEqual(len(result), 4)
90
91 # Assert the function prints exceptions
92 with captured_output() as (out, err):
93 self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
94 self.assertIn("KeyError: 'Invalid number 1'", out.getvalue())
95 self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())