summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-19 20:31:35 +0000
committerRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-24 11:52:27 +0100
commit6d66b574090dbfa17f840e45c9920895c4e2784a (patch)
tree2aa6692e9e486c86ac98bd0802eacec008c4e615
parent4c67ffef2e50b6e0be1530ea332e69afa025574c (diff)
downloadpoky-6d66b574090dbfa17f840e45c9920895c4e2784a.tar.gz
utils: Add multiprocess_launch API and testcase
The current methods of spawning processes for parallel execution have issues around collection of results or exceptions. Take the code from package_ipk/deb, make it generic, add a results collection mechanism, fix the exception handling and for it into a standard library function. Also add a test case which tests both the success and failure modes of operation to stop this functionality regressiing again. In particular, compared to multiprocess_exec, this fork off the parent approach means we can pass in the datastore and functions work in the same scope as the parent. This removes some of the complexities found trying to scale multiprocess_exec to wider use. (From OE-Core rev: 88f0c214e593a45566df5131bda4c946f5ccc8c2) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--meta/lib/oe/utils.py70
-rw-r--r--meta/lib/oeqa/selftest/cases/oelib/utils.py46
2 files changed, 115 insertions, 1 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 6aed6dc993..753b577555 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -1,4 +1,6 @@
1import subprocess 1import subprocess
2import multiprocessing
3import traceback
2 4
3def read_file(filename): 5def read_file(filename):
4 try: 6 try:
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
280 282
281 return results 283 return results
282 284
285# For each item in items, call the function 'target' with item as the first
286# argument, extraargs as the other arguments and handle any exceptions in the
287# parent thread
288def multiprocess_launch(target, items, d, extraargs=None):
289
290 class ProcessLaunch(multiprocessing.Process):
291 def __init__(self, *args, **kwargs):
292 multiprocessing.Process.__init__(self, *args, **kwargs)
293 self._pconn, self._cconn = multiprocessing.Pipe()
294 self._exception = None
295 self._result = None
296
297 def run(self):
298 try:
299 ret = self._target(*self._args, **self._kwargs)
300 self._cconn.send((None, ret))
301 except Exception as e:
302 tb = traceback.format_exc()
303 self._cconn.send((e, tb))
304
305 def update(self):
306 if self._pconn.poll():
307 (e, tb) = self._pconn.recv()
308 if e is not None:
309 self._exception = (e, tb)
310 else:
311 self._result = tb
312
313 @property
314 def exception(self):
315 self.update()
316 return self._exception
317
318 @property
319 def result(self):
320 self.update()
321 return self._result
322
323 max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1)
324 launched = []
325 errors = []
326 results = []
327 items = list(items)
328 while (items and not errors) or launched:
329 if not errors and items and len(launched) < max_process:
330 args = (items.pop(),)
331 if extraargs is not None:
332 args = args + extraargs
333 p = ProcessLaunch(target=target, args=args)
334 p.start()
335 launched.append(p)
336 for q in launched:
337 # The finished processes are joined when calling is_alive()
338 if not q.is_alive():
339 if q.exception:
340 errors.append(q.exception)
341 if q.result:
342 results.append(q.result)
343 launched.remove(q)
344 # Paranoia doesn't hurt
345 for p in launched:
346 p.join()
347 if errors:
348 for (e, tb) in errors:
349 bb.error(str(tb))
350 bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
351 return results
352
283def squashspaces(string): 353def squashspaces(string):
284 import re 354 import re
285 return re.sub("\s+", " ", string).strip() 355 return re.sub("\s+", " ", string).strip()
diff --git a/meta/lib/oeqa/selftest/cases/oelib/utils.py b/meta/lib/oeqa/selftest/cases/oelib/utils.py
index 9fb6c1576e..275aeda74e 100644
--- a/meta/lib/oeqa/selftest/cases/oelib/utils.py
+++ b/meta/lib/oeqa/selftest/cases/oelib/utils.py
@@ -1,5 +1,8 @@
1import sys
1from unittest.case import TestCase 2from unittest.case import TestCase
2from oe.utils import packages_filter_out_system, trim_version 3from contextlib import contextmanager
4from io import StringIO
5from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
3 6
4class TestPackagesFilterOutSystem(TestCase): 7class TestPackagesFilterOutSystem(TestCase):
5 def test_filter(self): 8 def test_filter(self):
@@ -49,3 +52,44 @@ class TestTrimVersion(TestCase):
49 self.assertEqual(trim_version("1.2.3", 2), "1.2") 52 self.assertEqual(trim_version("1.2.3", 2), "1.2")
50 self.assertEqual(trim_version("1.2.3", 3), "1.2.3") 53 self.assertEqual(trim_version("1.2.3", 3), "1.2.3")
51 self.assertEqual(trim_version("1.2.3", 4), "1.2.3") 54 self.assertEqual(trim_version("1.2.3", 4), "1.2.3")
55
56
57class TestMultiprocessLaunch(TestCase):
58
59 def test_multiprocesslaunch(self):
60 import bb
61
62 def testfunction(item, d):
63 if item == "2" or item == "1":
64 raise KeyError("Invalid number %s" % item)
65 return "Found %s" % item
66
67 def dummyerror(msg):
68 print("ERROR: %s" % msg)
69
70 @contextmanager
71 def captured_output():
72 new_out, new_err = StringIO(), StringIO()
73 old_out, old_err = sys.stdout, sys.stderr
74 try:
75 sys.stdout, sys.stderr = new_out, new_err
76 yield sys.stdout, sys.stderr
77 finally:
78 sys.stdout, sys.stderr = old_out, old_err
79
80 d = bb.data_smart.DataSmart()
81 bb.error = dummyerror
82
83 # Assert the function returns the right results
84 result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
85 self.assertIn("Found 3", result)
86 self.assertIn("Found 4", result)
87 self.assertIn("Found 5", result)
88 self.assertIn("Found 6", result)
89 self.assertEqual(len(result), 4)
90
91 # Assert the function prints exceptions
92 with captured_output() as (out, err):
93 self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
94 self.assertIn("KeyError: 'Invalid number 1'", out.getvalue())
95 self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())