summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/concurrent/futures/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/concurrent/futures/_base.py')
-rw-r--r--bitbake/lib/concurrent/futures/_base.py575
1 files changed, 575 insertions, 0 deletions
diff --git a/bitbake/lib/concurrent/futures/_base.py b/bitbake/lib/concurrent/futures/_base.py
new file mode 100644
index 0000000000..1d90211bd0
--- /dev/null
+++ b/bitbake/lib/concurrent/futures/_base.py
@@ -0,0 +1,575 @@
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4from __future__ import with_statement
5import functools
6import logging
7import threading
8import time
9
10try:
11 from collections import namedtuple
12except ImportError:
13 from concurrent.futures._compat import namedtuple
14
15__author__ = 'Brian Quinlan (brian@sweetapp.com)'
16
17FIRST_COMPLETED = 'FIRST_COMPLETED'
18FIRST_EXCEPTION = 'FIRST_EXCEPTION'
19ALL_COMPLETED = 'ALL_COMPLETED'
20_AS_COMPLETED = '_AS_COMPLETED'
21
22# Possible future states (for internal use by the futures package).
23PENDING = 'PENDING'
24RUNNING = 'RUNNING'
25# The future was cancelled by the user...
26CANCELLED = 'CANCELLED'
27# ...and _Waiter.add_cancelled() was called by a worker.
28CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
29FINISHED = 'FINISHED'
30
31_FUTURE_STATES = [
32 PENDING,
33 RUNNING,
34 CANCELLED,
35 CANCELLED_AND_NOTIFIED,
36 FINISHED
37]
38
39_STATE_TO_DESCRIPTION_MAP = {
40 PENDING: "pending",
41 RUNNING: "running",
42 CANCELLED: "cancelled",
43 CANCELLED_AND_NOTIFIED: "cancelled",
44 FINISHED: "finished"
45}
46
47# Logger for internal use by the futures package.
48LOGGER = logging.getLogger("concurrent.futures")
49STDERR_HANDLER = logging.StreamHandler()
50LOGGER.addHandler(STDERR_HANDLER)
51
52class Error(Exception):
53 """Base class for all future-related exceptions."""
54 pass
55
56class CancelledError(Error):
57 """The Future was cancelled."""
58 pass
59
60class TimeoutError(Error):
61 """The operation exceeded the given deadline."""
62 pass
63
64class _Waiter(object):
65 """Provides the event that wait() and as_completed() block on."""
66 def __init__(self):
67 self.event = threading.Event()
68 self.finished_futures = []
69
70 def add_result(self, future):
71 self.finished_futures.append(future)
72
73 def add_exception(self, future):
74 self.finished_futures.append(future)
75
76 def add_cancelled(self, future):
77 self.finished_futures.append(future)
78
79class _AsCompletedWaiter(_Waiter):
80 """Used by as_completed()."""
81
82 def __init__(self):
83 super(_AsCompletedWaiter, self).__init__()
84 self.lock = threading.Lock()
85
86 def add_result(self, future):
87 with self.lock:
88 super(_AsCompletedWaiter, self).add_result(future)
89 self.event.set()
90
91 def add_exception(self, future):
92 with self.lock:
93 super(_AsCompletedWaiter, self).add_exception(future)
94 self.event.set()
95
96 def add_cancelled(self, future):
97 with self.lock:
98 super(_AsCompletedWaiter, self).add_cancelled(future)
99 self.event.set()
100
101class _FirstCompletedWaiter(_Waiter):
102 """Used by wait(return_when=FIRST_COMPLETED)."""
103
104 def add_result(self, future):
105 super(_FirstCompletedWaiter, self).add_result(future)
106 self.event.set()
107
108 def add_exception(self, future):
109 super(_FirstCompletedWaiter, self).add_exception(future)
110 self.event.set()
111
112 def add_cancelled(self, future):
113 super(_FirstCompletedWaiter, self).add_cancelled(future)
114 self.event.set()
115
116class _AllCompletedWaiter(_Waiter):
117 """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
118
119 def __init__(self, num_pending_calls, stop_on_exception):
120 self.num_pending_calls = num_pending_calls
121 self.stop_on_exception = stop_on_exception
122 super(_AllCompletedWaiter, self).__init__()
123
124 def _decrement_pending_calls(self):
125 self.num_pending_calls -= 1
126 if not self.num_pending_calls:
127 self.event.set()
128
129 def add_result(self, future):
130 super(_AllCompletedWaiter, self).add_result(future)
131 self._decrement_pending_calls()
132
133 def add_exception(self, future):
134 super(_AllCompletedWaiter, self).add_exception(future)
135 if self.stop_on_exception:
136 self.event.set()
137 else:
138 self._decrement_pending_calls()
139
140 def add_cancelled(self, future):
141 super(_AllCompletedWaiter, self).add_cancelled(future)
142 self._decrement_pending_calls()
143
144class _AcquireFutures(object):
145 """A context manager that does an ordered acquire of Future conditions."""
146
147 def __init__(self, futures):
148 self.futures = sorted(futures, key=id)
149
150 def __enter__(self):
151 for future in self.futures:
152 future._condition.acquire()
153
154 def __exit__(self, *args):
155 for future in self.futures:
156 future._condition.release()
157
158def _create_and_install_waiters(fs, return_when):
159 if return_when == _AS_COMPLETED:
160 waiter = _AsCompletedWaiter()
161 elif return_when == FIRST_COMPLETED:
162 waiter = _FirstCompletedWaiter()
163 else:
164 pending_count = sum(
165 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
166
167 if return_when == FIRST_EXCEPTION:
168 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
169 elif return_when == ALL_COMPLETED:
170 waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
171 else:
172 raise ValueError("Invalid return condition: %r" % return_when)
173
174 for f in fs:
175 f._waiters.append(waiter)
176
177 return waiter
178
179def as_completed(fs, timeout=None):
180 """An iterator over the given futures that yields each as it completes.
181
182 Args:
183 fs: The sequence of Futures (possibly created by different Executors) to
184 iterate over.
185 timeout: The maximum number of seconds to wait. If None, then there
186 is no limit on the wait time.
187
188 Returns:
189 An iterator that yields the given Futures as they complete (finished or
190 cancelled).
191
192 Raises:
193 TimeoutError: If the entire result iterator could not be generated
194 before the given timeout.
195 """
196 if timeout is not None:
197 end_time = timeout + time.time()
198
199 with _AcquireFutures(fs):
200 finished = set(
201 f for f in fs
202 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
203 pending = set(fs) - finished
204 waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
205
206 try:
207 for future in finished:
208 yield future
209
210 while pending:
211 if timeout is None:
212 wait_timeout = None
213 else:
214 wait_timeout = end_time - time.time()
215 if wait_timeout < 0:
216 raise TimeoutError(
217 '%d (of %d) futures unfinished' % (
218 len(pending), len(fs)))
219
220 waiter.event.wait(wait_timeout)
221
222 with waiter.lock:
223 finished = waiter.finished_futures
224 waiter.finished_futures = []
225 waiter.event.clear()
226
227 for future in finished:
228 yield future
229 pending.remove(future)
230
231 finally:
232 for f in fs:
233 f._waiters.remove(waiter)
234
235DoneAndNotDoneFutures = namedtuple(
236 'DoneAndNotDoneFutures', 'done not_done')
237def wait(fs, timeout=None, return_when=ALL_COMPLETED):
238 """Wait for the futures in the given sequence to complete.
239
240 Args:
241 fs: The sequence of Futures (possibly created by different Executors) to
242 wait upon.
243 timeout: The maximum number of seconds to wait. If None, then there
244 is no limit on the wait time.
245 return_when: Indicates when this function should return. The options
246 are:
247
248 FIRST_COMPLETED - Return when any future finishes or is
249 cancelled.
250 FIRST_EXCEPTION - Return when any future finishes by raising an
251 exception. If no future raises an exception
252 then it is equivalent to ALL_COMPLETED.
253 ALL_COMPLETED - Return when all futures finish or are cancelled.
254
255 Returns:
256 A named 2-tuple of sets. The first set, named 'done', contains the
257 futures that completed (is finished or cancelled) before the wait
258 completed. The second set, named 'not_done', contains uncompleted
259 futures.
260 """
261 with _AcquireFutures(fs):
262 done = set(f for f in fs
263 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
264 not_done = set(fs) - done
265
266 if (return_when == FIRST_COMPLETED) and done:
267 return DoneAndNotDoneFutures(done, not_done)
268 elif (return_when == FIRST_EXCEPTION) and done:
269 if any(f for f in done
270 if not f.cancelled() and f.exception() is not None):
271 return DoneAndNotDoneFutures(done, not_done)
272
273 if len(done) == len(fs):
274 return DoneAndNotDoneFutures(done, not_done)
275
276 waiter = _create_and_install_waiters(fs, return_when)
277
278 waiter.event.wait(timeout)
279 for f in fs:
280 f._waiters.remove(waiter)
281
282 done.update(waiter.finished_futures)
283 return DoneAndNotDoneFutures(done, set(fs) - done)
284
285class Future(object):
286 """Represents the result of an asynchronous computation."""
287
288 def __init__(self):
289 """Initializes the future. Should not be called by clients."""
290 self._condition = threading.Condition()
291 self._state = PENDING
292 self._result = None
293 self._exception = None
294 self._waiters = []
295 self._done_callbacks = []
296
297 def _invoke_callbacks(self):
298 for callback in self._done_callbacks:
299 try:
300 callback(self)
301 except Exception:
302 LOGGER.exception('exception calling callback for %r', self)
303
304 def __repr__(self):
305 with self._condition:
306 if self._state == FINISHED:
307 if self._exception:
308 return '<Future at %s state=%s raised %s>' % (
309 hex(id(self)),
310 _STATE_TO_DESCRIPTION_MAP[self._state],
311 self._exception.__class__.__name__)
312 else:
313 return '<Future at %s state=%s returned %s>' % (
314 hex(id(self)),
315 _STATE_TO_DESCRIPTION_MAP[self._state],
316 self._result.__class__.__name__)
317 return '<Future at %s state=%s>' % (
318 hex(id(self)),
319 _STATE_TO_DESCRIPTION_MAP[self._state])
320
321 def cancel(self):
322 """Cancel the future if possible.
323
324 Returns True if the future was cancelled, False otherwise. A future
325 cannot be cancelled if it is running or has already completed.
326 """
327 with self._condition:
328 if self._state in [RUNNING, FINISHED]:
329 return False
330
331 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
332 return True
333
334 self._state = CANCELLED
335 self._condition.notify_all()
336
337 self._invoke_callbacks()
338 return True
339
340 def cancelled(self):
341 """Return True if the future has cancelled."""
342 with self._condition:
343 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
344
345 def running(self):
346 """Return True if the future is currently executing."""
347 with self._condition:
348 return self._state == RUNNING
349
350 def done(self):
351 """Return True of the future was cancelled or finished executing."""
352 with self._condition:
353 return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
354
355 def __get_result(self):
356 if self._exception:
357 raise self._exception
358 else:
359 return self._result
360
361 def add_done_callback(self, fn):
362 """Attaches a callable that will be called when the future finishes.
363
364 Args:
365 fn: A callable that will be called with this future as its only
366 argument when the future completes or is cancelled. The callable
367 will always be called by a thread in the same process in which
368 it was added. If the future has already completed or been
369 cancelled then the callable will be called immediately. These
370 callables are called in the order that they were added.
371 """
372 with self._condition:
373 if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
374 self._done_callbacks.append(fn)
375 return
376 fn(self)
377
378 def result(self, timeout=None):
379 """Return the result of the call that the future represents.
380
381 Args:
382 timeout: The number of seconds to wait for the result if the future
383 isn't done. If None, then there is no limit on the wait time.
384
385 Returns:
386 The result of the call that the future represents.
387
388 Raises:
389 CancelledError: If the future was cancelled.
390 TimeoutError: If the future didn't finish executing before the given
391 timeout.
392 Exception: If the call raised then that exception will be raised.
393 """
394 with self._condition:
395 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
396 raise CancelledError()
397 elif self._state == FINISHED:
398 return self.__get_result()
399
400 self._condition.wait(timeout)
401
402 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
403 raise CancelledError()
404 elif self._state == FINISHED:
405 return self.__get_result()
406 else:
407 raise TimeoutError()
408
409 def exception(self, timeout=None):
410 """Return the exception raised by the call that the future represents.
411
412 Args:
413 timeout: The number of seconds to wait for the exception if the
414 future isn't done. If None, then there is no limit on the wait
415 time.
416
417 Returns:
418 The exception raised by the call that the future represents or None
419 if the call completed without raising.
420
421 Raises:
422 CancelledError: If the future was cancelled.
423 TimeoutError: If the future didn't finish executing before the given
424 timeout.
425 """
426
427 with self._condition:
428 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
429 raise CancelledError()
430 elif self._state == FINISHED:
431 return self._exception
432
433 self._condition.wait(timeout)
434
435 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
436 raise CancelledError()
437 elif self._state == FINISHED:
438 return self._exception
439 else:
440 raise TimeoutError()
441
442 # The following methods should only be used by Executors and in tests.
443 def set_running_or_notify_cancel(self):
444 """Mark the future as running or process any cancel notifications.
445
446 Should only be used by Executor implementations and unit tests.
447
448 If the future has been cancelled (cancel() was called and returned
449 True) then any threads waiting on the future completing (though calls
450 to as_completed() or wait()) are notified and False is returned.
451
452 If the future was not cancelled then it is put in the running state
453 (future calls to running() will return True) and True is returned.
454
455 This method should be called by Executor implementations before
456 executing the work associated with this future. If this method returns
457 False then the work should not be executed.
458
459 Returns:
460 False if the Future was cancelled, True otherwise.
461
462 Raises:
463 RuntimeError: if this method was already called or if set_result()
464 or set_exception() was called.
465 """
466 with self._condition:
467 if self._state == CANCELLED:
468 self._state = CANCELLED_AND_NOTIFIED
469 for waiter in self._waiters:
470 waiter.add_cancelled(self)
471 # self._condition.notify_all() is not necessary because
472 # self.cancel() triggers a notification.
473 return False
474 elif self._state == PENDING:
475 self._state = RUNNING
476 return True
477 else:
478 LOGGER.critical('Future %s in unexpected state: %s',
479 id(self.future),
480 self.future._state)
481 raise RuntimeError('Future in unexpected state')
482
483 def set_result(self, result):
484 """Sets the return value of work associated with the future.
485
486 Should only be used by Executor implementations and unit tests.
487 """
488 with self._condition:
489 self._result = result
490 self._state = FINISHED
491 for waiter in self._waiters:
492 waiter.add_result(self)
493 self._condition.notify_all()
494 self._invoke_callbacks()
495
496 def set_exception(self, exception):
497 """Sets the result of the future as being the given exception.
498
499 Should only be used by Executor implementations and unit tests.
500 """
501 with self._condition:
502 self._exception = exception
503 self._state = FINISHED
504 for waiter in self._waiters:
505 waiter.add_exception(self)
506 self._condition.notify_all()
507 self._invoke_callbacks()
508
509class Executor(object):
510 """This is an abstract base class for concrete asynchronous executors."""
511
512 def submit(self, fn, *args, **kwargs):
513 """Submits a callable to be executed with the given arguments.
514
515 Schedules the callable to be executed as fn(*args, **kwargs) and returns
516 a Future instance representing the execution of the callable.
517
518 Returns:
519 A Future representing the given call.
520 """
521 raise NotImplementedError()
522
523 def map(self, fn, *iterables, **kwargs):
524 """Returns a iterator equivalent to map(fn, iter).
525
526 Args:
527 fn: A callable that will take take as many arguments as there are
528 passed iterables.
529 timeout: The maximum number of seconds to wait. If None, then there
530 is no limit on the wait time.
531
532 Returns:
533 An iterator equivalent to: map(func, *iterables) but the calls may
534 be evaluated out-of-order.
535
536 Raises:
537 TimeoutError: If the entire result iterator could not be generated
538 before the given timeout.
539 Exception: If fn(*args) raises for any values.
540 """
541 timeout = kwargs.get('timeout')
542 if timeout is not None:
543 end_time = timeout + time.time()
544
545 fs = [self.submit(fn, *args) for args in zip(*iterables)]
546
547 try:
548 for future in fs:
549 if timeout is None:
550 yield future.result()
551 else:
552 yield future.result(end_time - time.time())
553 finally:
554 for future in fs:
555 future.cancel()
556
557 def shutdown(self, wait=True):
558 """Clean-up the resources associated with the Executor.
559
560 It is safe to call this method several times. Otherwise, no other
561 methods can be called after this one.
562
563 Args:
564 wait: If True then shutdown will not return until all running
565 futures have finished executing and the resources used by the
566 executor have been reclaimed.
567 """
568 pass
569
570 def __enter__(self):
571 return self
572
573 def __exit__(self, exc_type, exc_val, exc_tb):
574 self.shutdown(wait=True)
575 return False