author    Christopher Larson <kergoth@gmail.com>    2012-02-03 08:12:55 -0700
committer Richard Purdie <richard.purdie@linuxfoundation.org>    2012-02-10 17:00:54 +0000
commit    754847f34b1f2cfc9683b143ca9e9e6ef054037f
tree      f6e90272119aec4a9dc5df4f555cab7a78206387 /bitbake/lib/concurrent/futures/thread.py
parent    d366c1890ee706870bbf6fec714fba9022a54329
Revert the switch to futures for now
Without it, we get random hangs on parse failure. With it, some folks have seen
hangs even on successful cases. The former is clearly less problematic. This is
temporary, until I can finish investigating the root causes of both issues.

(Bitbake rev: db689a99beffea1a285cdfc74a58fe73f1666987)

Signed-off-by: Christopher Larson <kergoth@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/concurrent/futures/thread.py')
-rw-r--r--    bitbake/lib/concurrent/futures/thread.py    144
1 file changed, 0 insertions, 144 deletions
diff --git a/bitbake/lib/concurrent/futures/thread.py b/bitbake/lib/concurrent/futures/thread.py
deleted file mode 100644
index ce0dda0c38..0000000000
--- a/bitbake/lib/concurrent/futures/thread.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Copyright 2009 Brian Quinlan. All Rights Reserved.
-# Licensed to PSF under a Contributor Agreement.
-
-"""Implements ThreadPoolExecutor."""
-
-from __future__ import with_statement
-import atexit
-import threading
-import weakref
-import sys
-
-from concurrent.futures import _base
-
-try:
-    import queue
-except ImportError:
-    import Queue as queue
-
-__author__ = 'Brian Quinlan (brian@sweetapp.com)'
-
-# Workers are created as daemon threads. This is done to allow the interpreter
-# to exit when there are still idle threads in a ThreadPoolExecutor's thread
-# pool (i.e. shutdown() was not called). However, allowing workers to die with
-# the interpreter has two undesirable properties:
-#   - The workers would still be running during interpretor shutdown,
-#     meaning that they would fail in unpredictable ways.
-#   - The workers could be killed while evaluating a work item, which could
-#     be bad if the callable being evaluated has external side-effects e.g.
-#     writing to a file.
-#
-# To work around this problem, an exit handler is installed which tells the
-# workers to exit when their work queues are empty and then waits until the
-# threads finish.
-
-_thread_references = set()
-_shutdown = False
-
-def _python_exit():
-    global _shutdown
-    _shutdown = True
-    for thread_reference in _thread_references:
-        thread = thread_reference()
-        if thread is not None:
-            thread.join()
-
-def _remove_dead_thread_references():
-    """Remove inactive threads from _thread_references.
-
-    Should be called periodically to prevent memory leaks in scenarios such as:
-    >>> while True:
-    ...    t = ThreadPoolExecutor(max_workers=5)
-    ...    t.map(int, ['1', '2', '3', '4', '5'])
-    """
-    for thread_reference in set(_thread_references):
-        if thread_reference() is None:
-            _thread_references.discard(thread_reference)
-
-atexit.register(_python_exit)
-
-class _WorkItem(object):
-    def __init__(self, future, fn, args, kwargs):
-        self.future = future
-        self.fn = fn
-        self.args = args
-        self.kwargs = kwargs
-
-    def run(self):
-        if not self.future.set_running_or_notify_cancel():
-            return
-
-        try:
-            result = self.fn(*self.args, **self.kwargs)
-        except BaseException:
-            e = sys.exc_info()[1]
-            self.future.set_exception(e)
-        else:
-            self.future.set_result(result)
-
-def _worker(executor_reference, work_queue):
-    try:
-        while True:
-            try:
-                work_item = work_queue.get(block=True, timeout=0.1)
-            except queue.Empty:
-                executor = executor_reference()
-                # Exit if:
-                #   - The interpreter is shutting down OR
-                #   - The executor that owns the worker has been collected OR
-                #   - The executor that owns the worker has been shutdown.
-                if _shutdown or executor is None or executor._shutdown:
-                    return
-                del executor
-            else:
-                work_item.run()
-    except BaseException:
-        _base.LOGGER.critical('Exception in worker', exc_info=True)
-
-class ThreadPoolExecutor(_base.Executor):
-    def __init__(self, max_workers):
-        """Initializes a new ThreadPoolExecutor instance.
-
-        Args:
-            max_workers: The maximum number of threads that can be used to
-                execute the given calls.
-        """
-        _remove_dead_thread_references()
-
-        self._max_workers = max_workers
-        self._work_queue = queue.Queue()
-        self._threads = set()
-        self._shutdown = False
-        self._shutdown_lock = threading.Lock()
-
-    def submit(self, fn, *args, **kwargs):
-        with self._shutdown_lock:
-            if self._shutdown:
-                raise RuntimeError('cannot schedule new futures after shutdown')
-
-            f = _base.Future()
-            w = _WorkItem(f, fn, args, kwargs)
-
-            self._work_queue.put(w)
-            self._adjust_thread_count()
-            return f
-    submit.__doc__ = _base.Executor.submit.__doc__
-
-    def _adjust_thread_count(self):
-        # TODO(bquinlan): Should avoid creating new threads if there are more
-        # idle threads than items in the work queue.
-        if len(self._threads) < self._max_workers:
-            t = threading.Thread(target=_worker,
-                                 args=(weakref.ref(self), self._work_queue))
-            t.daemon = True
-            t.start()
-            self._threads.add(t)
-            _thread_references.add(weakref.ref(t))
-
-    def shutdown(self, wait=True):
-        with self._shutdown_lock:
-            self._shutdown = True
-        if wait:
-            for t in self._threads:
-                t.join()
-    shutdown.__doc__ = _base.Executor.shutdown.__doc__
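
For reference, the removed backport implements the same executor interface that the
standard library ships as concurrent.futures from Python 3.2 onward: submit() returns
a Future, result() blocks until the corresponding work item has run, and shutdown()
joins the workers. A minimal usage sketch against that interface (the square task is
illustrative, not from this tree):

    from concurrent.futures import ThreadPoolExecutor

    def square(n):
        # trivial stand-in for a real work item such as parsing one recipe
        return n * n

    executor = ThreadPoolExecutor(max_workers=4)
    futures = [executor.submit(square, n) for n in range(5)]
    print([f.result() for f in futures])  # result() blocks until the work item has run
    executor.shutdown(wait=True)          # idle workers exit once the queue drains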
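
The comment block at the top of the removed module explains its shutdown strategy:
workers run as daemon threads, are tracked through weak references, and an atexit
handler sets a module-level flag so that idle workers exit once their work queues are
empty. The standalone sketch below illustrates that pattern; the names (_tasks,
worker) are illustrative assumptions, not identifiers from this tree:

    import atexit
    import queue
    import threading
    import weakref

    _shutdown = False      # set by the exit handler, checked by idle workers
    _thread_refs = set()   # weak references so finished workers can be collected

    def _python_exit():
        global _shutdown
        _shutdown = True
        for ref in _thread_refs:
            thread = ref()
            if thread is not None:
                thread.join()  # wait for live workers to drain their queues

    atexit.register(_python_exit)

    _tasks = queue.Queue()

    def worker():
        while True:
            try:
                fn = _tasks.get(block=True, timeout=0.1)
            except queue.Empty:
                if _shutdown:  # queue is empty and the interpreter is exiting
                    return
            else:
                fn()

    t = threading.Thread(target=worker)
    t.daemon = True  # a forgotten pool cannot keep the interpreter alive on its own
    t.start()
    _thread_refs.add(weakref.ref(t))

    _tasks.put(lambda: print("work item ran"))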