summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--meta/classes/sstate.bbclass3
-rw-r--r--meta/lib/oe/utils.py26
2 files changed, 22 insertions, 7 deletions
diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
index 1e5e98a..a80d1ce 100644
--- a/meta/classes/sstate.bbclass
+++ b/meta/classes/sstate.bbclass
@@ -771,9 +771,10 @@ def sstate_checkhashes(sq_fn, sq_task, sq_hash, sq_hashfn, d, siginfo=False):
771 bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist)) 771 bb.note("Checking sstate mirror object availability (for %s objects)" % len(tasklist))
772 import multiprocessing 772 import multiprocessing
773 nproc = min(multiprocessing.cpu_count(), len(tasklist)) 773 nproc = min(multiprocessing.cpu_count(), len(tasklist))
774 pool = oe.utils.ThreadedPool(nproc) 774 pool = oe.utils.ThreadedPool(nproc, len(tasklist))
775 for t in tasklist: 775 for t in tasklist:
776 pool.add_task(checkstatus, t) 776 pool.add_task(checkstatus, t)
777 pool.start()
777 pool.wait_completion() 778 pool.wait_completion()
778 779
779 inheritlist = d.getVar("INHERIT", True) 780 inheritlist = d.getVar("INHERIT", True)
diff --git a/meta/lib/oe/utils.py b/meta/lib/oe/utils.py
index 0de8800..f0d3c14 100644
--- a/meta/lib/oe/utils.py
+++ b/meta/lib/oe/utils.py
@@ -222,11 +222,16 @@ class ThreadedWorker(Thread):
222 Thread.__init__(self) 222 Thread.__init__(self)
223 self.tasks = tasks 223 self.tasks = tasks
224 self.daemon = True 224 self.daemon = True
225 self.start()
226 225
227 def run(self): 226 def run(self):
227 from Queue import Empty
228
228 while True: 229 while True:
229 func, args, kargs = self.tasks.get() 230 try:
231 func, args, kargs = self.tasks.get(block=False)
232 except Empty:
233 break
234
230 try: 235 try:
231 func(*args, **kargs) 236 func(*args, **kargs)
232 except Exception, e: 237 except Exception, e:
@@ -236,9 +241,17 @@ class ThreadedWorker(Thread):
236 241
237class ThreadedPool: 242class ThreadedPool:
238 """Pool of threads consuming tasks from a queue""" 243 """Pool of threads consuming tasks from a queue"""
239 def __init__(self, num_threads): 244 def __init__(self, num_workers, num_tasks):
240 self.tasks = Queue(num_threads) 245 self.tasks = Queue(num_tasks)
241 for _ in range(num_threads): ThreadedWorker(self.tasks) 246 self.workers = []
247
248 for _ in range(num_workers):
249 worker = ThreadedWorker(self.tasks)
250 self.workers.append(worker)
251
252 def start(self):
253 for worker in self.workers:
254 worker.start()
242 255
243 def add_task(self, func, *args, **kargs): 256 def add_task(self, func, *args, **kargs):
244 """Add a task to the queue""" 257 """Add a task to the queue"""
@@ -247,4 +260,5 @@ class ThreadedPool:
247 def wait_completion(self): 260 def wait_completion(self):
248 """Wait for completion of all the tasks in the queue""" 261 """Wait for completion of all the tasks in the queue"""
249 self.tasks.join() 262 self.tasks.join()
250 263 for worker in self.workers:
264 worker.join()