author     Richard Purdie <richard.purdie@linuxfoundation.org>   2013-02-06 00:28:08 +0000
committer  Richard Purdie <richard.purdie@linuxfoundation.org>   2013-02-06 13:13:01 +0000
commit     1f192a7ade00b3e1dac8af603e9b48d254100fb6 (patch)
tree       981fb6648a323659935bad6d79a01a73f789c3ae /bitbake
parent     bc8150d9d4db81006d1f778dedc4e167f5b89a8d (diff)
download   poky-1f192a7ade00b3e1dac8af603e9b48d254100fb6.tar.gz
bitbake: compat/utils: Add copy of python multiprocessing pool for pre 2.7.3 issues
python 2.7 shows hangs with issues in its pool implementation. Rather than
try and hack around these, add a copy of the working pool implementation
to the compat module from 2.7.3.

(Bitbake rev: c9eb742637131e8dbd526d2ad9b458abea0a2d87)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r--   bitbake/lib/bb/compat.py   687
-rw-r--r--   bitbake/lib/bb/utils.py      8
2 files changed, 695 insertions, 0 deletions
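In practice, call sites obtain a pool through the bb.utils.multiprocessingpool() helper added at the end of this diff rather than constructing multiprocessing.Pool directly, so interpreters older than 2.7.3 transparently get the bundled copy. A minimal sketch of such a call site (the checksum worker is illustrative and not part of this commit; assumes a BitBake environment where bb is importable):

    import bb.utils

    def checksum(data):
        # stand-in for real per-item work; must be a top-level (picklable) def
        return hash(data)

    pool = bb.utils.multiprocessingpool(4)
    try:
        results = pool.map(checksum, ["foo", "bar", "baz"])
    finally:
        pool.close()
        pool.join()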
diff --git a/bitbake/lib/bb/compat.py b/bitbake/lib/bb/compat.py
index 1466da2379..ea4e23a008 100644
--- a/bitbake/lib/bb/compat.py
+++ b/bitbake/lib/bb/compat.py
@@ -239,3 +239,690 @@ class OrderedDict(dict):
    def viewitems(self):
        "od.viewitems() -> a set-like object providing a view on od's items"
        return ItemsView(self)

# Multiprocessing pool code imported from python 2.7.3. Previous versions of
# python have issues in this code which hang pool usage

#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
import threading
import Queue
import itertools
import collections
import time

import multiprocessing
from multiprocessing import Process, cpu_count, TimeoutError, pool
from multiprocessing.util import Finalize, debug

#
# Constants representing the state of a pool
#

RUN = 0
CLOSE = 1
TERMINATE = 2

#
# Miscellaneous
#

def mapstar(args):
    return map(*args)

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)

def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))
        completed += 1
    debug('worker exiting after %d tasks' % completed)


class Pool(object):
    '''
    Class which supports an async version of the `apply()` builtin
    '''
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        self._state = RUN
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        if processes is None:
            try:
                processes = cpu_count()
            except NotImplementedError:
                processes = 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")

        if initializer is not None and not hasattr(initializer, '__call__'):
            raise TypeError('initializer must be a callable')

        self._processes = processes
        self._pool = []
        self._repopulate_pool()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime. Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer,
                                   self._initargs, self._maxtasksperchild)
                            )
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')

    def _maintain_pool(self):
        """Clean up any exited workers and start replacements for them.
        """
        if self._join_exited_workers():
            self._repopulate_pool()

    def _setup_queues(self):
        from multiprocessing.queues import SimpleQueue
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def apply(self, func, args=(), kwds={}):
        '''
        Equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

    def map(self, func, iterable, chunksize=None):
        '''
        Equivalent of `map()` builtin
        '''
        assert self._state == RUN
        return self.map_async(func, iterable, chunksize).get()

    def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

    def apply_async(self, func, args=(), kwds={}, callback=None):
        '''
        Asynchronous equivalent of `apply()` builtin
        '''
        assert self._state == RUN
        result = ApplyResult(self._cache, callback)
        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
        return result

    def map_async(self, func, iterable, chunksize=None, callback=None):
        '''
        Asynchronous equivalent of `map()` builtin
        '''
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result

    @staticmethod
    def _handle_workers(pool):
        thread = threading.current_thread()

        # Keep maintaining workers until the cache gets drained, unless the pool
        # is terminated.
        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
            pool._maintain_pool()
            time.sleep(0.1)
        # send sentinel to stop workers
        pool._taskqueue.put(None)
        debug('worker handler exiting')

    @staticmethod
    def _handle_tasks(taskqueue, put, outqueue, pool):
        thread = threading.current_thread()

        for taskseq, set_length in iter(taskqueue.get, None):
            i = -1
            for i, task in enumerate(taskseq):
                if thread._state:
                    debug('task handler found thread._state != RUN')
                    break
                try:
                    put(task)
                except IOError:
                    debug('could not put task on queue')
                    break
            else:
                if set_length:
                    debug('doing set_length()')
                    set_length(i+1)
                continue
            break
        else:
            debug('task handler got sentinel')


        try:
            # tell result handler to finish when cache is empty
            debug('task handler sending sentinel to result handler')
            outqueue.put(None)

            # tell workers there is no more work
            debug('task handler sending sentinel to workers')
            for p in pool:
                put(None)
        except IOError:
            debug('task handler got IOError when sending sentinels')

        debug('task handler exiting')

    @staticmethod
    def _handle_results(outqueue, get, cache):
        thread = threading.current_thread()

        while 1:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if thread._state:
                assert thread._state == TERMINATE
                debug('result handler found thread._state=TERMINATE')
                break

            if task is None:
                debug('result handler got sentinel')
                break

            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        while cache and thread._state != TERMINATE:
            try:
                task = get()
            except (IOError, EOFError):
                debug('result handler got EOFError/IOError -- exiting')
                return

            if task is None:
                debug('result handler ignoring extra sentinel')
                continue
            job, i, obj = task
            try:
                cache[job]._set(i, obj)
            except KeyError:
                pass

        if hasattr(outqueue, '_reader'):
            debug('ensuring that outqueue is not full')
            # If we don't make room available in outqueue then
            # attempts to add the sentinel (None) to outqueue may
            # block. There is guaranteed to be no more than 2 sentinels.
            try:
                for i in range(10):
                    if not outqueue._reader.poll():
                        break
                    get()
            except (IOError, EOFError):
                pass

        debug('result handler exiting: len(cache)=%s, thread._state=%s',
              len(cache), thread._state)

    @staticmethod
    def _get_tasks(func, it, size):
        it = iter(it)
        while 1:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield (func, x)

    def __reduce__(self):
        raise NotImplementedError(
              'pool objects cannot be passed between processes or pickled'
              )

    def close(self):
        debug('closing pool')
        if self._state == RUN:
            self._state = CLOSE
            self._worker_handler._state = CLOSE

    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()

    def join(self):
        debug('joining pool')
        assert self._state in (CLOSE, TERMINATE)
        self._worker_handler.join()
        self._task_handler.join()
        self._result_handler.join()
        for p in self._pool:
            p.join()

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # task_handler may be blocked trying to put items on inqueue
        debug('removing tasks from inqueue until task handler finished')
        inqueue._rlock.acquire()
        while task_handler.is_alive() and inqueue._reader.poll():
            inqueue._reader.recv()
            time.sleep(0)

    @classmethod
    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                        worker_handler, task_handler, result_handler, cache):
        # this is guaranteed to only be called once
        debug('finalizing pool')

        worker_handler._state = TERMINATE
        task_handler._state = TERMINATE

        debug('helping task handler/workers to finish')
        cls._help_stuff_finish(inqueue, task_handler, len(pool))

        assert result_handler.is_alive() or len(cache) == 0

        result_handler._state = TERMINATE
        outqueue.put(None)                  # sentinel

        # We must wait for the worker handler to exit before terminating
        # workers because we don't want workers to be restarted behind our back.
        debug('joining worker handler')
        if threading.current_thread() is not worker_handler:
            worker_handler.join(1e100)

        # Terminate workers which haven't already finished.
        if pool and hasattr(pool[0], 'terminate'):
            debug('terminating workers')
            for p in pool:
                if p.exitcode is None:
                    p.terminate()

        debug('joining task handler')
        if threading.current_thread() is not task_handler:
            task_handler.join(1e100)

        debug('joining result handler')
        if threading.current_thread() is not result_handler:
            result_handler.join(1e100)

        if pool and hasattr(pool[0], 'terminate'):
            debug('joining pool workers')
            for p in pool:
                if p.is_alive():
                    # worker has not yet exited
                    debug('cleaning up worker %d' % p.pid)
                    p.join()

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = multiprocessing.pool.job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        return self._ready

    def successful(self):
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if not self._ready:
                self._cond.wait(timeout)
        finally:
            self._cond.release()

    def get(self, timeout=None):
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        else:
            raise self._value

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):

    def __init__(self, cache, chunksize, length, callback):
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
            del cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._cond.acquire()
                try:
                    self._ready = True
                    self._cond.notify()
                finally:
                    self._cond.release()

        else:
            self._success = False
            self._value = result
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):

    def __init__(self, cache):
        self._cond = threading.Condition(threading.Lock())
        self._job = multiprocessing.pool.job_counter.next()
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        self._cond.acquire()
        try:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    raise StopIteration
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        raise StopIteration
                    raise TimeoutError
        finally:
            self._cond.release()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()

    def _set_length(self, length):
        self._cond.acquire()
        try:
            self._length = length
            if self._index == self._length:
                self._cond.notify()
                del self._cache[self._job]
        finally:
            self._cond.release()

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):

    def _set(self, i, obj):
        self._cond.acquire()
        try:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
        finally:
            self._cond.release()
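Because bb.compat.Pool above mirrors the stock multiprocessing API, it can also be exercised directly when reproducing the pre-2.7.3 hangs. A small sketch (Python 2, illustrative only; assumes the bb package is on the path):

    import bb.compat

    def square(x):
        return x * x

    p = bb.compat.Pool(processes=2)
    try:
        r = p.apply_async(square, (7,))
        assert r.get(timeout=30) == 49
    finally:
        p.close()   # CLOSE state: the worker handler stops repopulating
        p.join()    # waits for the handler threads and workers to drain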
diff --git a/bitbake/lib/bb/utils.py b/bitbake/lib/bb/utils.py
index 7e81df5855..83159a6465 100644
--- a/bitbake/lib/bb/utils.py
+++ b/bitbake/lib/bb/utils.py
@@ -839,4 +839,12 @@ def process_profilelog(fn):
    pout.flush()
    pout.close()

#
# Work around multiprocessing pool bugs in python < 2.7.3
#
def multiprocessingpool(*args, **kwargs):
    if sys.version_info < (2, 7, 3):
        return bb.compat.Pool(*args, **kwargs)
    else:
        return multiprocessing.pool.Pool(*args, **kwargs)
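The helper forwards its arguments unchanged, so copied-pool features such as maxtasksperchild also work through it on old interpreters. A hedged sketch (the task function is illustrative, not part of this commit; assumes a BitBake environment where bb is importable):

    import os
    import bb.utils

    def task(_):
        return os.getpid()

    # With maxtasksperchild=2, worker() exits after two tasks and
    # _handle_workers()/_maintain_pool() reap and replace the process,
    # so the pids collected below cycle through fresh workers.
    pool = bb.utils.multiprocessingpool(processes=2, maxtasksperchild=2)
    try:
        pids = pool.map(task, range(8))
        print sorted(set(pids))
    finally:
        pool.close()
        pool.join()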