diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-02-06 00:28:08 +0000 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-02-06 13:13:01 +0000 |
commit | 1f192a7ade00b3e1dac8af603e9b48d254100fb6 (patch) | |
tree | 981fb6648a323659935bad6d79a01a73f789c3ae /bitbake | |
parent | bc8150d9d4db81006d1f778dedc4e167f5b89a8d (diff) | |
download | poky-1f192a7ade00b3e1dac8af603e9b48d254100fb6.tar.gz |
bitbake: compat/utils: Add copy of python multiprocessing pool for pre 2.7.3 issues
python 2.7 shows hangs with issues in its pool implmenetation. Rather than
try and hack around these, add a copy of the working pool implementation
to the compat module from 2.7.3.
(Bitbake rev: c9eb742637131e8dbd526d2ad9b458abea0a2d87)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r-- | bitbake/lib/bb/compat.py | 687 | ||||
-rw-r--r-- | bitbake/lib/bb/utils.py | 8 |
2 files changed, 695 insertions, 0 deletions
diff --git a/bitbake/lib/bb/compat.py b/bitbake/lib/bb/compat.py index 1466da2379..ea4e23a008 100644 --- a/bitbake/lib/bb/compat.py +++ b/bitbake/lib/bb/compat.py | |||
@@ -239,3 +239,690 @@ class OrderedDict(dict): | |||
239 | def viewitems(self): | 239 | def viewitems(self): |
240 | "od.viewitems() -> a set-like object providing a view on od's items" | 240 | "od.viewitems() -> a set-like object providing a view on od's items" |
241 | return ItemsView(self) | 241 | return ItemsView(self) |
242 | |||
243 | # Multiprocessing pool code imported from python 2.7.3. Previous versions of | ||
244 | # python have issues in this code which hang pool usage | ||
245 | |||
246 | # | ||
247 | # Module providing the `Pool` class for managing a process pool | ||
248 | # | ||
249 | # multiprocessing/pool.py | ||
250 | # | ||
251 | # Copyright (c) 2006-2008, R Oudkerk | ||
252 | # All rights reserved. | ||
253 | # | ||
254 | # Redistribution and use in source and binary forms, with or without | ||
255 | # modification, are permitted provided that the following conditions | ||
256 | # are met: | ||
257 | # | ||
258 | # 1. Redistributions of source code must retain the above copyright | ||
259 | # notice, this list of conditions and the following disclaimer. | ||
260 | # 2. Redistributions in binary form must reproduce the above copyright | ||
261 | # notice, this list of conditions and the following disclaimer in the | ||
262 | # documentation and/or other materials provided with the distribution. | ||
263 | # 3. Neither the name of author nor the names of any contributors may be | ||
264 | # used to endorse or promote products derived from this software | ||
265 | # without specific prior written permission. | ||
266 | # | ||
267 | # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND | ||
268 | # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
269 | # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | ||
270 | # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE | ||
271 | # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||
272 | # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | ||
273 | # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | ||
274 | # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | ||
275 | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | ||
276 | # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | ||
277 | # SUCH DAMAGE. | ||
278 | # | ||
279 | import threading | ||
280 | import Queue | ||
281 | import itertools | ||
282 | import collections | ||
283 | import time | ||
284 | |||
285 | import multiprocessing | ||
286 | from multiprocessing import Process, cpu_count, TimeoutError, pool | ||
287 | from multiprocessing.util import Finalize, debug | ||
288 | |||
289 | # | ||
290 | # Constants representing the state of a pool | ||
291 | # | ||
292 | |||
293 | RUN = 0 | ||
294 | CLOSE = 1 | ||
295 | TERMINATE = 2 | ||
296 | |||
297 | # | ||
298 | # Miscellaneous | ||
299 | # | ||
300 | |||
301 | def mapstar(args): | ||
302 | return map(*args) | ||
303 | |||
304 | class MaybeEncodingError(Exception): | ||
305 | """Wraps possible unpickleable errors, so they can be | ||
306 | safely sent through the socket.""" | ||
307 | |||
308 | def __init__(self, exc, value): | ||
309 | self.exc = repr(exc) | ||
310 | self.value = repr(value) | ||
311 | super(MaybeEncodingError, self).__init__(self.exc, self.value) | ||
312 | |||
313 | def __str__(self): | ||
314 | return "Error sending result: '%s'. Reason: '%s'" % (self.value, | ||
315 | self.exc) | ||
316 | |||
317 | def __repr__(self): | ||
318 | return "<MaybeEncodingError: %s>" % str(self) | ||
319 | |||
320 | def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): | ||
321 | assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) | ||
322 | put = outqueue.put | ||
323 | get = inqueue.get | ||
324 | if hasattr(inqueue, '_writer'): | ||
325 | inqueue._writer.close() | ||
326 | outqueue._reader.close() | ||
327 | |||
328 | if initializer is not None: | ||
329 | initializer(*initargs) | ||
330 | |||
331 | completed = 0 | ||
332 | while maxtasks is None or (maxtasks and completed < maxtasks): | ||
333 | try: | ||
334 | task = get() | ||
335 | except (EOFError, IOError): | ||
336 | debug('worker got EOFError or IOError -- exiting') | ||
337 | break | ||
338 | |||
339 | if task is None: | ||
340 | debug('worker got sentinel -- exiting') | ||
341 | break | ||
342 | |||
343 | job, i, func, args, kwds = task | ||
344 | try: | ||
345 | result = (True, func(*args, **kwds)) | ||
346 | except Exception, e: | ||
347 | result = (False, e) | ||
348 | try: | ||
349 | put((job, i, result)) | ||
350 | except Exception as e: | ||
351 | wrapped = MaybeEncodingError(e, result[1]) | ||
352 | debug("Possible encoding error while sending result: %s" % ( | ||
353 | wrapped)) | ||
354 | put((job, i, (False, wrapped))) | ||
355 | completed += 1 | ||
356 | debug('worker exiting after %d tasks' % completed) | ||
357 | |||
358 | |||
359 | class Pool(object): | ||
360 | ''' | ||
361 | Class which supports an async version of the `apply()` builtin | ||
362 | ''' | ||
363 | Process = Process | ||
364 | |||
365 | def __init__(self, processes=None, initializer=None, initargs=(), | ||
366 | maxtasksperchild=None): | ||
367 | self._setup_queues() | ||
368 | self._taskqueue = Queue.Queue() | ||
369 | self._cache = {} | ||
370 | self._state = RUN | ||
371 | self._maxtasksperchild = maxtasksperchild | ||
372 | self._initializer = initializer | ||
373 | self._initargs = initargs | ||
374 | |||
375 | if processes is None: | ||
376 | try: | ||
377 | processes = cpu_count() | ||
378 | except NotImplementedError: | ||
379 | processes = 1 | ||
380 | if processes < 1: | ||
381 | raise ValueError("Number of processes must be at least 1") | ||
382 | |||
383 | if initializer is not None and not hasattr(initializer, '__call__'): | ||
384 | raise TypeError('initializer must be a callable') | ||
385 | |||
386 | self._processes = processes | ||
387 | self._pool = [] | ||
388 | self._repopulate_pool() | ||
389 | |||
390 | self._worker_handler = threading.Thread( | ||
391 | target=Pool._handle_workers, | ||
392 | args=(self, ) | ||
393 | ) | ||
394 | self._worker_handler.daemon = True | ||
395 | self._worker_handler._state = RUN | ||
396 | self._worker_handler.start() | ||
397 | |||
398 | |||
399 | self._task_handler = threading.Thread( | ||
400 | target=Pool._handle_tasks, | ||
401 | args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) | ||
402 | ) | ||
403 | self._task_handler.daemon = True | ||
404 | self._task_handler._state = RUN | ||
405 | self._task_handler.start() | ||
406 | |||
407 | self._result_handler = threading.Thread( | ||
408 | target=Pool._handle_results, | ||
409 | args=(self._outqueue, self._quick_get, self._cache) | ||
410 | ) | ||
411 | self._result_handler.daemon = True | ||
412 | self._result_handler._state = RUN | ||
413 | self._result_handler.start() | ||
414 | |||
415 | self._terminate = Finalize( | ||
416 | self, self._terminate_pool, | ||
417 | args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, | ||
418 | self._worker_handler, self._task_handler, | ||
419 | self._result_handler, self._cache), | ||
420 | exitpriority=15 | ||
421 | ) | ||
422 | |||
423 | def _join_exited_workers(self): | ||
424 | """Cleanup after any worker processes which have exited due to reaching | ||
425 | their specified lifetime. Returns True if any workers were cleaned up. | ||
426 | """ | ||
427 | cleaned = False | ||
428 | for i in reversed(range(len(self._pool))): | ||
429 | worker = self._pool[i] | ||
430 | if worker.exitcode is not None: | ||
431 | # worker exited | ||
432 | debug('cleaning up worker %d' % i) | ||
433 | worker.join() | ||
434 | cleaned = True | ||
435 | del self._pool[i] | ||
436 | return cleaned | ||
437 | |||
438 | def _repopulate_pool(self): | ||
439 | """Bring the number of pool processes up to the specified number, | ||
440 | for use after reaping workers which have exited. | ||
441 | """ | ||
442 | for i in range(self._processes - len(self._pool)): | ||
443 | w = self.Process(target=worker, | ||
444 | args=(self._inqueue, self._outqueue, | ||
445 | self._initializer, | ||
446 | self._initargs, self._maxtasksperchild) | ||
447 | ) | ||
448 | self._pool.append(w) | ||
449 | w.name = w.name.replace('Process', 'PoolWorker') | ||
450 | w.daemon = True | ||
451 | w.start() | ||
452 | debug('added worker') | ||
453 | |||
454 | def _maintain_pool(self): | ||
455 | """Clean up any exited workers and start replacements for them. | ||
456 | """ | ||
457 | if self._join_exited_workers(): | ||
458 | self._repopulate_pool() | ||
459 | |||
460 | def _setup_queues(self): | ||
461 | from multiprocessing.queues import SimpleQueue | ||
462 | self._inqueue = SimpleQueue() | ||
463 | self._outqueue = SimpleQueue() | ||
464 | self._quick_put = self._inqueue._writer.send | ||
465 | self._quick_get = self._outqueue._reader.recv | ||
466 | |||
467 | def apply(self, func, args=(), kwds={}): | ||
468 | ''' | ||
469 | Equivalent of `apply()` builtin | ||
470 | ''' | ||
471 | assert self._state == RUN | ||
472 | return self.apply_async(func, args, kwds).get() | ||
473 | |||
474 | def map(self, func, iterable, chunksize=None): | ||
475 | ''' | ||
476 | Equivalent of `map()` builtin | ||
477 | ''' | ||
478 | assert self._state == RUN | ||
479 | return self.map_async(func, iterable, chunksize).get() | ||
480 | |||
481 | def imap(self, func, iterable, chunksize=1): | ||
482 | ''' | ||
483 | Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` | ||
484 | ''' | ||
485 | assert self._state == RUN | ||
486 | if chunksize == 1: | ||
487 | result = IMapIterator(self._cache) | ||
488 | self._taskqueue.put((((result._job, i, func, (x,), {}) | ||
489 | for i, x in enumerate(iterable)), result._set_length)) | ||
490 | return result | ||
491 | else: | ||
492 | assert chunksize > 1 | ||
493 | task_batches = Pool._get_tasks(func, iterable, chunksize) | ||
494 | result = IMapIterator(self._cache) | ||
495 | self._taskqueue.put((((result._job, i, mapstar, (x,), {}) | ||
496 | for i, x in enumerate(task_batches)), result._set_length)) | ||
497 | return (item for chunk in result for item in chunk) | ||
498 | |||
499 | def imap_unordered(self, func, iterable, chunksize=1): | ||
500 | ''' | ||
501 | Like `imap()` method but ordering of results is arbitrary | ||
502 | ''' | ||
503 | assert self._state == RUN | ||
504 | if chunksize == 1: | ||
505 | result = IMapUnorderedIterator(self._cache) | ||
506 | self._taskqueue.put((((result._job, i, func, (x,), {}) | ||
507 | for i, x in enumerate(iterable)), result._set_length)) | ||
508 | return result | ||
509 | else: | ||
510 | assert chunksize > 1 | ||
511 | task_batches = Pool._get_tasks(func, iterable, chunksize) | ||
512 | result = IMapUnorderedIterator(self._cache) | ||
513 | self._taskqueue.put((((result._job, i, mapstar, (x,), {}) | ||
514 | for i, x in enumerate(task_batches)), result._set_length)) | ||
515 | return (item for chunk in result for item in chunk) | ||
516 | |||
517 | def apply_async(self, func, args=(), kwds={}, callback=None): | ||
518 | ''' | ||
519 | Asynchronous equivalent of `apply()` builtin | ||
520 | ''' | ||
521 | assert self._state == RUN | ||
522 | result = ApplyResult(self._cache, callback) | ||
523 | self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) | ||
524 | return result | ||
525 | |||
526 | def map_async(self, func, iterable, chunksize=None, callback=None): | ||
527 | ''' | ||
528 | Asynchronous equivalent of `map()` builtin | ||
529 | ''' | ||
530 | assert self._state == RUN | ||
531 | if not hasattr(iterable, '__len__'): | ||
532 | iterable = list(iterable) | ||
533 | |||
534 | if chunksize is None: | ||
535 | chunksize, extra = divmod(len(iterable), len(self._pool) * 4) | ||
536 | if extra: | ||
537 | chunksize += 1 | ||
538 | if len(iterable) == 0: | ||
539 | chunksize = 0 | ||
540 | |||
541 | task_batches = Pool._get_tasks(func, iterable, chunksize) | ||
542 | result = MapResult(self._cache, chunksize, len(iterable), callback) | ||
543 | self._taskqueue.put((((result._job, i, mapstar, (x,), {}) | ||
544 | for i, x in enumerate(task_batches)), None)) | ||
545 | return result | ||
546 | |||
547 | @staticmethod | ||
548 | def _handle_workers(pool): | ||
549 | thread = threading.current_thread() | ||
550 | |||
551 | # Keep maintaining workers until the cache gets drained, unless the pool | ||
552 | # is terminated. | ||
553 | while thread._state == RUN or (pool._cache and thread._state != TERMINATE): | ||
554 | pool._maintain_pool() | ||
555 | time.sleep(0.1) | ||
556 | # send sentinel to stop workers | ||
557 | pool._taskqueue.put(None) | ||
558 | debug('worker handler exiting') | ||
559 | |||
560 | @staticmethod | ||
561 | def _handle_tasks(taskqueue, put, outqueue, pool): | ||
562 | thread = threading.current_thread() | ||
563 | |||
564 | for taskseq, set_length in iter(taskqueue.get, None): | ||
565 | i = -1 | ||
566 | for i, task in enumerate(taskseq): | ||
567 | if thread._state: | ||
568 | debug('task handler found thread._state != RUN') | ||
569 | break | ||
570 | try: | ||
571 | put(task) | ||
572 | except IOError: | ||
573 | debug('could not put task on queue') | ||
574 | break | ||
575 | else: | ||
576 | if set_length: | ||
577 | debug('doing set_length()') | ||
578 | set_length(i+1) | ||
579 | continue | ||
580 | break | ||
581 | else: | ||
582 | debug('task handler got sentinel') | ||
583 | |||
584 | |||
585 | try: | ||
586 | # tell result handler to finish when cache is empty | ||
587 | debug('task handler sending sentinel to result handler') | ||
588 | outqueue.put(None) | ||
589 | |||
590 | # tell workers there is no more work | ||
591 | debug('task handler sending sentinel to workers') | ||
592 | for p in pool: | ||
593 | put(None) | ||
594 | except IOError: | ||
595 | debug('task handler got IOError when sending sentinels') | ||
596 | |||
597 | debug('task handler exiting') | ||
598 | |||
599 | @staticmethod | ||
600 | def _handle_results(outqueue, get, cache): | ||
601 | thread = threading.current_thread() | ||
602 | |||
603 | while 1: | ||
604 | try: | ||
605 | task = get() | ||
606 | except (IOError, EOFError): | ||
607 | debug('result handler got EOFError/IOError -- exiting') | ||
608 | return | ||
609 | |||
610 | if thread._state: | ||
611 | assert thread._state == TERMINATE | ||
612 | debug('result handler found thread._state=TERMINATE') | ||
613 | break | ||
614 | |||
615 | if task is None: | ||
616 | debug('result handler got sentinel') | ||
617 | break | ||
618 | |||
619 | job, i, obj = task | ||
620 | try: | ||
621 | cache[job]._set(i, obj) | ||
622 | except KeyError: | ||
623 | pass | ||
624 | |||
625 | while cache and thread._state != TERMINATE: | ||
626 | try: | ||
627 | task = get() | ||
628 | except (IOError, EOFError): | ||
629 | debug('result handler got EOFError/IOError -- exiting') | ||
630 | return | ||
631 | |||
632 | if task is None: | ||
633 | debug('result handler ignoring extra sentinel') | ||
634 | continue | ||
635 | job, i, obj = task | ||
636 | try: | ||
637 | cache[job]._set(i, obj) | ||
638 | except KeyError: | ||
639 | pass | ||
640 | |||
641 | if hasattr(outqueue, '_reader'): | ||
642 | debug('ensuring that outqueue is not full') | ||
643 | # If we don't make room available in outqueue then | ||
644 | # attempts to add the sentinel (None) to outqueue may | ||
645 | # block. There is guaranteed to be no more than 2 sentinels. | ||
646 | try: | ||
647 | for i in range(10): | ||
648 | if not outqueue._reader.poll(): | ||
649 | break | ||
650 | get() | ||
651 | except (IOError, EOFError): | ||
652 | pass | ||
653 | |||
654 | debug('result handler exiting: len(cache)=%s, thread._state=%s', | ||
655 | len(cache), thread._state) | ||
656 | |||
657 | @staticmethod | ||
658 | def _get_tasks(func, it, size): | ||
659 | it = iter(it) | ||
660 | while 1: | ||
661 | x = tuple(itertools.islice(it, size)) | ||
662 | if not x: | ||
663 | return | ||
664 | yield (func, x) | ||
665 | |||
666 | def __reduce__(self): | ||
667 | raise NotImplementedError( | ||
668 | 'pool objects cannot be passed between processes or pickled' | ||
669 | ) | ||
670 | |||
671 | def close(self): | ||
672 | debug('closing pool') | ||
673 | if self._state == RUN: | ||
674 | self._state = CLOSE | ||
675 | self._worker_handler._state = CLOSE | ||
676 | |||
677 | def terminate(self): | ||
678 | debug('terminating pool') | ||
679 | self._state = TERMINATE | ||
680 | self._worker_handler._state = TERMINATE | ||
681 | self._terminate() | ||
682 | |||
683 | def join(self): | ||
684 | debug('joining pool') | ||
685 | assert self._state in (CLOSE, TERMINATE) | ||
686 | self._worker_handler.join() | ||
687 | self._task_handler.join() | ||
688 | self._result_handler.join() | ||
689 | for p in self._pool: | ||
690 | p.join() | ||
691 | |||
692 | @staticmethod | ||
693 | def _help_stuff_finish(inqueue, task_handler, size): | ||
694 | # task_handler may be blocked trying to put items on inqueue | ||
695 | debug('removing tasks from inqueue until task handler finished') | ||
696 | inqueue._rlock.acquire() | ||
697 | while task_handler.is_alive() and inqueue._reader.poll(): | ||
698 | inqueue._reader.recv() | ||
699 | time.sleep(0) | ||
700 | |||
701 | @classmethod | ||
702 | def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, | ||
703 | worker_handler, task_handler, result_handler, cache): | ||
704 | # this is guaranteed to only be called once | ||
705 | debug('finalizing pool') | ||
706 | |||
707 | worker_handler._state = TERMINATE | ||
708 | task_handler._state = TERMINATE | ||
709 | |||
710 | debug('helping task handler/workers to finish') | ||
711 | cls._help_stuff_finish(inqueue, task_handler, len(pool)) | ||
712 | |||
713 | assert result_handler.is_alive() or len(cache) == 0 | ||
714 | |||
715 | result_handler._state = TERMINATE | ||
716 | outqueue.put(None) # sentinel | ||
717 | |||
718 | # We must wait for the worker handler to exit before terminating | ||
719 | # workers because we don't want workers to be restarted behind our back. | ||
720 | debug('joining worker handler') | ||
721 | if threading.current_thread() is not worker_handler: | ||
722 | worker_handler.join(1e100) | ||
723 | |||
724 | # Terminate workers which haven't already finished. | ||
725 | if pool and hasattr(pool[0], 'terminate'): | ||
726 | debug('terminating workers') | ||
727 | for p in pool: | ||
728 | if p.exitcode is None: | ||
729 | p.terminate() | ||
730 | |||
731 | debug('joining task handler') | ||
732 | if threading.current_thread() is not task_handler: | ||
733 | task_handler.join(1e100) | ||
734 | |||
735 | debug('joining result handler') | ||
736 | if threading.current_thread() is not result_handler: | ||
737 | result_handler.join(1e100) | ||
738 | |||
739 | if pool and hasattr(pool[0], 'terminate'): | ||
740 | debug('joining pool workers') | ||
741 | for p in pool: | ||
742 | if p.is_alive(): | ||
743 | # worker has not yet exited | ||
744 | debug('cleaning up worker %d' % p.pid) | ||
745 | p.join() | ||
746 | |||
747 | class ApplyResult(object): | ||
748 | |||
749 | def __init__(self, cache, callback): | ||
750 | self._cond = threading.Condition(threading.Lock()) | ||
751 | self._job = multiprocessing.pool.job_counter.next() | ||
752 | self._cache = cache | ||
753 | self._ready = False | ||
754 | self._callback = callback | ||
755 | cache[self._job] = self | ||
756 | |||
757 | def ready(self): | ||
758 | return self._ready | ||
759 | |||
760 | def successful(self): | ||
761 | assert self._ready | ||
762 | return self._success | ||
763 | |||
764 | def wait(self, timeout=None): | ||
765 | self._cond.acquire() | ||
766 | try: | ||
767 | if not self._ready: | ||
768 | self._cond.wait(timeout) | ||
769 | finally: | ||
770 | self._cond.release() | ||
771 | |||
772 | def get(self, timeout=None): | ||
773 | self.wait(timeout) | ||
774 | if not self._ready: | ||
775 | raise TimeoutError | ||
776 | if self._success: | ||
777 | return self._value | ||
778 | else: | ||
779 | raise self._value | ||
780 | |||
781 | def _set(self, i, obj): | ||
782 | self._success, self._value = obj | ||
783 | if self._callback and self._success: | ||
784 | self._callback(self._value) | ||
785 | self._cond.acquire() | ||
786 | try: | ||
787 | self._ready = True | ||
788 | self._cond.notify() | ||
789 | finally: | ||
790 | self._cond.release() | ||
791 | del self._cache[self._job] | ||
792 | |||
793 | # | ||
794 | # Class whose instances are returned by `Pool.map_async()` | ||
795 | # | ||
796 | |||
797 | class MapResult(ApplyResult): | ||
798 | |||
799 | def __init__(self, cache, chunksize, length, callback): | ||
800 | ApplyResult.__init__(self, cache, callback) | ||
801 | self._success = True | ||
802 | self._value = [None] * length | ||
803 | self._chunksize = chunksize | ||
804 | if chunksize <= 0: | ||
805 | self._number_left = 0 | ||
806 | self._ready = True | ||
807 | del cache[self._job] | ||
808 | else: | ||
809 | self._number_left = length//chunksize + bool(length % chunksize) | ||
810 | |||
811 | def _set(self, i, success_result): | ||
812 | success, result = success_result | ||
813 | if success: | ||
814 | self._value[i*self._chunksize:(i+1)*self._chunksize] = result | ||
815 | self._number_left -= 1 | ||
816 | if self._number_left == 0: | ||
817 | if self._callback: | ||
818 | self._callback(self._value) | ||
819 | del self._cache[self._job] | ||
820 | self._cond.acquire() | ||
821 | try: | ||
822 | self._ready = True | ||
823 | self._cond.notify() | ||
824 | finally: | ||
825 | self._cond.release() | ||
826 | |||
827 | else: | ||
828 | self._success = False | ||
829 | self._value = result | ||
830 | del self._cache[self._job] | ||
831 | self._cond.acquire() | ||
832 | try: | ||
833 | self._ready = True | ||
834 | self._cond.notify() | ||
835 | finally: | ||
836 | self._cond.release() | ||
837 | |||
838 | # | ||
839 | # Class whose instances are returned by `Pool.imap()` | ||
840 | # | ||
841 | |||
842 | class IMapIterator(object): | ||
843 | |||
844 | def __init__(self, cache): | ||
845 | self._cond = threading.Condition(threading.Lock()) | ||
846 | self._job = multiprocessing.pool.job_counter.next() | ||
847 | self._cache = cache | ||
848 | self._items = collections.deque() | ||
849 | self._index = 0 | ||
850 | self._length = None | ||
851 | self._unsorted = {} | ||
852 | cache[self._job] = self | ||
853 | |||
854 | def __iter__(self): | ||
855 | return self | ||
856 | |||
857 | def next(self, timeout=None): | ||
858 | self._cond.acquire() | ||
859 | try: | ||
860 | try: | ||
861 | item = self._items.popleft() | ||
862 | except IndexError: | ||
863 | if self._index == self._length: | ||
864 | raise StopIteration | ||
865 | self._cond.wait(timeout) | ||
866 | try: | ||
867 | item = self._items.popleft() | ||
868 | except IndexError: | ||
869 | if self._index == self._length: | ||
870 | raise StopIteration | ||
871 | raise TimeoutError | ||
872 | finally: | ||
873 | self._cond.release() | ||
874 | |||
875 | success, value = item | ||
876 | if success: | ||
877 | return value | ||
878 | raise value | ||
879 | |||
880 | __next__ = next # XXX | ||
881 | |||
882 | def _set(self, i, obj): | ||
883 | self._cond.acquire() | ||
884 | try: | ||
885 | if self._index == i: | ||
886 | self._items.append(obj) | ||
887 | self._index += 1 | ||
888 | while self._index in self._unsorted: | ||
889 | obj = self._unsorted.pop(self._index) | ||
890 | self._items.append(obj) | ||
891 | self._index += 1 | ||
892 | self._cond.notify() | ||
893 | else: | ||
894 | self._unsorted[i] = obj | ||
895 | |||
896 | if self._index == self._length: | ||
897 | del self._cache[self._job] | ||
898 | finally: | ||
899 | self._cond.release() | ||
900 | |||
901 | def _set_length(self, length): | ||
902 | self._cond.acquire() | ||
903 | try: | ||
904 | self._length = length | ||
905 | if self._index == self._length: | ||
906 | self._cond.notify() | ||
907 | del self._cache[self._job] | ||
908 | finally: | ||
909 | self._cond.release() | ||
910 | |||
911 | # | ||
912 | # Class whose instances are returned by `Pool.imap_unordered()` | ||
913 | # | ||
914 | |||
915 | class IMapUnorderedIterator(IMapIterator): | ||
916 | |||
917 | def _set(self, i, obj): | ||
918 | self._cond.acquire() | ||
919 | try: | ||
920 | self._items.append(obj) | ||
921 | self._index += 1 | ||
922 | self._cond.notify() | ||
923 | if self._index == self._length: | ||
924 | del self._cache[self._job] | ||
925 | finally: | ||
926 | self._cond.release() | ||
927 | |||
928 | |||
diff --git a/bitbake/lib/bb/utils.py b/bitbake/lib/bb/utils.py index 7e81df5855..83159a6465 100644 --- a/bitbake/lib/bb/utils.py +++ b/bitbake/lib/bb/utils.py | |||
@@ -839,4 +839,12 @@ def process_profilelog(fn): | |||
839 | pout.flush() | 839 | pout.flush() |
840 | pout.close() | 840 | pout.close() |
841 | 841 | ||
842 | # | ||
843 | # Work around multiprocessing pool bugs in python < 2.7.3 | ||
844 | # | ||
845 | def multiprocessingpool(*args, **kwargs): | ||
846 | if sys.version_info < (2, 7, 3): | ||
847 | return bb.compat.Pool(*args, **kwargs) | ||
848 | else: | ||
849 | return multiprocessing.pool.Pool(*args, **kwargs) | ||
842 | 850 | ||