Diffstat (limited to 'bitbake/lib/concurrent/futures/process.py')
-rw-r--r--  bitbake/lib/concurrent/futures/process.py | 345
1 file changed, 0 insertions, 345 deletions
diff --git a/bitbake/lib/concurrent/futures/process.py b/bitbake/lib/concurrent/futures/process.py
deleted file mode 100644
index 87dc789433..0000000000
--- a/bitbake/lib/concurrent/futures/process.py
+++ /dev/null
@@ -1,345 +0,0 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call() |      |        |     | ...       |    |         |
|          |     |    future |      |        |     | 4, result |    |         |
|          |     | ...       |      |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

from __future__ import with_statement
import atexit
import multiprocessing
import threading
import weakref
import sys

from concurrent.futures import _base

try:
    import queue
except ImportError:
    import Queue as queue

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Workers are created as daemon threads and processes. This is done to allow
# the interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_thread_references = set()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    for thread_reference in _thread_references:
        thread = thread_reference()
        if thread is not None:
            thread.join()

def _remove_dead_thread_references():
    """Remove inactive threads from _thread_references.

    Should be called periodically to prevent memory leaks in scenarios such as:
    >>> while True:
    ...     t = ProcessPoolExecutor(max_workers=5)
    ...     t.map(int, ['1', '2', '3', '4', '5'])
    """
    for thread_reference in set(_thread_references):
        if thread_reference() is None:
            _thread_references.discard(thread_reference)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
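
# Illustrative sketch (not part of the original module) of the tradeoff noted
# above: with a single worker, only the first few submissions reach the call
# queue; the rest stay in the "Work Items" dict, where Future.cancel() can
# still succeed. The function name _example_cancel_tradeoff is hypothetical.
def _example_cancel_tradeoff():
    import time
    executor = ProcessPoolExecutor(max_workers=1)
    futures = [executor.submit(time.sleep, 1) for _ in range(10)]
    # cancel() fails for calls already dispatched to the call queue but may
    # succeed for work items the local worker thread has not dispatched yet.
    cancelled = [f.cancel() for f in futures]
    executor.shutdown(wait=True)
    return cancelled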

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
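
# Illustrative note (not part of the original module): a single submission
# passes through all three message types. A hypothetical trace for work id 5:
#
#     _WorkItem(future, fn, args, kwargs)     # held in the "Work Items" dict
#     _CallItem(5, fn, args, kwargs)          # pickled into the call queue
#     _ResultItem(5, result=r)                # or exception=e, sent back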

def _process_worker(call_queue, result_queue, shutdown):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        try:
            call_item = call_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            if shutdown.is_set():
                return
        else:
            try:
                r = call_item.fn(*call_item.args, **call_item.kwargs)
            except BaseException:
                e = sys.exc_info()[1]
                result_queue.put(_ResultItem(call_item.work_id,
                                             exception=e))
            else:
                result_queue.put(_ResultItem(call_item.work_id,
                                             result=r))
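
# Illustrative sketch (not part of the original module): _process_worker
# speaks a simple request/response protocol over two multiprocessing queues.
# This hypothetical driver runs one worker by hand; ProcessPoolExecutor
# normally manages all of this internally.
def _example_worker_protocol():
    call_q = multiprocessing.Queue(1)
    result_q = multiprocessing.Queue()
    shutdown = multiprocessing.Event()
    p = multiprocessing.Process(target=_process_worker,
                                args=(call_q, result_q, shutdown))
    p.start()
    call_q.put(_CallItem(0, divmod, (7, 3), {}))
    result_item = result_q.get()  # blocks; result_item.result == (2, 1)
    shutdown.set()                # lets the worker exit once call_q drains
    p.join()
    return result_item.result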

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _CallItems derived from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue
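
# Illustrative sketch (not part of the original module): driving
# _add_call_item_to_queue by hand with a bounded call queue. The names are
# hypothetical; the executor normally calls this from its management thread.
def _example_fill_call_queue():
    pending = {}
    work_ids = queue.Queue()
    for i in range(3):
        pending[i] = _WorkItem(_base.Future(), divmod, (i, 1), {})
        work_ids.put(i)
    call_q = multiprocessing.Queue(2)
    _add_call_item_to_queue(pending, work_ids, call_q)
    # call_q is now full with two _CallItems; work id 2 was never dispatched,
    # so its future is still pending and can still be cancelled.
    return pending[2].future.cancel()  # True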

def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             shutdown_process_event):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
        shutdown_process_event: A multiprocessing.Event used to signal the
            process workers that they should exit when their work queue is
            empty.
    """
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        try:
            result_item = result_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            executor = executor_reference()
            # No more work items can be added if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns this worker has been collected OR
            #   - The executor that owns this worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown_thread:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_process_event.set()

                    # If .join() is not called on the created processes then
                    # some multiprocessing.Queue methods may deadlock on Mac OS
                    # X.
                    for p in processes:
                        p.join()
                    return
            del executor
        else:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
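
# Illustrative sketch (not part of the original module): the management
# thread holds only a weakref.ref to its executor, so an otherwise
# unreferenced ProcessPoolExecutor can still be garbage collected. A
# hypothetical miniature of the same test used above:
def _example_weakref_owner():
    class Owner(object):
        pass
    owner = Owner()
    owner_ref = weakref.ref(owner)
    del owner                   # drop the only strong reference
    # A None dereference is one of the signals _queue_management_worker uses
    # to decide that no more work can arrive and the thread may exit.
    return owner_ref() is None  # True once the owner has been collected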

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _remove_dead_thread_references()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_process_event = multiprocessing.Event()
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _thread_references.add(weakref.ref(self._queue_management_thread))

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue,
                          self._shutdown_process_event))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if wait:
            if self._queue_management_thread:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._shutdown_process_event = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
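
# Illustrative usage note (not part of the original module), assuming the
# standard Executor context-manager behavior from _base (__exit__ calls
# shutdown(wait=True)):
#
#     with ProcessPoolExecutor(max_workers=2) as executor:
#         future = executor.submit(abs, -1)
#     # the pool has been shut down here; future.result() == 1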

atexit.register(_python_exit)