From 164a4cb2fc64e76836182ad2d412343a7b26b964 Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Wed, 6 Mar 2013 15:32:35 +0000 Subject: bitbake: Revert "cooker: parse using bb.compat.Pool" Reverting the pool changes, terminate does not work reliably on bb.compat.Pool :( [YOCTO #3978] This reverts commit 8af519a49a3374bd9004864ef31ca8aa328e9f34. Signed-off-by: Richard Purdie --- bitbake/lib/bb/cooker.py | 161 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 134 insertions(+), 27 deletions(-) (limited to 'bitbake/lib') diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 9f7121fefc..9d051fa30f 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -34,7 +34,7 @@ from cStringIO import StringIO from contextlib import closing from functools import wraps from collections import defaultdict -import bb, bb.exceptions, bb.command, bb.compat +import bb, bb.exceptions, bb.command from bb import utils, data, parse, event, cache, providers, taskdata, runqueue import Queue import prserv.serv @@ -1556,19 +1556,87 @@ class ParsingFailure(Exception): self.recipe = recipe Exception.__init__(self, realexception, recipe) -def parse_file((filename, appends, caches_array)): - try: - return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array) - except Exception as exc: - tb = sys.exc_info()[2] - exc.recipe = filename - exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) - return True, exc - # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown - # and for example a worker thread doesn't just exit on its own in response to - # a SystemExit event for example. - except BaseException as exc: - return True, ParsingFailure(exc, filename) +class Feeder(multiprocessing.Process): + def __init__(self, jobs, to_parsers, quit): + self.quit = quit + self.jobs = jobs + self.to_parsers = to_parsers + multiprocessing.Process.__init__(self) + + def run(self): + while True: + try: + quit = self.quit.get_nowait() + except Queue.Empty: + pass + else: + if quit == 'cancel': + self.to_parsers.cancel_join_thread() + break + + try: + job = self.jobs.pop() + except IndexError: + break + + try: + self.to_parsers.put(job, timeout=0.5) + except Queue.Full: + self.jobs.insert(0, job) + continue + +class Parser(multiprocessing.Process): + def __init__(self, jobs, results, quit, init): + self.jobs = jobs + self.results = results + self.quit = quit + self.init = init + multiprocessing.Process.__init__(self) + + def run(self): + if self.init: + self.init() + + pending = [] + while True: + try: + self.quit.get_nowait() + except Queue.Empty: + pass + else: + self.results.cancel_join_thread() + break + + if pending: + result = pending.pop() + else: + try: + job = self.jobs.get(timeout=0.25) + except Queue.Empty: + continue + + if job is None: + break + result = self.parse(*job) + + try: + self.results.put(result, timeout=0.25) + except Queue.Full: + pending.append(result) + + def parse(self, filename, appends, caches_array): + try: + return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array) + except Exception as exc: + tb = sys.exc_info()[2] + exc.recipe = filename + exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) + return True, exc + # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown + # and for example a worker thread doesn't just exit on its own in response to + # a SystemExit event for example. + except BaseException as exc: + return True, ParsingFailure(exc, filename) class CookerParser(object): def __init__(self, cooker, filelist, masked): @@ -1602,25 +1670,32 @@ class CookerParser(object): self.fromcache.append((filename, appends)) self.toparse = self.total - len(self.fromcache) self.progress_chunk = max(self.toparse / 100, 1) - self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1) self.start() self.haveshutdown = False def start(self): self.results = self.load_cached() + self.processes = [] if self.toparse: - def process_init(): - parse_file.cfg = self.cfgdata - multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1) - multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1) - bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) - - self.pool = bb.compat.Pool(self.num_processes, process_init) - parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk) - self.pool.close() - self.results = itertools.chain(self.results, parsed) + def init(): + Parser.cfg = self.cfgdata + multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1) + multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1) + + self.feeder_quit = multiprocessing.Queue(maxsize=1) + self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes) + self.jobs = multiprocessing.Queue(maxsize=self.num_processes) + self.result_queue = multiprocessing.Queue() + self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit) + self.feeder.start() + for i in range(0, self.num_processes): + parser = Parser(self.jobs, self.result_queue, self.parser_quit, init) + parser.start() + self.processes.append(parser) + + self.results = itertools.chain(self.results, self.parse_generator()) def shutdown(self, clean=True, force=False): if not self.toparse: @@ -1636,9 +1711,25 @@ class CookerParser(object): self.total) bb.event.fire(event, self.cfgdata) + self.feeder_quit.put(None) + for process in self.processes: + self.jobs.put(None) else: - self.pool.terminate() - self.pool.join() + self.feeder_quit.put('cancel') + + self.parser_quit.cancel_join_thread() + for process in self.processes: + self.parser_quit.put(None) + + self.jobs.cancel_join_thread() + + for process in self.processes: + if force: + process.join(.1) + process.terminate() + else: + process.join() + self.feeder.join() sync = threading.Thread(target=self.bb_cache.sync) sync.start() @@ -1651,6 +1742,22 @@ class CookerParser(object): cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) yield not cached, infos + def parse_generator(self): + while True: + if self.parsed >= self.toparse: + break + + try: + result = self.result_queue.get(timeout=0.25) + except Queue.Empty: + pass + else: + value = result[1] + if isinstance(value, BaseException): + raise value + else: + yield result + def parse_next(self): result = [] parsed = None -- cgit v1.2.3-54-g00ecf