summaryrefslogtreecommitdiffstats
path: root/bitbake
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-14 16:22:51 +0000
committerRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-14 17:26:30 +0100
commit441c699acbb4b443c705933c1fccee0d906a1262 (patch)
tree63816b0140acf3052a1b673ecd8d2400f3a528b3 /bitbake
parent6c058341f9b0bee6af2554d897a2623a2ea9a479 (diff)
downloadpoky-441c699acbb4b443c705933c1fccee0d906a1262.tar.gz
bitbake: compat/server/utils: Jettison pre python 2.7.3 workarounds
Now we've moved to require python 2.7.3, we can jettison the compatibility workarounds/hacks for older python versions. (Bitbake rev: a51c402304f2080a76720f9b31d6dfdbed393bba) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r--bitbake/lib/bb/compat.py926
-rw-r--r--bitbake/lib/bb/server/process.py46
-rw-r--r--bitbake/lib/bb/server/xmlrpc.py104
-rw-r--r--bitbake/lib/bb/utils.py7
4 files changed, 15 insertions, 1068 deletions
diff --git a/bitbake/lib/bb/compat.py b/bitbake/lib/bb/compat.py
index 440a2fbc8b..de1923d28a 100644
--- a/bitbake/lib/bb/compat.py
+++ b/bitbake/lib/bb/compat.py
@@ -1,928 +1,6 @@
1"""Code pulled from future python versions, here for compatibility""" 1"""Code pulled from future python versions, here for compatibility"""
2 2
3from collections import MutableMapping, KeysView, ValuesView, ItemsView 3from collections import MutableMapping, KeysView, ValuesView, ItemsView, OrderedDict
4try: 4from functools import total_ordering
5 from thread import get_ident as _get_ident
6except ImportError:
7 from dummy_thread import get_ident as _get_ident
8
9def total_ordering(cls):
10 """Class decorator that fills in missing ordering methods"""
11 convert = {
12 '__lt__': [('__gt__', lambda self, other: other < self),
13 ('__le__', lambda self, other: not other < self),
14 ('__ge__', lambda self, other: not self < other)],
15 '__le__': [('__ge__', lambda self, other: other <= self),
16 ('__lt__', lambda self, other: not other <= self),
17 ('__gt__', lambda self, other: not self <= other)],
18 '__gt__': [('__lt__', lambda self, other: other > self),
19 ('__ge__', lambda self, other: not other > self),
20 ('__le__', lambda self, other: not self > other)],
21 '__ge__': [('__le__', lambda self, other: other >= self),
22 ('__gt__', lambda self, other: not other >= self),
23 ('__lt__', lambda self, other: not self >= other)]
24 }
25 roots = set(dir(cls)) & set(convert)
26 if not roots:
27 raise ValueError('must define at least one ordering operation: < > <= >=')
28 root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__
29 for opname, opfunc in convert[root]:
30 if opname not in roots:
31 opfunc.__name__ = opname
32 opfunc.__doc__ = getattr(int, opname).__doc__
33 setattr(cls, opname, opfunc)
34 return cls
35
36class OrderedDict(dict):
37 'Dictionary that remembers insertion order'
38 # An inherited dict maps keys to values.
39 # The inherited dict provides __getitem__, __len__, __contains__, and get.
40 # The remaining methods are order-aware.
41 # Big-O running times for all methods are the same as regular dictionaries.
42
43 # The internal self.__map dict maps keys to links in a doubly linked list.
44 # The circular doubly linked list starts and ends with a sentinel element.
45 # The sentinel element never gets deleted (this simplifies the algorithm).
46 # Each link is stored as a list of length three: [PREV, NEXT, KEY].
47
48 def __init__(self, *args, **kwds):
49 '''Initialize an ordered dictionary. The signature is the same as
50 regular dictionaries, but keyword arguments are not recommended because
51 their insertion order is arbitrary.
52
53 '''
54 if len(args) > 1:
55 raise TypeError('expected at most 1 arguments, got %d' % len(args))
56 try:
57 self.__root
58 except AttributeError:
59 self.__root = root = [] # sentinel node
60 root[:] = [root, root, None]
61 self.__map = {}
62 self.__update(*args, **kwds)
63
64 def __setitem__(self, key, value, PREV=0, NEXT=1, dict_setitem=dict.__setitem__):
65 'od.__setitem__(i, y) <==> od[i]=y'
66 # Setting a new item creates a new link at the end of the linked list,
67 # and the inherited dictionary is updated with the new key/value pair.
68 if key not in self:
69 root = self.__root
70 last = root[PREV]
71 last[NEXT] = root[PREV] = self.__map[key] = [last, root, key]
72 dict_setitem(self, key, value)
73
74 def __delitem__(self, key, PREV=0, NEXT=1, dict_delitem=dict.__delitem__):
75 'od.__delitem__(y) <==> del od[y]'
76 # Deleting an existing item uses self.__map to find the link which gets
77 # removed by updating the links in the predecessor and successor nodes.
78 dict_delitem(self, key)
79 link_prev, link_next, key = self.__map.pop(key)
80 link_prev[NEXT] = link_next
81 link_next[PREV] = link_prev
82
83 def __iter__(self):
84 'od.__iter__() <==> iter(od)'
85 # Traverse the linked list in order.
86 NEXT, KEY = 1, 2
87 root = self.__root
88 curr = root[NEXT]
89 while curr is not root:
90 yield curr[KEY]
91 curr = curr[NEXT]
92
93 def __reversed__(self):
94 'od.__reversed__() <==> reversed(od)'
95 # Traverse the linked list in reverse order.
96 PREV, KEY = 0, 2
97 root = self.__root
98 curr = root[PREV]
99 while curr is not root:
100 yield curr[KEY]
101 curr = curr[PREV]
102
103 def clear(self):
104 'od.clear() -> None. Remove all items from od.'
105 for node in self.__map.itervalues():
106 del node[:]
107 root = self.__root
108 root[:] = [root, root, None]
109 self.__map.clear()
110 dict.clear(self)
111
112 # -- the following methods do not depend on the internal structure --
113
114 def keys(self):
115 'od.keys() -> list of keys in od'
116 return list(self)
117
118 def values(self):
119 'od.values() -> list of values in od'
120 return [self[key] for key in self]
121
122 def items(self):
123 'od.items() -> list of (key, value) pairs in od'
124 return [(key, self[key]) for key in self]
125
126 def iterkeys(self):
127 'od.iterkeys() -> an iterator over the keys in od'
128 return iter(self)
129
130 def itervalues(self):
131 'od.itervalues -> an iterator over the values in od'
132 for k in self:
133 yield self[k]
134
135 def iteritems(self):
136 'od.iteritems -> an iterator over the (key, value) pairs in od'
137 for k in self:
138 yield (k, self[k])
139
140 update = MutableMapping.update
141
142 __update = update # let subclasses override update without breaking __init__
143
144 __marker = object()
145
146 def pop(self, key, default=__marker):
147 '''od.pop(k[,d]) -> v, remove specified key and return the corresponding
148 value. If key is not found, d is returned if given, otherwise KeyError
149 is raised.
150
151 '''
152 if key in self:
153 result = self[key]
154 del self[key]
155 return result
156 if default is self.__marker:
157 raise KeyError(key)
158 return default
159
160 def setdefault(self, key, default=None):
161 'od.setdefault(k[,d]) -> od.get(k,d), also set od[k]=d if k not in od'
162 if key in self:
163 return self[key]
164 self[key] = default
165 return default
166
167 def popitem(self, last=True):
168 '''od.popitem() -> (k, v), return and remove a (key, value) pair.
169 Pairs are returned in LIFO order if last is true or FIFO order if false.
170
171 '''
172 if not self:
173 raise KeyError('dictionary is empty')
174 key = next(reversed(self) if last else iter(self))
175 value = self.pop(key)
176 return key, value
177
178 def __repr__(self, _repr_running={}):
179 'od.__repr__() <==> repr(od)'
180 call_key = id(self), _get_ident()
181 if call_key in _repr_running:
182 return '...'
183 _repr_running[call_key] = 1
184 try:
185 if not self:
186 return '%s()' % (self.__class__.__name__,)
187 return '%s(%r)' % (self.__class__.__name__, self.items())
188 finally:
189 del _repr_running[call_key]
190
191 def __reduce__(self):
192 'Return state information for pickling'
193 items = [[k, self[k]] for k in self]
194 inst_dict = vars(self).copy()
195 for k in vars(OrderedDict()):
196 inst_dict.pop(k, None)
197 if inst_dict:
198 return (self.__class__, (items,), inst_dict)
199 return self.__class__, (items,)
200
201 def copy(self):
202 'od.copy() -> a shallow copy of od'
203 return self.__class__(self)
204
205 @classmethod
206 def fromkeys(cls, iterable, value=None):
207 '''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.
208 If not specified, the value defaults to None.
209
210 '''
211 self = cls()
212 for key in iterable:
213 self[key] = value
214 return self
215
216 def __eq__(self, other):
217 '''od.__eq__(y) <==> od==y. Comparison to another OD is order-sensitive
218 while comparison to a regular mapping is order-insensitive.
219
220 '''
221 if isinstance(other, OrderedDict):
222 return len(self)==len(other) and self.items() == other.items()
223 return dict.__eq__(self, other)
224
225 def __ne__(self, other):
226 'od.__ne__(y) <==> od!=y'
227 return not self == other
228
229 # -- the following methods support python 3.x style dictionary views --
230
231 def viewkeys(self):
232 "od.viewkeys() -> a set-like object providing a view on od's keys"
233 return KeysView(self)
234
235 def viewvalues(self):
236 "od.viewvalues() -> an object providing a view on od's values"
237 return ValuesView(self)
238
239 def viewitems(self):
240 "od.viewitems() -> a set-like object providing a view on od's items"
241 return ItemsView(self)
242
243# Multiprocessing pool code imported from python 2.7.3. Previous versions of
244# python have issues in this code which hang pool usage
245
246#
247# Module providing the `Pool` class for managing a process pool
248#
249# multiprocessing/pool.py
250#
251# Copyright (c) 2006-2008, R Oudkerk
252# All rights reserved.
253#
254# Redistribution and use in source and binary forms, with or without
255# modification, are permitted provided that the following conditions
256# are met:
257#
258# 1. Redistributions of source code must retain the above copyright
259# notice, this list of conditions and the following disclaimer.
260# 2. Redistributions in binary form must reproduce the above copyright
261# notice, this list of conditions and the following disclaimer in the
262# documentation and/or other materials provided with the distribution.
263# 3. Neither the name of author nor the names of any contributors may be
264# used to endorse or promote products derived from this software
265# without specific prior written permission.
266#
267# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
268# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
269# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
270# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
271# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
272# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
273# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
274# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
275# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
276# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
277# SUCH DAMAGE.
278#
279import threading
280import Queue
281import itertools
282import collections
283import time
284
285import multiprocessing
286from multiprocessing import Process, cpu_count, TimeoutError, pool
287from multiprocessing.util import Finalize, debug
288
289#
290# Constants representing the state of a pool
291#
292
293RUN = 0
294CLOSE = 1
295TERMINATE = 2
296
297#
298# Miscellaneous
299#
300
301def mapstar(args):
302 return map(*args)
303
304class MaybeEncodingError(Exception):
305 """Wraps possible unpickleable errors, so they can be
306 safely sent through the socket."""
307
308 def __init__(self, exc, value):
309 self.exc = repr(exc)
310 self.value = repr(value)
311 super(MaybeEncodingError, self).__init__(self.exc, self.value)
312
313 def __str__(self):
314 return "Error sending result: '%s'. Reason: '%s'" % (self.value,
315 self.exc)
316
317 def __repr__(self):
318 return "<MaybeEncodingError: %s>" % str(self)
319
320def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
321 assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
322 put = outqueue.put
323 get = inqueue.get
324 if hasattr(inqueue, '_writer'):
325 inqueue._writer.close()
326 outqueue._reader.close()
327
328 if initializer is not None:
329 initializer(*initargs)
330
331 completed = 0
332 while maxtasks is None or (maxtasks and completed < maxtasks):
333 try:
334 task = get()
335 except (EOFError, IOError):
336 debug('worker got EOFError or IOError -- exiting')
337 break
338
339 if task is None:
340 debug('worker got sentinel -- exiting')
341 break
342
343 job, i, func, args, kwds = task
344 try:
345 result = (True, func(*args, **kwds))
346 except Exception as e:
347 result = (False, e)
348 try:
349 put((job, i, result))
350 except Exception as e:
351 wrapped = MaybeEncodingError(e, result[1])
352 debug("Possible encoding error while sending result: %s" % (
353 wrapped))
354 put((job, i, (False, wrapped)))
355 completed += 1
356 debug('worker exiting after %d tasks' % completed)
357
358
359class Pool(object):
360 '''
361 Class which supports an async version of the `apply()` builtin
362 '''
363 Process = Process
364
365 def __init__(self, processes=None, initializer=None, initargs=(),
366 maxtasksperchild=None):
367 self._setup_queues()
368 self._taskqueue = Queue.Queue()
369 self._cache = {}
370 self._state = RUN
371 self._maxtasksperchild = maxtasksperchild
372 self._initializer = initializer
373 self._initargs = initargs
374
375 if processes is None:
376 try:
377 processes = cpu_count()
378 except NotImplementedError:
379 processes = 1
380 if processes < 1:
381 raise ValueError("Number of processes must be at least 1")
382
383 if initializer is not None and not hasattr(initializer, '__call__'):
384 raise TypeError('initializer must be a callable')
385
386 self._processes = processes
387 self._pool = []
388 self._repopulate_pool()
389
390 self._worker_handler = threading.Thread(
391 target=Pool._handle_workers,
392 args=(self, )
393 )
394 self._worker_handler.daemon = True
395 self._worker_handler._state = RUN
396 self._worker_handler.start()
397
398
399 self._task_handler = threading.Thread(
400 target=Pool._handle_tasks,
401 args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
402 )
403 self._task_handler.daemon = True
404 self._task_handler._state = RUN
405 self._task_handler.start()
406
407 self._result_handler = threading.Thread(
408 target=Pool._handle_results,
409 args=(self._outqueue, self._quick_get, self._cache)
410 )
411 self._result_handler.daemon = True
412 self._result_handler._state = RUN
413 self._result_handler.start()
414
415 self._terminate = Finalize(
416 self, self._terminate_pool,
417 args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
418 self._worker_handler, self._task_handler,
419 self._result_handler, self._cache),
420 exitpriority=15
421 )
422
423 def _join_exited_workers(self):
424 """Cleanup after any worker processes which have exited due to reaching
425 their specified lifetime. Returns True if any workers were cleaned up.
426 """
427 cleaned = False
428 for i in reversed(range(len(self._pool))):
429 worker = self._pool[i]
430 if worker.exitcode is not None:
431 # worker exited
432 debug('cleaning up worker %d' % i)
433 worker.join()
434 cleaned = True
435 del self._pool[i]
436 return cleaned
437
438 def _repopulate_pool(self):
439 """Bring the number of pool processes up to the specified number,
440 for use after reaping workers which have exited.
441 """
442 for i in range(self._processes - len(self._pool)):
443 w = self.Process(target=worker,
444 args=(self._inqueue, self._outqueue,
445 self._initializer,
446 self._initargs, self._maxtasksperchild)
447 )
448 self._pool.append(w)
449 w.name = w.name.replace('Process', 'PoolWorker')
450 w.daemon = True
451 w.start()
452 debug('added worker')
453
454 def _maintain_pool(self):
455 """Clean up any exited workers and start replacements for them.
456 """
457 if self._join_exited_workers():
458 self._repopulate_pool()
459
460 def _setup_queues(self):
461 from multiprocessing.queues import SimpleQueue
462 self._inqueue = SimpleQueue()
463 self._outqueue = SimpleQueue()
464 self._quick_put = self._inqueue._writer.send
465 self._quick_get = self._outqueue._reader.recv
466
467 def apply(self, func, args=(), kwds={}):
468 '''
469 Equivalent of `apply()` builtin
470 '''
471 assert self._state == RUN
472 return self.apply_async(func, args, kwds).get()
473
474 def map(self, func, iterable, chunksize=None):
475 '''
476 Equivalent of `map()` builtin
477 '''
478 assert self._state == RUN
479 return self.map_async(func, iterable, chunksize).get()
480
481 def imap(self, func, iterable, chunksize=1):
482 '''
483 Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
484 '''
485 assert self._state == RUN
486 if chunksize == 1:
487 result = IMapIterator(self._cache)
488 self._taskqueue.put((((result._job, i, func, (x,), {})
489 for i, x in enumerate(iterable)), result._set_length))
490 return result
491 else:
492 assert chunksize > 1
493 task_batches = Pool._get_tasks(func, iterable, chunksize)
494 result = IMapIterator(self._cache)
495 self._taskqueue.put((((result._job, i, mapstar, (x,), {})
496 for i, x in enumerate(task_batches)), result._set_length))
497 return (item for chunk in result for item in chunk)
498
499 def imap_unordered(self, func, iterable, chunksize=1):
500 '''
501 Like `imap()` method but ordering of results is arbitrary
502 '''
503 assert self._state == RUN
504 if chunksize == 1:
505 result = IMapUnorderedIterator(self._cache)
506 self._taskqueue.put((((result._job, i, func, (x,), {})
507 for i, x in enumerate(iterable)), result._set_length))
508 return result
509 else:
510 assert chunksize > 1
511 task_batches = Pool._get_tasks(func, iterable, chunksize)
512 result = IMapUnorderedIterator(self._cache)
513 self._taskqueue.put((((result._job, i, mapstar, (x,), {})
514 for i, x in enumerate(task_batches)), result._set_length))
515 return (item for chunk in result for item in chunk)
516
517 def apply_async(self, func, args=(), kwds={}, callback=None):
518 '''
519 Asynchronous equivalent of `apply()` builtin
520 '''
521 assert self._state == RUN
522 result = ApplyResult(self._cache, callback)
523 self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
524 return result
525
526 def map_async(self, func, iterable, chunksize=None, callback=None):
527 '''
528 Asynchronous equivalent of `map()` builtin
529 '''
530 assert self._state == RUN
531 if not hasattr(iterable, '__len__'):
532 iterable = list(iterable)
533
534 if chunksize is None:
535 chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
536 if extra:
537 chunksize += 1
538 if len(iterable) == 0:
539 chunksize = 0
540
541 task_batches = Pool._get_tasks(func, iterable, chunksize)
542 result = MapResult(self._cache, chunksize, len(iterable), callback)
543 self._taskqueue.put((((result._job, i, mapstar, (x,), {})
544 for i, x in enumerate(task_batches)), None))
545 return result
546
547 @staticmethod
548 def _handle_workers(pool):
549 thread = threading.current_thread()
550
551 # Keep maintaining workers until the cache gets drained, unless the pool
552 # is terminated.
553 while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
554 pool._maintain_pool()
555 time.sleep(0.1)
556 # send sentinel to stop workers
557 pool._taskqueue.put(None)
558 debug('worker handler exiting')
559
560 @staticmethod
561 def _handle_tasks(taskqueue, put, outqueue, pool):
562 thread = threading.current_thread()
563
564 for taskseq, set_length in iter(taskqueue.get, None):
565 i = -1
566 for i, task in enumerate(taskseq):
567 if thread._state:
568 debug('task handler found thread._state != RUN')
569 break
570 try:
571 put(task)
572 except IOError:
573 debug('could not put task on queue')
574 break
575 else:
576 if set_length:
577 debug('doing set_length()')
578 set_length(i+1)
579 continue
580 break
581 else:
582 debug('task handler got sentinel')
583
584
585 try:
586 # tell result handler to finish when cache is empty
587 debug('task handler sending sentinel to result handler')
588 outqueue.put(None)
589
590 # tell workers there is no more work
591 debug('task handler sending sentinel to workers')
592 for p in pool:
593 put(None)
594 except IOError:
595 debug('task handler got IOError when sending sentinels')
596
597 debug('task handler exiting')
598
599 @staticmethod
600 def _handle_results(outqueue, get, cache):
601 thread = threading.current_thread()
602
603 while 1:
604 try:
605 task = get()
606 except (IOError, EOFError):
607 debug('result handler got EOFError/IOError -- exiting')
608 return
609
610 if thread._state:
611 assert thread._state == TERMINATE
612 debug('result handler found thread._state=TERMINATE')
613 break
614
615 if task is None:
616 debug('result handler got sentinel')
617 break
618
619 job, i, obj = task
620 try:
621 cache[job]._set(i, obj)
622 except KeyError:
623 pass
624
625 while cache and thread._state != TERMINATE:
626 try:
627 task = get()
628 except (IOError, EOFError):
629 debug('result handler got EOFError/IOError -- exiting')
630 return
631
632 if task is None:
633 debug('result handler ignoring extra sentinel')
634 continue
635 job, i, obj = task
636 try:
637 cache[job]._set(i, obj)
638 except KeyError:
639 pass
640
641 if hasattr(outqueue, '_reader'):
642 debug('ensuring that outqueue is not full')
643 # If we don't make room available in outqueue then
644 # attempts to add the sentinel (None) to outqueue may
645 # block. There is guaranteed to be no more than 2 sentinels.
646 try:
647 for i in range(10):
648 if not outqueue._reader.poll():
649 break
650 get()
651 except (IOError, EOFError):
652 pass
653
654 debug('result handler exiting: len(cache)=%s, thread._state=%s',
655 len(cache), thread._state)
656
657 @staticmethod
658 def _get_tasks(func, it, size):
659 it = iter(it)
660 while 1:
661 x = tuple(itertools.islice(it, size))
662 if not x:
663 return
664 yield (func, x)
665
666 def __reduce__(self):
667 raise NotImplementedError(
668 'pool objects cannot be passed between processes or pickled'
669 )
670
671 def close(self):
672 debug('closing pool')
673 if self._state == RUN:
674 self._state = CLOSE
675 self._worker_handler._state = CLOSE
676
677 def terminate(self):
678 debug('terminating pool')
679 self._state = TERMINATE
680 self._worker_handler._state = TERMINATE
681 self._terminate()
682
683 def join(self):
684 debug('joining pool')
685 assert self._state in (CLOSE, TERMINATE)
686 self._worker_handler.join()
687 self._task_handler.join()
688 self._result_handler.join()
689 for p in self._pool:
690 p.join()
691
692 @staticmethod
693 def _help_stuff_finish(inqueue, task_handler, size):
694 # task_handler may be blocked trying to put items on inqueue
695 debug('removing tasks from inqueue until task handler finished')
696 inqueue._rlock.acquire()
697 while task_handler.is_alive() and inqueue._reader.poll():
698 inqueue._reader.recv()
699 time.sleep(0)
700
701 @classmethod
702 def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
703 worker_handler, task_handler, result_handler, cache):
704 # this is guaranteed to only be called once
705 debug('finalizing pool')
706
707 worker_handler._state = TERMINATE
708 task_handler._state = TERMINATE
709
710 debug('helping task handler/workers to finish')
711 cls._help_stuff_finish(inqueue, task_handler, len(pool))
712
713 assert result_handler.is_alive() or len(cache) == 0
714
715 result_handler._state = TERMINATE
716 outqueue.put(None) # sentinel
717
718 # We must wait for the worker handler to exit before terminating
719 # workers because we don't want workers to be restarted behind our back.
720 debug('joining worker handler')
721 if threading.current_thread() is not worker_handler:
722 worker_handler.join(1e100)
723
724 # Terminate workers which haven't already finished.
725 if pool and hasattr(pool[0], 'terminate'):
726 debug('terminating workers')
727 for p in pool:
728 if p.exitcode is None:
729 p.terminate()
730
731 debug('joining task handler')
732 if threading.current_thread() is not task_handler:
733 task_handler.join(1e100)
734
735 debug('joining result handler')
736 if threading.current_thread() is not result_handler:
737 result_handler.join(1e100)
738
739 if pool and hasattr(pool[0], 'terminate'):
740 debug('joining pool workers')
741 for p in pool:
742 if p.is_alive():
743 # worker has not yet exited
744 debug('cleaning up worker %d' % p.pid)
745 p.join()
746
747class ApplyResult(object):
748
749 def __init__(self, cache, callback):
750 self._cond = threading.Condition(threading.Lock())
751 self._job = multiprocessing.pool.job_counter.next()
752 self._cache = cache
753 self._ready = False
754 self._callback = callback
755 cache[self._job] = self
756
757 def ready(self):
758 return self._ready
759
760 def successful(self):
761 assert self._ready
762 return self._success
763
764 def wait(self, timeout=None):
765 self._cond.acquire()
766 try:
767 if not self._ready:
768 self._cond.wait(timeout)
769 finally:
770 self._cond.release()
771
772 def get(self, timeout=None):
773 self.wait(timeout)
774 if not self._ready:
775 raise TimeoutError
776 if self._success:
777 return self._value
778 else:
779 raise self._value
780
781 def _set(self, i, obj):
782 self._success, self._value = obj
783 if self._callback and self._success:
784 self._callback(self._value)
785 self._cond.acquire()
786 try:
787 self._ready = True
788 self._cond.notify()
789 finally:
790 self._cond.release()
791 del self._cache[self._job]
792
793#
794# Class whose instances are returned by `Pool.map_async()`
795#
796
797class MapResult(ApplyResult):
798
799 def __init__(self, cache, chunksize, length, callback):
800 ApplyResult.__init__(self, cache, callback)
801 self._success = True
802 self._value = [None] * length
803 self._chunksize = chunksize
804 if chunksize <= 0:
805 self._number_left = 0
806 self._ready = True
807 del cache[self._job]
808 else:
809 self._number_left = length//chunksize + bool(length % chunksize)
810
811 def _set(self, i, success_result):
812 success, result = success_result
813 if success:
814 self._value[i*self._chunksize:(i+1)*self._chunksize] = result
815 self._number_left -= 1
816 if self._number_left == 0:
817 if self._callback:
818 self._callback(self._value)
819 del self._cache[self._job]
820 self._cond.acquire()
821 try:
822 self._ready = True
823 self._cond.notify()
824 finally:
825 self._cond.release()
826
827 else:
828 self._success = False
829 self._value = result
830 del self._cache[self._job]
831 self._cond.acquire()
832 try:
833 self._ready = True
834 self._cond.notify()
835 finally:
836 self._cond.release()
837
838#
839# Class whose instances are returned by `Pool.imap()`
840#
841
842class IMapIterator(object):
843
844 def __init__(self, cache):
845 self._cond = threading.Condition(threading.Lock())
846 self._job = multiprocessing.pool.job_counter.next()
847 self._cache = cache
848 self._items = collections.deque()
849 self._index = 0
850 self._length = None
851 self._unsorted = {}
852 cache[self._job] = self
853
854 def __iter__(self):
855 return self
856
857 def next(self, timeout=None):
858 self._cond.acquire()
859 try:
860 try:
861 item = self._items.popleft()
862 except IndexError:
863 if self._index == self._length:
864 raise StopIteration
865 self._cond.wait(timeout)
866 try:
867 item = self._items.popleft()
868 except IndexError:
869 if self._index == self._length:
870 raise StopIteration
871 raise TimeoutError
872 finally:
873 self._cond.release()
874
875 success, value = item
876 if success:
877 return value
878 raise value
879
880 __next__ = next # XXX
881
882 def _set(self, i, obj):
883 self._cond.acquire()
884 try:
885 if self._index == i:
886 self._items.append(obj)
887 self._index += 1
888 while self._index in self._unsorted:
889 obj = self._unsorted.pop(self._index)
890 self._items.append(obj)
891 self._index += 1
892 self._cond.notify()
893 else:
894 self._unsorted[i] = obj
895
896 if self._index == self._length:
897 del self._cache[self._job]
898 finally:
899 self._cond.release()
900
901 def _set_length(self, length):
902 self._cond.acquire()
903 try:
904 self._length = length
905 if self._index == self._length:
906 self._cond.notify()
907 del self._cache[self._job]
908 finally:
909 self._cond.release()
910
911#
912# Class whose instances are returned by `Pool.imap_unordered()`
913#
914
915class IMapUnorderedIterator(IMapIterator):
916
917 def _set(self, i, obj):
918 self._cond.acquire()
919 try:
920 self._items.append(obj)
921 self._index += 1
922 self._cond.notify()
923 if self._index == self._length:
924 del self._cache[self._job]
925 finally:
926 self._cond.release()
927 5
928 6
diff --git a/bitbake/lib/bb/server/process.py b/bitbake/lib/bb/server/process.py
index d73fe827e4..0d4a26ced0 100644
--- a/bitbake/lib/bb/server/process.py
+++ b/bitbake/lib/bb/server/process.py
@@ -142,52 +142,6 @@ class ProcessServer(Process, BaseImplServer):
142 def stop(self): 142 def stop(self):
143 self.keep_running.clear() 143 self.keep_running.clear()
144 144
145 def bootstrap_2_6_6(self):
146 """Pulled from python 2.6.6. Needed to ensure we have the fix from
147 http://bugs.python.org/issue5313 when running on python version 2.6.2
148 or lower."""
149
150 try:
151 self._children = set()
152 self._counter = itertools.count(1)
153 try:
154 sys.stdin.close()
155 sys.stdin = open(os.devnull)
156 except (OSError, ValueError):
157 pass
158 multiprocessing._current_process = self
159 util._finalizer_registry.clear()
160 util._run_after_forkers()
161 util.info('child process calling self.run()')
162 try:
163 self.run()
164 exitcode = 0
165 finally:
166 util._exit_function()
167 except SystemExit as e:
168 if not e.args:
169 exitcode = 1
170 elif type(e.args[0]) is int:
171 exitcode = e.args[0]
172 else:
173 sys.stderr.write(e.args[0] + '\n')
174 sys.stderr.flush()
175 exitcode = 1
176 except:
177 exitcode = 1
178 import traceback
179 sys.stderr.write('Process %s:\n' % self.name)
180 sys.stderr.flush()
181 traceback.print_exc()
182
183 util.info('process exiting with exitcode %d' % exitcode)
184 return exitcode
185
186 # Python versions 2.6.0 through 2.6.2 suffer from a multiprocessing bug
187 # which can result in a bitbake server hang during the parsing process
188 if (2, 6, 0) <= sys.version_info < (2, 6, 3):
189 _bootstrap = bootstrap_2_6_6
190
191class BitBakeProcessServerConnection(BitBakeBaseServerConnection): 145class BitBakeProcessServerConnection(BitBakeBaseServerConnection):
192 def __init__(self, serverImpl, ui_channel, event_queue): 146 def __init__(self, serverImpl, ui_channel, event_queue):
193 self.procserver = serverImpl 147 self.procserver = serverImpl
diff --git a/bitbake/lib/bb/server/xmlrpc.py b/bitbake/lib/bb/server/xmlrpc.py
index 359d5adb67..5045e55ae2 100644
--- a/bitbake/lib/bb/server/xmlrpc.py
+++ b/bitbake/lib/bb/server/xmlrpc.py
@@ -51,100 +51,18 @@ import inspect, select
51 51
52from . import BitBakeBaseServer, BitBakeBaseServerConnection, BaseImplServer 52from . import BitBakeBaseServer, BitBakeBaseServerConnection, BaseImplServer
53 53
54if sys.hexversion < 0x020600F0: 54class BBTransport(xmlrpclib.Transport):
55 print("Sorry, python 2.6 or later is required for bitbake's XMLRPC mode") 55 def __init__(self):
56 sys.exit(1) 56 self.connection_token = None
57 57 xmlrpclib.Transport.__init__(self)
58##
59# The xmlrpclib.Transport class has undergone various changes in Python 2.7
60# which break BitBake's XMLRPC implementation.
61# To work around this we subclass Transport and have a copy/paste of method
62# implementations from Python 2.6.6's xmlrpclib.
63#
64# Upstream Python bug is #8194 (http://bugs.python.org/issue8194)
65# This bug is relevant for Python 2.7.0 and 2.7.1 but was fixed for
66# Python > 2.7.2
67#
68# To implement a simple form of client control, we use a special transport
69# that adds a HTTP header field ("Bitbake-token") to ensure that a server
70# can communicate with only a client at a given time (the client must use
71# the same token).
72##
73if (2, 7, 0) <= sys.version_info < (2, 7, 2):
74 class BBTransport(xmlrpclib.Transport):
75 def __init__(self):
76 self.connection_token = None
77 xmlrpclib.Transport.__init__(self)
78
79 def request(self, host, handler, request_body, verbose=0):
80 h = self.make_connection(host)
81 if verbose:
82 h.set_debuglevel(1)
83
84 self.send_request(h, handler, request_body)
85 self.send_host(h, host)
86 self.send_user_agent(h)
87 if self.connection_token:
88 h.putheader("Bitbake-token", self.connection_token)
89 self.send_content(h, request_body)
90
91 errcode, errmsg, headers = h.getreply()
92
93 if errcode != 200:
94 raise ProtocolError(
95 host + handler,
96 errcode, errmsg,
97 headers
98 )
99
100 self.verbose = verbose
101 58
102 try: 59 def set_connection_token(self, token):
103 sock = h._conn.sock 60 self.connection_token = token
104 except AttributeError: 61
105 sock = None 62 def send_content(self, h, body):
106 63 if self.connection_token:
107 return self._parse_response(h.getfile(), sock) 64 h.putheader("Bitbake-token", self.connection_token)
108 65 xmlrpclib.Transport.send_content(self, h, body)
109 def make_connection(self, host):
110 import httplib
111 host, extra_headers, x509 = self.get_host_info(host)
112 return httplib.HTTP(host)
113
114 def _parse_response(self, file, sock):
115 p, u = self.getparser()
116
117 while 1:
118 if sock:
119 response = sock.recv(1024)
120 else:
121 response = file.read(1024)
122 if not response:
123 break
124 if self.verbose:
125 print("body:", repr(response))
126 p.feed(response)
127
128 file.close()
129 p.close()
130
131 return u.close()
132
133 def set_connection_token(self, token):
134 self.connection_token = token
135else:
136 class BBTransport(xmlrpclib.Transport):
137 def __init__(self):
138 self.connection_token = None
139 xmlrpclib.Transport.__init__(self)
140
141 def set_connection_token(self, token):
142 self.connection_token = token
143
144 def send_content(self, h, body):
145 if self.connection_token:
146 h.putheader("Bitbake-token", self.connection_token)
147 xmlrpclib.Transport.send_content(self, h, body)
148 66
149def _create_server(host, port): 67def _create_server(host, port):
150 t = BBTransport() 68 t = BBTransport()
diff --git a/bitbake/lib/bb/utils.py b/bitbake/lib/bb/utils.py
index 7db6e3862f..5301ebaa97 100644
--- a/bitbake/lib/bb/utils.py
+++ b/bitbake/lib/bb/utils.py
@@ -860,11 +860,8 @@ def process_profilelog(fn):
860 pout.close() 860 pout.close()
861 861
862# 862#
863# Work around multiprocessing pool bugs in python < 2.7.3 863# Was present to work around multiprocessing pool bugs in python < 2.7.3
864# 864#
865def multiprocessingpool(*args, **kwargs): 865def multiprocessingpool(*args, **kwargs):
866 if sys.version_info < (2, 7, 3): 866 return multiprocessing.pool.Pool(*args, **kwargs)
867 return bb.compat.Pool(*args, **kwargs)
868 else:
869 return multiprocessing.pool.Pool(*args, **kwargs)
870 867