diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2018-07-19 20:31:35 +0000 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2018-07-24 11:52:27 +0100 |
commit | 6d66b574090dbfa17f840e45c9920895c4e2784a (patch) | |
tree | 2aa6692e9e486c86ac98bd0802eacec008c4e615 /meta/lib/oe/utils.py | |
parent | 4c67ffef2e50b6e0be1530ea332e69afa025574c (diff) | |
download | poky-6d66b574090dbfa17f840e45c9920895c4e2784a.tar.gz |
utils: Add multiprocess_launch API and testcase
The current methods of spawning processes for parallel execution have
issues around collection of results or exceptions.
Take the code from package_ipk/deb, make it generic, add a results
collection mechanism, fix the exception handling and for it into a
standard library function.
Also add a test case which tests both the success and failure modes
of operation to stop this functionality regressiing again.
In particular, compared to multiprocess_exec, this fork off the parent
approach means we can pass in the datastore and functions work in the
same scope as the parent. This removes some of the complexities
found trying to scale multiprocess_exec to wider use.
(From OE-Core rev: 88f0c214e593a45566df5131bda4c946f5ccc8c2)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta/lib/oe/utils.py')
-rw-r--r-- | meta/lib/oe/utils.py | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 6aed6dc993..753b577555 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py | |||
@@ -1,4 +1,6 @@ | |||
1 | import subprocess | 1 | import subprocess |
2 | import multiprocessing | ||
3 | import traceback | ||
2 | 4 | ||
3 | def read_file(filename): | 5 | def read_file(filename): |
4 | try: | 6 | try: |
@@ -280,6 +282,74 @@ def multiprocess_exec(commands, function): | |||
280 | 282 | ||
281 | return results | 283 | return results |
282 | 284 | ||
285 | # For each item in items, call the function 'target' with item as the first | ||
286 | # argument, extraargs as the other arguments and handle any exceptions in the | ||
287 | # parent thread | ||
288 | def multiprocess_launch(target, items, d, extraargs=None): | ||
289 | |||
290 | class ProcessLaunch(multiprocessing.Process): | ||
291 | def __init__(self, *args, **kwargs): | ||
292 | multiprocessing.Process.__init__(self, *args, **kwargs) | ||
293 | self._pconn, self._cconn = multiprocessing.Pipe() | ||
294 | self._exception = None | ||
295 | self._result = None | ||
296 | |||
297 | def run(self): | ||
298 | try: | ||
299 | ret = self._target(*self._args, **self._kwargs) | ||
300 | self._cconn.send((None, ret)) | ||
301 | except Exception as e: | ||
302 | tb = traceback.format_exc() | ||
303 | self._cconn.send((e, tb)) | ||
304 | |||
305 | def update(self): | ||
306 | if self._pconn.poll(): | ||
307 | (e, tb) = self._pconn.recv() | ||
308 | if e is not None: | ||
309 | self._exception = (e, tb) | ||
310 | else: | ||
311 | self._result = tb | ||
312 | |||
313 | @property | ||
314 | def exception(self): | ||
315 | self.update() | ||
316 | return self._exception | ||
317 | |||
318 | @property | ||
319 | def result(self): | ||
320 | self.update() | ||
321 | return self._result | ||
322 | |||
323 | max_process = int(d.getVar("BB_NUMBER_THREADS") or os.cpu_count() or 1) | ||
324 | launched = [] | ||
325 | errors = [] | ||
326 | results = [] | ||
327 | items = list(items) | ||
328 | while (items and not errors) or launched: | ||
329 | if not errors and items and len(launched) < max_process: | ||
330 | args = (items.pop(),) | ||
331 | if extraargs is not None: | ||
332 | args = args + extraargs | ||
333 | p = ProcessLaunch(target=target, args=args) | ||
334 | p.start() | ||
335 | launched.append(p) | ||
336 | for q in launched: | ||
337 | # The finished processes are joined when calling is_alive() | ||
338 | if not q.is_alive(): | ||
339 | if q.exception: | ||
340 | errors.append(q.exception) | ||
341 | if q.result: | ||
342 | results.append(q.result) | ||
343 | launched.remove(q) | ||
344 | # Paranoia doesn't hurt | ||
345 | for p in launched: | ||
346 | p.join() | ||
347 | if errors: | ||
348 | for (e, tb) in errors: | ||
349 | bb.error(str(tb)) | ||
350 | bb.fatal("Fatal errors occurred in subprocesses, tracebacks printed above") | ||
351 | return results | ||
352 | |||
283 | def squashspaces(string): | 353 | def squashspaces(string): |
284 | import re | 354 | import re |
285 | return re.sub("\s+", " ", string).strip() | 355 | return re.sub("\s+", " ", string).strip() |