summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/concurrent/futures
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/concurrent/futures')
-rw-r--r--bitbake/lib/concurrent/futures/__init__.py18
-rw-r--r--bitbake/lib/concurrent/futures/_base.py575
-rw-r--r--bitbake/lib/concurrent/futures/_compat.py101
-rw-r--r--bitbake/lib/concurrent/futures/process.py345
-rw-r--r--bitbake/lib/concurrent/futures/thread.py144
5 files changed, 1183 insertions, 0 deletions
diff --git a/bitbake/lib/concurrent/futures/__init__.py b/bitbake/lib/concurrent/futures/__init__.py
new file mode 100644
index 0000000000..b5231f8aab
--- /dev/null
+++ b/bitbake/lib/concurrent/futures/__init__.py
@@ -0,0 +1,18 @@
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Execute computations asynchronously using threads or processes."""
5
6__author__ = 'Brian Quinlan (brian@sweetapp.com)'
7
8from concurrent.futures._base import (FIRST_COMPLETED,
9 FIRST_EXCEPTION,
10 ALL_COMPLETED,
11 CancelledError,
12 TimeoutError,
13 Future,
14 Executor,
15 wait,
16 as_completed)
17from concurrent.futures.process import ProcessPoolExecutor
18from concurrent.futures.thread import ThreadPoolExecutor
diff --git a/bitbake/lib/concurrent/futures/_base.py b/bitbake/lib/concurrent/futures/_base.py
new file mode 100644
index 0000000000..1d90211bd0
--- /dev/null
+++ b/bitbake/lib/concurrent/futures/_base.py
@@ -0,0 +1,575 @@
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4from __future__ import with_statement
5import functools
6import logging
7import threading
8import time
9
10try:
11 from collections import namedtuple
12except ImportError:
13 from concurrent.futures._compat import namedtuple
14
15__author__ = 'Brian Quinlan (brian@sweetapp.com)'
16
17FIRST_COMPLETED = 'FIRST_COMPLETED'
18FIRST_EXCEPTION = 'FIRST_EXCEPTION'
19ALL_COMPLETED = 'ALL_COMPLETED'
20_AS_COMPLETED = '_AS_COMPLETED'
21
22# Possible future states (for internal use by the futures package).
23PENDING = 'PENDING'
24RUNNING = 'RUNNING'
25# The future was cancelled by the user...
26CANCELLED = 'CANCELLED'
27# ...and _Waiter.add_cancelled() was called by a worker.
28CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
29FINISHED = 'FINISHED'
30
31_FUTURE_STATES = [
32 PENDING,
33 RUNNING,
34 CANCELLED,
35 CANCELLED_AND_NOTIFIED,
36 FINISHED
37]
38
39_STATE_TO_DESCRIPTION_MAP = {
40 PENDING: "pending",
41 RUNNING: "running",
42 CANCELLED: "cancelled",
43 CANCELLED_AND_NOTIFIED: "cancelled",
44 FINISHED: "finished"
45}
46
47# Logger for internal use by the futures package.
48LOGGER = logging.getLogger("concurrent.futures")
49STDERR_HANDLER = logging.StreamHandler()
50LOGGER.addHandler(STDERR_HANDLER)
51
52class Error(Exception):
53 """Base class for all future-related exceptions."""
54 pass
55
56class CancelledError(Error):
57 """The Future was cancelled."""
58 pass
59
60class TimeoutError(Error):
61 """The operation exceeded the given deadline."""
62 pass
63
64class _Waiter(object):
65 """Provides the event that wait() and as_completed() block on."""
66 def __init__(self):
67 self.event = threading.Event()
68 self.finished_futures = []
69
70 def add_result(self, future):
71 self.finished_futures.append(future)
72
73 def add_exception(self, future):
74 self.finished_futures.append(future)
75
76 def add_cancelled(self, future):
77 self.finished_futures.append(future)
78
79class _AsCompletedWaiter(_Waiter):
80 """Used by as_completed()."""
81
82 def __init__(self):
83 super(_AsCompletedWaiter, self).__init__()
84 self.lock = threading.Lock()
85
86 def add_result(self, future):
87 with self.lock:
88 super(_AsCompletedWaiter, self).add_result(future)
89 self.event.set()
90
91 def add_exception(self, future):
92 with self.lock:
93 super(_AsCompletedWaiter, self).add_exception(future)
94 self.event.set()
95
96 def add_cancelled(self, future):
97 with self.lock:
98 super(_AsCompletedWaiter, self).add_cancelled(future)
99 self.event.set()
100
101class _FirstCompletedWaiter(_Waiter):
102 """Used by wait(return_when=FIRST_COMPLETED)."""
103
104 def add_result(self, future):
105 super(_FirstCompletedWaiter, self).add_result(future)
106 self.event.set()
107
108 def add_exception(self, future):
109 super(_FirstCompletedWaiter, self).add_exception(future)
110 self.event.set()
111
112 def add_cancelled(self, future):
113 super(_FirstCompletedWaiter, self).add_cancelled(future)
114 self.event.set()
115
116class _AllCompletedWaiter(_Waiter):
117 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
118
119 def __init__(self, num_pending_calls, stop_on_exception):
120 self.num_pending_calls = num_pending_calls
121 self.stop_on_exception = stop_on_exception
122 super(_AllCompletedWaiter, self).__init__()
123
124 def _decrement_pending_calls(self):
125 self.num_pending_calls -= 1
126 if not self.num_pending_calls:
127 self.event.set()
128
129 def add_result(self, future):
130 super(_AllCompletedWaiter, self).add_result(future)
131 self._decrement_pending_calls()
132
133 def add_exception(self, future):
134 super(_AllCompletedWaiter, self).add_exception(future)
135 if self.stop_on_exception:
136 self.event.set()
137 else:
138 self._decrement_pending_calls()
139
140 def add_cancelled(self, future):
141 super(_AllCompletedWaiter, self).add_cancelled(future)
142 self._decrement_pending_calls()
143
144class _AcquireFutures(object):
145 """A context manager that does an ordered acquire of Future conditions."""
146
147 def __init__(self, futures):
148 self.futures = sorted(futures, key=id)
149
150 def __enter__(self):
151 for future in self.futures:
152 future._condition.acquire()
153
154 def __exit__(self, *args):
155 for future in self.futures:
156 future._condition.release()
157
158def _create_and_install_waiters(fs, return_when):
159 if return_when == _AS_COMPLETED:
160 waiter = _AsCompletedWaiter()
161 elif return_when == FIRST_COMPLETED:
162 waiter = _FirstCompletedWaiter()
163 else:
164 pending_count = sum(
165 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
166
167 if return_when == FIRST_EXCEPTION:
168 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
169 elif return_when == ALL_COMPLETED:
170 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
171 else:
172 raise ValueError("Invalid return condition: %r" % return_when)
173
174 for f in fs:
175 f._waiters.append(waiter)
176
177 return waiter
178
179def as_completed(fs, timeout=None):
180 """An iterator over the given futures that yields each as it completes.
181
182 Args:
183 fs: The sequence of Futures (possibly created by different Executors) to
184 iterate over.
185 timeout: The maximum number of seconds to wait. If None, then there
186 is no limit on the wait time.
187
188 Returns:
189 An iterator that yields the given Futures as they complete (finished or
190 cancelled).
191
192 Raises:
193 TimeoutError: If the entire result iterator could not be generated
194 before the given timeout.
195 """
196 if timeout is not None:
197 end_time = timeout + time.time()
198
199 with _AcquireFutures(fs):
200 finished = set(
201 f for f in fs
202 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
203 pending = set(fs) - finished
204 waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
205
206 try:
207 for future in finished:
208 yield future
209
210 while pending:
211 if timeout is None:
212 wait_timeout = None
213 else:
214 wait_timeout = end_time - time.time()
215 if wait_timeout < 0:
216 raise TimeoutError(
217 '%d (of %d) futures unfinished' % (
218 len(pending), len(fs)))
219
220 waiter.event.wait(wait_timeout)
221
222 with waiter.lock:
223 finished = waiter.finished_futures
224 waiter.finished_futures = []
225 waiter.event.clear()
226
227 for future in finished:
228 yield future
229 pending.remove(future)
230
231 finally:
232 for f in fs:
233 f._waiters.remove(waiter)
234
235DoneAndNotDoneFutures = namedtuple(
236 'DoneAndNotDoneFutures', 'done not_done')
237def wait(fs, timeout=None, return_when=ALL_COMPLETED):
238 """Wait for the futures in the given sequence to complete.
239
240 Args:
241 fs: The sequence of Futures (possibly created by different Executors) to
242 wait upon.
243 timeout: The maximum number of seconds to wait. If None, then there
244 is no limit on the wait time.
245 return_when: Indicates when this function should return. The options
246 are:
247
248 FIRST_COMPLETED - Return when any future finishes or is
249 cancelled.
250 FIRST_EXCEPTION - Return when any future finishes by raising an
251 exception. If no future raises an exception
252 then it is equivalent to ALL_COMPLETED.
253 ALL_COMPLETED - Return when all futures finish or are cancelled.
254
255 Returns:
256 A named 2-tuple of sets. The first set, named 'done', contains the
257 futures that completed (is finished or cancelled) before the wait
258 completed. The second set, named 'not_done', contains uncompleted
259 futures.
260 """
261 with _AcquireFutures(fs):
262 done = set(f for f in fs
263 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
264 not_done = set(fs) - done
265
266 if (return_when == FIRST_COMPLETED) and done:
267 return DoneAndNotDoneFutures(done, not_done)
268 elif (return_when == FIRST_EXCEPTION) and done:
269 if any(f for f in done
270 if not f.cancelled() and f.exception() is not None):
271 return DoneAndNotDoneFutures(done, not_done)
272
273 if len(done) == len(fs):
274 return DoneAndNotDoneFutures(done, not_done)
275
276 waiter = _create_and_install_waiters(fs, return_when)
277
278 waiter.event.wait(timeout)
279 for f in fs:
280 f._waiters.remove(waiter)
281
282 done.update(waiter.finished_futures)
283 return DoneAndNotDoneFutures(done, set(fs) - done)
284
285class Future(object):
286 """Represents the result of an asynchronous computation."""
287
288 def __init__(self):
289 """Initializes the future. Should not be called by clients."""
290 self._condition = threading.Condition()
291 self._state = PENDING
292 self._result = None
293 self._exception = None
294 self._waiters = []
295 self._done_callbacks = []
296
297 def _invoke_callbacks(self):
298 for callback in self._done_callbacks:
299 try:
300 callback(self)
301 except Exception:
302 LOGGER.exception('exception calling callback for %r', self)
303
304 def __repr__(self):
305 with self._condition:
306 if self._state == FINISHED:
307 if self._exception:
308 return '<Future at %s state=%s raised %s>' % (
309 hex(id(self)),
310 _STATE_TO_DESCRIPTION_MAP[self._state],
311 self._exception.__class__.__name__)
312 else:
313 return '<Future at %s state=%s returned %s>' % (
314 hex(id(self)),
315 _STATE_TO_DESCRIPTION_MAP[self._state],
316 self._result.__class__.__name__)
317 return '<Future at %s state=%s>' % (
318 hex(id(self)),
319 _STATE_TO_DESCRIPTION_MAP[self._state])
320
321 def cancel(self):
322 """Cancel the future if possible.
323
324 Returns True if the future was cancelled, False otherwise. A future
325 cannot be cancelled if it is running or has already completed.
326 """
327 with self._condition:
328 if self._state in [RUNNING, FINISHED]:
329 return False
330
331 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
332 return True
333
334 self._state = CANCELLED
335 self._condition.notify_all()
336
337 self._invoke_callbacks()
338 return True
339
340 def cancelled(self):
341 """Return True if the future has cancelled."""
342 with self._condition:
343 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
344
345 def running(self):
346 """Return True if the future is currently executing."""
347 with self._condition:
348 return self._state == RUNNING
349
350 def done(self):
351 """Return True of the future was cancelled or finished executing."""
352 with self._condition:
353 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
354
355 def __get_result(self):
356 if self._exception:
357 raise self._exception
358 else:
359 return self._result
360
361 def add_done_callback(self, fn):
362 """Attaches a callable that will be called when the future finishes.
363
364 Args:
365 fn: A callable that will be called with this future as its only
366 argument when the future completes or is cancelled. The callable
367 will always be called by a thread in the same process in which
368 it was added. If the future has already completed or been
369 cancelled then the callable will be called immediately. These
370 callables are called in the order that they were added.
371 """
372 with self._condition:
373 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
374 self._done_callbacks.append(fn)
375 return
376 fn(self)
377
378 def result(self, timeout=None):
379 """Return the result of the call that the future represents.
380
381 Args:
382 timeout: The number of seconds to wait for the result if the future
383 isn't done. If None, then there is no limit on the wait time.
384
385 Returns:
386 The result of the call that the future represents.
387
388 Raises:
389 CancelledError: If the future was cancelled.
390 TimeoutError: If the future didn't finish executing before the given
391 timeout.
392 Exception: If the call raised then that exception will be raised.
393 """
394 with self._condition:
395 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
396 raise CancelledError()
397 elif self._state == FINISHED:
398 return self.__get_result()
399
400 self._condition.wait(timeout)
401
402 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
403 raise CancelledError()
404 elif self._state == FINISHED:
405 return self.__get_result()
406 else:
407 raise TimeoutError()
408
409 def exception(self, timeout=None):
410 """Return the exception raised by the call that the future represents.
411
412 Args:
413 timeout: The number of seconds to wait for the exception if the
414 future isn't done. If None, then there is no limit on the wait
415 time.
416
417 Returns:
418 The exception raised by the call that the future represents or None
419 if the call completed without raising.
420
421 Raises:
422 CancelledError: If the future was cancelled.
423 TimeoutError: If the future didn't finish executing before the given
424 timeout.
425 """
426
427 with self._condition:
428 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
429 raise CancelledError()
430 elif self._state == FINISHED:
431 return self._exception
432
433 self._condition.wait(timeout)
434
435 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
436 raise CancelledError()
437 elif self._state == FINISHED:
438 return self._exception
439 else:
440 raise TimeoutError()
441
442 # The following methods should only be used by Executors and in tests.
443 def set_running_or_notify_cancel(self):
444 """Mark the future as running or process any cancel notifications.
445
446 Should only be used by Executor implementations and unit tests.
447
448 If the future has been cancelled (cancel() was called and returned
449 True) then any threads waiting on the future completing (though calls
450 to as_completed() or wait()) are notified and False is returned.
451
452 If the future was not cancelled then it is put in the running state
453 (future calls to running() will return True) and True is returned.
454
455 This method should be called by Executor implementations before
456 executing the work associated with this future. If this method returns
457 False then the work should not be executed.
458
459 Returns:
460 False if the Future was cancelled, True otherwise.
461
462 Raises:
463 RuntimeError: if this method was already called or if set_result()
464 or set_exception() was called.
465 """
466 with self._condition:
467 if self._state == CANCELLED:
468 self._state = CANCELLED_AND_NOTIFIED
469 for waiter in self._waiters:
470 waiter.add_cancelled(self)
471 # self._condition.notify_all() is not necessary because
472 # self.cancel() triggers a notification.
473 return False
474 elif self._state == PENDING:
475 self._state = RUNNING
476 return True
477 else:
478 LOGGER.critical('Future %s in unexpected state: %s',
479 id(self.future),
480 self.future._state)
481 raise RuntimeError('Future in unexpected state')
482
483 def set_result(self, result):
484 """Sets the return value of work associated with the future.
485
486 Should only be used by Executor implementations and unit tests.
487 """
488 with self._condition:
489 self._result = result
490 self._state = FINISHED
491 for waiter in self._waiters:
492 waiter.add_result(self)
493 self._condition.notify_all()
494 self._invoke_callbacks()
495
496 def set_exception(self, exception):
497 """Sets the result of the future as being the given exception.
498
499 Should only be used by Executor implementations and unit tests.
500 """
501 with self._condition:
502 self._exception = exception
503 self._state = FINISHED
504 for waiter in self._waiters:
505 waiter.add_exception(self)
506 self._condition.notify_all()
507 self._invoke_callbacks()
508
509class Executor(object):
510 """This is an abstract base class for concrete asynchronous executors."""
511
512 def submit(self, fn, *args, **kwargs):
513 """Submits a callable to be executed with the given arguments.
514
515 Schedules the callable to be executed as fn(*args, **kwargs) and returns
516 a Future instance representing the execution of the callable.
517
518 Returns:
519 A Future representing the given call.
520 """
521 raise NotImplementedError()
522
523 def map(self, fn, *iterables, **kwargs):
524 """Returns a iterator equivalent to map(fn, iter).
525
526 Args:
527 fn: A callable that will take take as many arguments as there are
528 passed iterables.
529 timeout: The maximum number of seconds to wait. If None, then there
530 is no limit on the wait time.
531
532 Returns:
533 An iterator equivalent to: map(func, *iterables) but the calls may
534 be evaluated out-of-order.
535
536 Raises:
537 TimeoutError: If the entire result iterator could not be generated
538 before the given timeout.
539 Exception: If fn(*args) raises for any values.
540 """
541 timeout = kwargs.get('timeout')
542 if timeout is not None:
543 end_time = timeout + time.time()
544
545 fs = [self.submit(fn, *args) for args in zip(*iterables)]
546
547 try:
548 for future in fs:
549 if timeout is None:
550 yield future.result()
551 else:
552 yield future.result(end_time - time.time())
553 finally:
554 for future in fs:
555 future.cancel()
556
557 def shutdown(self, wait=True):
558 """Clean-up the resources associated with the Executor.
559
560 It is safe to call this method several times. Otherwise, no other
561 methods can be called after this one.
562
563 Args:
564 wait: If True then shutdown will not return until all running
565 futures have finished executing and the resources used by the
566 executor have been reclaimed.
567 """
568 pass
569
570 def __enter__(self):
571 return self
572
573 def __exit__(self, exc_type, exc_val, exc_tb):
574 self.shutdown(wait=True)
575 return False
diff --git a/bitbake/lib/concurrent/futures/_compat.py b/bitbake/lib/concurrent/futures/_compat.py
new file mode 100644
index 0000000000..11462326b5
--- /dev/null
+++ b/bitbake/lib/concurrent/futures/_compat.py
@@ -0,0 +1,101 @@
1from keyword import iskeyword as _iskeyword
2from operator import itemgetter as _itemgetter
3import sys as _sys
4
5
6def namedtuple(typename, field_names):
7 """Returns a new subclass of tuple with named fields.
8
9 >>> Point = namedtuple('Point', 'x y')
10 >>> Point.__doc__ # docstring for the new class
11 'Point(x, y)'
12 >>> p = Point(11, y=22) # instantiate with positional args or keywords
13 >>> p[0] + p[1] # indexable like a plain tuple
14 33
15 >>> x, y = p # unpack like a regular tuple
16 >>> x, y
17 (11, 22)
18 >>> p.x + p.y # fields also accessable by name
19 33
20 >>> d = p._asdict() # convert to a dictionary
21 >>> d['x']
22 11
23 >>> Point(**d) # convert from a dictionary
24 Point(x=11, y=22)
25 >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
26 Point(x=100, y=22)
27
28 """
29
30 # Parse and validate the field names. Validation serves two purposes,
31 # generating informative error messages and preventing template injection attacks.
32 if isinstance(field_names, basestring):
33 field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas
34 field_names = tuple(map(str, field_names))
35 for name in (typename,) + field_names:
36 if not all(c.isalnum() or c=='_' for c in name):
37 raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name)
38 if _iskeyword(name):
39 raise ValueError('Type names and field names cannot be a keyword: %r' % name)
40 if name[0].isdigit():
41 raise ValueError('Type names and field names cannot start with a number: %r' % name)
42 seen_names = set()
43 for name in field_names:
44 if name.startswith('_'):
45 raise ValueError('Field names cannot start with an underscore: %r' % name)
46 if name in seen_names:
47 raise ValueError('Encountered duplicate field name: %r' % name)
48 seen_names.add(name)
49
50 # Create and fill-in the class template
51 numfields = len(field_names)
52 argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes
53 reprtxt = ', '.join('%s=%%r' % name for name in field_names)
54 dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names))
55 template = '''class %(typename)s(tuple):
56 '%(typename)s(%(argtxt)s)' \n
57 __slots__ = () \n
58 _fields = %(field_names)r \n
59 def __new__(_cls, %(argtxt)s):
60 return _tuple.__new__(_cls, (%(argtxt)s)) \n
61 @classmethod
62 def _make(cls, iterable, new=tuple.__new__, len=len):
63 'Make a new %(typename)s object from a sequence or iterable'
64 result = new(cls, iterable)
65 if len(result) != %(numfields)d:
66 raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result))
67 return result \n
68 def __repr__(self):
69 return '%(typename)s(%(reprtxt)s)' %% self \n
70 def _asdict(t):
71 'Return a new dict which maps field names to their values'
72 return {%(dicttxt)s} \n
73 def _replace(_self, **kwds):
74 'Return a new %(typename)s object replacing specified fields with new values'
75 result = _self._make(map(kwds.pop, %(field_names)r, _self))
76 if kwds:
77 raise ValueError('Got unexpected field names: %%r' %% kwds.keys())
78 return result \n
79 def __getnewargs__(self):
80 return tuple(self) \n\n''' % locals()
81 for i, name in enumerate(field_names):
82 template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
83
84 # Execute the template string in a temporary namespace and
85 # support tracing utilities by setting a value for frame.f_globals['__name__']
86 namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
87 _property=property, _tuple=tuple)
88 try:
89 exec(template, namespace)
90 except SyntaxError:
91 e = _sys.exc_info()[1]
92 raise SyntaxError(e.message + ':\n' + template)
93 result = namespace[typename]
94
95 # For pickling to work, the __module__ variable needs to be set to the frame
96 # where the named tuple is created. Bypass this step in enviroments where
97 # sys._getframe is not defined (Jython for example).
98 if hasattr(_sys, '_getframe'):
99 result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
100
101 return result
diff --git a/bitbake/lib/concurrent/futures/process.py b/bitbake/lib/concurrent/futures/process.py
new file mode 100644
index 0000000000..87dc789433
--- /dev/null
+++ b/bitbake/lib/concurrent/futures/process.py
@@ -0,0 +1,345 @@
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
6The follow diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+ +----------+ +--------+ +-----------+ +---------+
11| | => | Work Ids | => | | => | Call Q | => | |
12| | +----------+ | | +-----------+ | |
13| | | ... | | | | ... | | |
14| | | 6 | | | | 5, call() | | |
15| | | 7 | | | | ... | | |
16| Process | | ... | | Local | +-----------+ | Process |
17| Pool | +----------+ | Worker | | #1..n |
18| Executor | | Thread | | |
19| | +----------- + | | +-----------+ | |
20| | <=> | Work Items | <=> | | <= | Result Q | <= | |
21| | +------------+ | | +-----------+ | |
22| | | 6: call() | | | | ... | | |
23| | | future | | | | 4, result | | |
24| | | ... | | | | 3, except | | |
25+----------+ +------------+ +--------+ +-----------+ +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43 _ResultItems in "Request Q"
44"""
45
46from __future__ import with_statement
47import atexit
48import multiprocessing
49import threading
50import weakref
51import sys
52
53from concurrent.futures import _base
54
55try:
56 import queue
57except ImportError:
58 import Queue as queue
59
60__author__ = 'Brian Quinlan (brian@sweetapp.com)'
61
62# Workers are created as daemon threads and processes. This is done to allow the
63# interpreter to exit when there are still idle processes in a
64# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
65# allowing workers to die with the interpreter has two undesirable properties:
66# - The workers would still be running during interpretor shutdown,
67# meaning that they would fail in unpredictable ways.
68# - The workers could be killed while evaluating a work item, which could
69# be bad if the callable being evaluated has external side-effects e.g.
70# writing to a file.
71#
72# To work around this problem, an exit handler is installed which tells the
73# workers to exit when their work queues are empty and then waits until the
74# threads/processes finish.
75
76_thread_references = set()
77_shutdown = False
78
79def _python_exit():
80 global _shutdown
81 _shutdown = True
82 for thread_reference in _thread_references:
83 thread = thread_reference()
84 if thread is not None:
85 thread.join()
86
87def _remove_dead_thread_references():
88 """Remove inactive threads from _thread_references.
89
90 Should be called periodically to prevent memory leaks in scenarios such as:
91 >>> while True:
92 >>> ... t = ThreadPoolExecutor(max_workers=5)
93 >>> ... t.map(int, ['1', '2', '3', '4', '5'])
94 """
95 for thread_reference in set(_thread_references):
96 if thread_reference() is None:
97 _thread_references.discard(thread_reference)
98
99# Controls how many more calls than processes will be queued in the call queue.
100# A smaller number will mean that processes spend more time idle waiting for
101# work while a larger number will make Future.cancel() succeed less frequently
102# (Futures in the call queue cannot be cancelled).
103EXTRA_QUEUED_CALLS = 1
104
105class _WorkItem(object):
106 def __init__(self, future, fn, args, kwargs):
107 self.future = future
108 self.fn = fn
109 self.args = args
110 self.kwargs = kwargs
111
112class _ResultItem(object):
113 def __init__(self, work_id, exception=None, result=None):
114 self.work_id = work_id
115 self.exception = exception
116 self.result = result
117
118class _CallItem(object):
119 def __init__(self, work_id, fn, args, kwargs):
120 self.work_id = work_id
121 self.fn = fn
122 self.args = args
123 self.kwargs = kwargs
124
125def _process_worker(call_queue, result_queue, shutdown):
126 """Evaluates calls from call_queue and places the results in result_queue.
127
128 This worker is run in a seperate process.
129
130 Args:
131 call_queue: A multiprocessing.Queue of _CallItems that will be read and
132 evaluated by the worker.
133 result_queue: A multiprocessing.Queue of _ResultItems that will written
134 to by the worker.
135 shutdown: A multiprocessing.Event that will be set as a signal to the
136 worker that it should exit when call_queue is empty.
137 """
138 while True:
139 try:
140 call_item = call_queue.get(block=True, timeout=0.1)
141 except queue.Empty:
142 if shutdown.is_set():
143 return
144 else:
145 try:
146 r = call_item.fn(*call_item.args, **call_item.kwargs)
147 except BaseException:
148 e = sys.exc_info()[1]
149 result_queue.put(_ResultItem(call_item.work_id,
150 exception=e))
151 else:
152 result_queue.put(_ResultItem(call_item.work_id,
153 result=r))
154
155def _add_call_item_to_queue(pending_work_items,
156 work_ids,
157 call_queue):
158 """Fills call_queue with _WorkItems from pending_work_items.
159
160 This function never blocks.
161
162 Args:
163 pending_work_items: A dict mapping work ids to _WorkItems e.g.
164 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
165 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
166 are consumed and the corresponding _WorkItems from
167 pending_work_items are transformed into _CallItems and put in
168 call_queue.
169 call_queue: A multiprocessing.Queue that will be filled with _CallItems
170 derived from _WorkItems.
171 """
172 while True:
173 if call_queue.full():
174 return
175 try:
176 work_id = work_ids.get(block=False)
177 except queue.Empty:
178 return
179 else:
180 work_item = pending_work_items[work_id]
181
182 if work_item.future.set_running_or_notify_cancel():
183 call_queue.put(_CallItem(work_id,
184 work_item.fn,
185 work_item.args,
186 work_item.kwargs),
187 block=True)
188 else:
189 del pending_work_items[work_id]
190 continue
191
192def _queue_manangement_worker(executor_reference,
193 processes,
194 pending_work_items,
195 work_ids_queue,
196 call_queue,
197 result_queue,
198 shutdown_process_event):
199 """Manages the communication between this process and the worker processes.
200
201 This function is run in a local thread.
202
203 Args:
204 executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
205 this thread. Used to determine if the ProcessPoolExecutor has been
206 garbage collected and that this function can exit.
207 process: A list of the multiprocessing.Process instances used as
208 workers.
209 pending_work_items: A dict mapping work ids to _WorkItems e.g.
210 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
211 work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
212 call_queue: A multiprocessing.Queue that will be filled with _CallItems
213 derived from _WorkItems for processing by the process workers.
214 result_queue: A multiprocessing.Queue of _ResultItems generated by the
215 process workers.
216 shutdown_process_event: A multiprocessing.Event used to signal the
217 process workers that they should exit when their work queue is
218 empty.
219 """
220 while True:
221 _add_call_item_to_queue(pending_work_items,
222 work_ids_queue,
223 call_queue)
224
225 try:
226 result_item = result_queue.get(block=True, timeout=0.1)
227 except queue.Empty:
228 executor = executor_reference()
229 # No more work items can be added if:
230 # - The interpreter is shutting down OR
231 # - The executor that owns this worker has been collected OR
232 # - The executor that owns this worker has been shutdown.
233 if _shutdown or executor is None or executor._shutdown_thread:
234 # Since no new work items can be added, it is safe to shutdown
235 # this thread if there are no pending work items.
236 if not pending_work_items:
237 shutdown_process_event.set()
238
239 # If .join() is not called on the created processes then
240 # some multiprocessing.Queue methods may deadlock on Mac OS
241 # X.
242 for p in processes:
243 p.join()
244 return
245 del executor
246 else:
247 work_item = pending_work_items[result_item.work_id]
248 del pending_work_items[result_item.work_id]
249
250 if result_item.exception:
251 work_item.future.set_exception(result_item.exception)
252 else:
253 work_item.future.set_result(result_item.result)
254
255class ProcessPoolExecutor(_base.Executor):
256 def __init__(self, max_workers=None):
257 """Initializes a new ProcessPoolExecutor instance.
258
259 Args:
260 max_workers: The maximum number of processes that can be used to
261 execute the given calls. If None or not given then as many
262 worker processes will be created as the machine has processors.
263 """
264 _remove_dead_thread_references()
265
266 if max_workers is None:
267 self._max_workers = multiprocessing.cpu_count()
268 else:
269 self._max_workers = max_workers
270
271 # Make the call queue slightly larger than the number of processes to
272 # prevent the worker processes from idling. But don't make it too big
273 # because futures in the call queue cannot be cancelled.
274 self._call_queue = multiprocessing.Queue(self._max_workers +
275 EXTRA_QUEUED_CALLS)
276 self._result_queue = multiprocessing.Queue()
277 self._work_ids = queue.Queue()
278 self._queue_management_thread = None
279 self._processes = set()
280
281 # Shutdown is a two-step process.
282 self._shutdown_thread = False
283 self._shutdown_process_event = multiprocessing.Event()
284 self._shutdown_lock = threading.Lock()
285 self._queue_count = 0
286 self._pending_work_items = {}
287
288 def _start_queue_management_thread(self):
289 if self._queue_management_thread is None:
290 self._queue_management_thread = threading.Thread(
291 target=_queue_manangement_worker,
292 args=(weakref.ref(self),
293 self._processes,
294 self._pending_work_items,
295 self._work_ids,
296 self._call_queue,
297 self._result_queue,
298 self._shutdown_process_event))
299 self._queue_management_thread.daemon = True
300 self._queue_management_thread.start()
301 _thread_references.add(weakref.ref(self._queue_management_thread))
302
303 def _adjust_process_count(self):
304 for _ in range(len(self._processes), self._max_workers):
305 p = multiprocessing.Process(
306 target=_process_worker,
307 args=(self._call_queue,
308 self._result_queue,
309 self._shutdown_process_event))
310 p.start()
311 self._processes.add(p)
312
313 def submit(self, fn, *args, **kwargs):
314 with self._shutdown_lock:
315 if self._shutdown_thread:
316 raise RuntimeError('cannot schedule new futures after shutdown')
317
318 f = _base.Future()
319 w = _WorkItem(f, fn, args, kwargs)
320
321 self._pending_work_items[self._queue_count] = w
322 self._work_ids.put(self._queue_count)
323 self._queue_count += 1
324
325 self._start_queue_management_thread()
326 self._adjust_process_count()
327 return f
328 submit.__doc__ = _base.Executor.submit.__doc__
329
330 def shutdown(self, wait=True):
331 with self._shutdown_lock:
332 self._shutdown_thread = True
333 if wait:
334 if self._queue_management_thread:
335 self._queue_management_thread.join()
336 # To reduce the risk of openning too many files, remove references to
337 # objects that use file descriptors.
338 self._queue_management_thread = None
339 self._call_queue = None
340 self._result_queue = None
341 self._shutdown_process_event = None
342 self._processes = None
343 shutdown.__doc__ = _base.Executor.shutdown.__doc__
344
345atexit.register(_python_exit)
diff --git a/bitbake/lib/concurrent/futures/thread.py b/bitbake/lib/concurrent/futures/thread.py
new file mode 100644
index 0000000000..ce0dda0c38
--- /dev/null
+++ b/bitbake/lib/concurrent/futures/thread.py
@@ -0,0 +1,144 @@
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ThreadPoolExecutor."""
5
6from __future__ import with_statement
7import atexit
8import threading
9import weakref
10import sys
11
12from concurrent.futures import _base
13
14try:
15 import queue
16except ImportError:
17 import Queue as queue
18
19__author__ = 'Brian Quinlan (brian@sweetapp.com)'
20
21# Workers are created as daemon threads. This is done to allow the interpreter
22# to exit when there are still idle threads in a ThreadPoolExecutor's thread
23# pool (i.e. shutdown() was not called). However, allowing workers to die with
24# the interpreter has two undesirable properties:
25# - The workers would still be running during interpretor shutdown,
26# meaning that they would fail in unpredictable ways.
27# - The workers could be killed while evaluating a work item, which could
28# be bad if the callable being evaluated has external side-effects e.g.
29# writing to a file.
30#
31# To work around this problem, an exit handler is installed which tells the
32# workers to exit when their work queues are empty and then waits until the
33# threads finish.
34
35_thread_references = set()
36_shutdown = False
37
38def _python_exit():
39 global _shutdown
40 _shutdown = True
41 for thread_reference in _thread_references:
42 thread = thread_reference()
43 if thread is not None:
44 thread.join()
45
46def _remove_dead_thread_references():
47 """Remove inactive threads from _thread_references.
48
49 Should be called periodically to prevent memory leaks in scenarios such as:
50 >>> while True:
51 ... t = ThreadPoolExecutor(max_workers=5)
52 ... t.map(int, ['1', '2', '3', '4', '5'])
53 """
54 for thread_reference in set(_thread_references):
55 if thread_reference() is None:
56 _thread_references.discard(thread_reference)
57
58atexit.register(_python_exit)
59
60class _WorkItem(object):
61 def __init__(self, future, fn, args, kwargs):
62 self.future = future
63 self.fn = fn
64 self.args = args
65 self.kwargs = kwargs
66
67 def run(self):
68 if not self.future.set_running_or_notify_cancel():
69 return
70
71 try:
72 result = self.fn(*self.args, **self.kwargs)
73 except BaseException:
74 e = sys.exc_info()[1]
75 self.future.set_exception(e)
76 else:
77 self.future.set_result(result)
78
79def _worker(executor_reference, work_queue):
80 try:
81 while True:
82 try:
83 work_item = work_queue.get(block=True, timeout=0.1)
84 except queue.Empty:
85 executor = executor_reference()
86 # Exit if:
87 # - The interpreter is shutting down OR
88 # - The executor that owns the worker has been collected OR
89 # - The executor that owns the worker has been shutdown.
90 if _shutdown or executor is None or executor._shutdown:
91 return
92 del executor
93 else:
94 work_item.run()
95 except BaseException:
96 _base.LOGGER.critical('Exception in worker', exc_info=True)
97
98class ThreadPoolExecutor(_base.Executor):
99 def __init__(self, max_workers):
100 """Initializes a new ThreadPoolExecutor instance.
101
102 Args:
103 max_workers: The maximum number of threads that can be used to
104 execute the given calls.
105 """
106 _remove_dead_thread_references()
107
108 self._max_workers = max_workers
109 self._work_queue = queue.Queue()
110 self._threads = set()
111 self._shutdown = False
112 self._shutdown_lock = threading.Lock()
113
114 def submit(self, fn, *args, **kwargs):
115 with self._shutdown_lock:
116 if self._shutdown:
117 raise RuntimeError('cannot schedule new futures after shutdown')
118
119 f = _base.Future()
120 w = _WorkItem(f, fn, args, kwargs)
121
122 self._work_queue.put(w)
123 self._adjust_thread_count()
124 return f
125 submit.__doc__ = _base.Executor.submit.__doc__
126
127 def _adjust_thread_count(self):
128 # TODO(bquinlan): Should avoid creating new threads if there are more
129 # idle threads than items in the work queue.
130 if len(self._threads) < self._max_workers:
131 t = threading.Thread(target=_worker,
132 args=(weakref.ref(self), self._work_queue))
133 t.daemon = True
134 t.start()
135 self._threads.add(t)
136 _thread_references.add(weakref.ref(t))
137
138 def shutdown(self, wait=True):
139 with self._shutdown_lock:
140 self._shutdown = True
141 if wait:
142 for t in self._threads:
143 t.join()
144 shutdown.__doc__ = _base.Executor.shutdown.__doc__