From ac4d926f413b127810130ad7a442e6daf1fdd1b9 Mon Sep 17 00:00:00 2001 From: Chris Larson Date: Tue, 7 Dec 2010 13:00:22 -0500 Subject: cooker: use a pool, abort on first parse error (Bitbake rev: 9caf65e79f95fe0045e727391e974c4c1e7411ff) Signed-off-by: Chris Larson Signed-off-by: Richard Purdie --- bitbake/lib/bb/cooker.py | 144 ++++++++++++++++++++--------------------------- 1 file changed, 60 insertions(+), 84 deletions(-) (limited to 'bitbake/lib/bb/cooker.py') diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 18a22de711..bf896e9428 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py @@ -23,12 +23,13 @@ from __future__ import print_function import sys, os, glob, os.path, re, time +import atexit +import itertools import logging -import sre_constants -import threading import multiprocessing import signal -import atexit +import sre_constants +import threading from cStringIO import StringIO from contextlib import closing import bb @@ -45,11 +46,6 @@ class MultipleMatches(Exception): Exception raised when multiple file matches are found """ -class ParsingErrorsFound(Exception): - """ - Exception raised when parsing errors are found - """ - class NothingToBuild(Exception): """ Exception raised when there is nothing to build @@ -976,6 +972,10 @@ class CookerExit(bb.event.Event): def __init__(self): bb.event.Event.__init__(self) +def parse_file(task): + filename, appends = task + return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg) + class CookerParser(object): def __init__(self, cooker, filelist, masked): self.filelist = filelist @@ -993,113 +993,89 @@ class CookerParser(object): self.total = len(filelist) self.current = 0 - self.bb_cache = None - self.task_queue = None - self.result_queue = None - self.fromcache = None self.num_processes = int(self.cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or multiprocessing.cpu_count()) - def launch_processes(self): - self.task_queue = multiprocessing.Queue() - self.result_queue = multiprocessing.Queue() - + self.bb_cache = bb.cache.Cache(self.cfgdata) self.fromcache = [] + self.willparse = [] for filename in self.filelist: appends = self.cooker.get_file_appends(filename) if not self.bb_cache.cacheValid(filename): - self.task_queue.put((filename, appends)) + self.willparse.append((filename, appends)) else: self.fromcache.append((filename, appends)) self.toparse = self.total - len(self.fromcache) self.progress_chunk = max(self.toparse / 100, 1) - def worker(input, output, cfgdata): + self.start() + + def start(self): + def init(cfg): signal.signal(signal.SIGINT, signal.SIG_IGN) - for filename, appends in iter(input.get, 'STOP'): - try: - infos = bb.cache.Cache.parse(filename, appends, cfgdata) - except bb.parse.ParseError as exc: - output.put(exc) - else: - output.put(infos) + parse_file.cfg = cfg - self.processes = [] - for i in xrange(self.num_processes): - process = multiprocessing.Process(target=worker, - args=(self.task_queue, - self.result_queue, - self.cfgdata)) - process.start() - self.processes.append(process) + bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) + + self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata]) + parsed = self.pool.imap(parse_file, self.willparse) + self.pool.close() + + self.results = itertools.chain(self.load_cached(), parsed) def shutdown(self, clean=True): - self.result_queue.close() - for process in self.processes: - if clean: - self.task_queue.put('STOP') - else: - process.terminate() - self.task_queue.close() - for process in self.processes: - process.join() + if clean: + event = bb.event.ParseCompleted(self.cached, self.parsed, + self.skipped, self.masked, + self.virtuals, self.error, + self.total) + bb.event.fire(event, self.cfgdata) + else: + self.pool.terminate() + self.pool.join() + sync = threading.Thread(target=self.bb_cache.sync) sync.start() atexit.register(lambda: sync.join()) + codesync = threading.Thread(target=bb.codeparser.parser_cache_save(self.cooker.configuration.data)) codesync.start() atexit.register(lambda: codesync.join()) - if self.error > 0: - raise ParsingErrorsFound() + + def load_cached(self): + for filename, appends in self.fromcache: + cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) + yield not cached, infos def parse_next(self): - if self.current >= self.total: - event = bb.event.ParseCompleted(self.cached, self.parsed, - self.skipped, self.masked, - self.virtuals, self.error, - self.total) - bb.event.fire(event, self.cfgdata) + try: + parsed, result = self.results.next() + except StopIteration: self.shutdown() return False - elif not self.bb_cache: - self.bb_cache = bb.cache.Cache(self.cfgdata) - self.launch_processes() - bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) - return True - - try: - if self.result_queue.empty() and self.fromcache: - filename, appends = self.fromcache.pop() - _, result = self.bb_cache.load(filename, appends, self.cfgdata) - parsed = False - self.cached += 1 - else: - result = self.result_queue.get() - if isinstance(result, Exception): - raise result - - parsed = True - self.parsed += 1 - if self.parsed % self.progress_chunk == 0: - bb.event.fire(bb.event.ParseProgress(self.parsed), - self.cfgdata) except KeyboardInterrupt: self.shutdown(clean=False) raise - except Exception as e: - self.error += 1 - parselog.critical(str(e)) - else: - self.virtuals += len(result) - - for virtualfn, info in result: - if info.skipped: - self.skipped += 1 - else: - self.bb_cache.add_info(virtualfn, info, self.cooker.status, - parsed=parsed) + except Exception as exc: + self.shutdown(clean=False) + sys.exit(1) self.current += 1 + self.virtuals += len(result) + if parsed: + self.parsed += 1 + if self.parsed % self.progress_chunk == 0: + bb.event.fire(bb.event.ParseProgress(self.parsed), + self.cfgdata) + else: + self.cached += 1 + + for virtualfn, info in result: + if info.skipped: + self.skipped += 1 + else: + self.bb_cache.add_info(virtualfn, info, self.cooker.status, + parsed=parsed) return True def reparse(self, filename): -- cgit v1.2.3-54-g00ecf