diff options
author | Aníbal Limón <anibal.limon@linux.intel.com> | 2015-06-23 11:49:53 -0500 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2015-06-26 09:27:30 +0100 |
commit | 3fa32158c45edf276197fa56466ed942ab70be09 (patch) | |
tree | 478c1a26e5ce6e7d2456c0d895ebf4ff170e5c61 /meta | |
parent | 18e902b2dc17746af453869bcc1fba4dbae7c94f (diff) | |
download | poky-3fa32158c45edf276197fa56466ed942ab70be09.tar.gz |
oe/utils.py: Fix thread leakage in ThreadPool
In order to fix Thread leakage caused by not call join() in Threads,
Pass num_tasks in ThreadPool for add all the tasks into a Queue this
enable catch of Queue.Empty exception and exit the threads.
classes/sstate.bbclass: Change checkstatus function to match new
ThreadPool operation.
(From OE-Core rev: 524d92ed7b53bef933527095e82f378b934f25ef)
Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'meta')
-rw-r--r-- | meta/classes/sstate.bbclass | 3 | ||||
-rw-r--r-- | meta/lib/oe/utils.py | 26 |
2 files changed, 22 insertions, 7 deletions
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass index 1e5e98a1da..a80d1ced72 100644 --- a/meta/classes/sstate.bbclass +++ b/meta/classes/sstate.bbclass | |||
@@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False): | |||
771 | bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist)) | 771 | bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist)) |
772 | import multiprocessing | 772 | import multiprocessing |
773 | nproc = min(multiprocessing.cpu_count(), len(tasklist)) | 773 | nproc = min(multiprocessing.cpu_count(), len(tasklist)) |
774 | pool = oe.utils.ThreadedPool(nproc) | 774 | pool = oe.utils.ThreadedPool(nproc, len(tasklist)) |
775 | for t in tasklist: | 775 | for t in tasklist: |
776 | pool.add_task(checkstatus, t) | 776 | pool.add_task(checkstatus, t) |
777 | pool.start() | ||
777 | pool.wait_completion() | 778 | pool.wait_completion() |
778 | 779 | ||
779 | inheritlist = d.getVar("INHERIT", True) | 780 | inheritlist = d.getVar("INHERIT", True) |
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py index 0de880013a..f0d3c14137 100644 --- a/meta/lib/oe/utils.py +++ b/meta/lib/oe/utils.py | |||
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread): | |||
222 | Thread.__init__(self) | 222 | Thread.__init__(self) |
223 | self.tasks = tasks | 223 | self.tasks = tasks |
224 | self.daemon = True | 224 | self.daemon = True |
225 | self.start() | ||
226 | 225 | ||
227 | def run(self): | 226 | def run(self): |
227 | from Queue import Empty | ||
228 | |||
228 | while True: | 229 | while True: |
229 | func, args, kargs = self.tasks.get() | 230 | try: |
231 | func, args, kargs = self.tasks.get(block=False) | ||
232 | except Empty: | ||
233 | break | ||
234 | |||
230 | try: | 235 | try: |
231 | func(*args, **kargs) | 236 | func(*args, **kargs) |
232 | except Exception, e: | 237 | except Exception, e: |
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread): | |||
236 | 241 | ||
237 | class ThreadedPool: | 242 | class ThreadedPool: |
238 | """Pool of threads consuming tasks from a queue""" | 243 | """Pool of threads consuming tasks from a queue""" |
239 | def __init__(self, num_threads): | 244 | def __init__(self, num_workers, num_tasks): |
240 | self.tasks = Queue(num_threads) | 245 | self.tasks = Queue(num_tasks) |
241 | for _ in range(num_threads): ThreadedWorker(self.tasks) | 246 | self.workers = [] |
247 | |||
248 | for _ in range(num_workers): | ||
249 | worker = ThreadedWorker(self.tasks) | ||
250 | self.workers.append(worker) | ||
251 | |||
252 | def start(self): | ||
253 | for worker in self.workers: | ||
254 | worker.start() | ||
242 | 255 | ||
243 | def add_task(self, func, *args, **kargs): | 256 | def add_task(self, func, *args, **kargs): |
244 | """Add a task to the queue""" | 257 | """Add a task to the queue""" |
@@ -247,4 +260,5 @@ class ThreadedPool: | |||
247 | def wait_completion(self): | 260 | def wait_completion(self): |
248 | """Wait for completion of all the tasks in the queue""" | 261 | """Wait for completion of all the tasks in the queue""" |
249 | self.tasks.join() | 262 | self.tasks.join() |
250 | 263 | for worker in self.workers: | |
264 | worker.join() | ||