summaryrefslogtreecommitdiffstats
path: root/meta/lib/oe/utils.py
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-19 20:31:35 +0000
committerRichard Purdie <richard.purdie@linuxfoundation.org>2018-07-24 11:52:27 +0100
commit6d66b574090dbfa17f840e45c9920895c4e2784a (patch)
tree2aa6692e9e486c86ac98bd0802eacec008c4e615 /meta/lib/oe/utils.py
parent4c67ffef2e50b6e0be1530ea332e69afa025574c (diff)
downloadpoky-6d66b574090dbfa17f840e45c9920895c4e2784a.tar.gz
utils: Add multiprocess_launch API and testcase
The current methods of spawning processes for parallel execution have issues around collection of results or exceptions. Take the code from package_ipk/deb, make it generic, add a results collection mechanism, fix the exception handling and for it into a standard library function. Also add a test case which tests both the success and failure modes of operation to stop this functionality regressiing again. In particular, compared to multiprocess_exec, this fork off the parent approach means we can pass in the datastore and functions work in the same scope as the parent. This removes some of the complexities found trying to scale multiprocess_exec to wider use. (From OE-Core rev: 88f0c214e593a45566df5131bda4c946f5ccc8c2) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta/lib/oe/utils.py')
-rw-r--r--meta/lib/oe/utils.py70
1 files changed, 70 insertions, 0 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 6aed6dc993..753b577555 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -1,4 +1,6 @@
1import subprocess 1import subprocess
2import multiprocessing
3import traceback
2 4
3def read_file(filename): 5def read_file(filename):
4 try: 6 try:
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function):
280 282
281 return results 283 return results
282 284
285# For each item in items, call the function 'target' with item as the first
286# argument, extraargs as the other arguments and handle any exceptions in the
287# parent thread
288def multiprocess_launch(target, items, d, extraargs=None):
289
290 class ProcessLaunch(multiprocessing.Process):
291 def __init__(self, *args, **kwargs):
292 multiprocessing.Process.__init__(self, *args, **kwargs)
293 self._pconn, self._cconn = multiprocessing.Pipe()
294 self._exception = None
295 self._result = None
296
297 def run(self):
298 try:
299 ret = self._target(*self._args, **self._kwargs)
300 self._cconn.send((None, ret))
301 except Exception as e:
302 tb = traceback.format_exc()
303 self._cconn.send((e, tb))
304
305 def update(self):
306 if self._pconn.poll():
307 (e, tb) = self._pconn.recv()
308 if e is not None:
309 self._exception = (e, tb)
310 else:
311 self._result = tb
312
313 @property
314 def exception(self):
315 self.update()
316 return self._exception
317
318 @property
319 def result(self):
320 self.update()
321 return self._result
322
323 max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1)
324 launched = []
325 errors = []
326 results = []
327 items = list(items)
328 while (items and not errors) or launched:
329 if not errors and items and len(launched) < max_process:
330 args = (items.pop(),)
331 if extraargs is not None:
332 args = args + extraargs
333 p = ProcessLaunch(target=target, args=args)
334 p.start()
335 launched.append(p)
336 for q in launched:
337 # The finished processes are joined when calling is_alive()
338 if not q.is_alive():
339 if q.exception:
340 errors.append(q.exception)
341 if q.result:
342 results.append(q.result)
343 launched.remove(q)
344 # Paranoia doesn't hurt
345 for p in launched:
346 p.join()
347 if errors:
348 for (e, tb) in errors:
349 bb.error(str(tb))
350 bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above")
351 return results
352
283def squashspaces(string): 353def squashspaces(string):
284 import re 354 import re
285 return re.sub("\s+", " ", string).strip() 355 return re.sub("\s+", " ", string).strip()