From 32ea7668712a50d8f8b67d5e4558039e5092a485 Mon Sep 17 00:00:00 2001
From: Chris Larson
Date: Thu, 18 Nov 2010 20:21:54 -0700
Subject: Implement parallel parsing support

This utilizes Python's multiprocessing module. The default number of
threads used is the same as the number of available processor cores;
however, you can manually set this with the BB_NUMBER_PARSE_THREADS
variable.

(Bitbake rev: c7b3ec819549e51e438d293969e205883fee725f)

Signed-off-by: Chris Larson
Signed-off-by: Richard Purdie
---
 bitbake/lib/bb/cache.py         |  98 +++++++++++++++++------------
 bitbake/lib/bb/cooker.py        | 135 +++++++++++++++++++++++++++++-----------
 bitbake/lib/bb/pysh/pyshyacc.py |   1 +
 3 files changed, 157 insertions(+), 77 deletions(-)

diff --git a/bitbake/lib/bb/cache.py b/bitbake/lib/bb/cache.py
index 23845bc07b..93dccf21f1 100644
--- a/bitbake/lib/bb/cache.py
+++ b/bitbake/lib/bb/cache.py
@@ -232,48 +232,57 @@ class Cache(object):
             bb_data = cls.load_bbfile(fn, appends, cfgData)
             return bb_data[virtual]
 
-    def loadData(self, fn, appends, cfgData, cacheData):
-        """
-        Load a subset of data for fn.
-        If the cached data is valid we do nothing,
-        To do this, we need to parse the file and set the system
-        to record the variables accessed.
-        Return the cache status and whether the file was skipped when parsed
-        """
-        skipped, virtuals = 0, 0
+    @classmethod
+    def parse(cls, filename, appends, configdata):
+        """Parse the specified filename, returning the recipe information"""
+        infos = []
+        datastores = cls.load_bbfile(filename, appends, configdata)
+        depends = set()
+        for variant, data in sorted(datastores.iteritems(),
+                                    key=lambda i: i[0],
+                                    reverse=True):
+            virtualfn = cls.realfn2virtual(filename, variant)
+            depends |= (data.getVar("__depends", False) or set())
+            if depends and not variant:
+                data.setVar("__depends", depends)
+            info = RecipeInfo.from_metadata(filename, data)
+            infos.append((virtualfn, info))
+        return infos
+
+    def load(self, filename, appends, configdata):
+        """Obtain the recipe information for the specified filename,
+        using cached values if available, otherwise parsing.
+
+        Note that if it does parse to obtain the info, it will not
+        automatically add the information to the cache or to your
+        CacheData. Use the add or add_info method to do so after
+        running this, or use loadData instead."""
+        cached = self.cacheValid(filename)
+        if cached:
+            infos = []
+            info = self.depends_cache[filename]
+            for variant in info.variants:
+                virtualfn = self.realfn2virtual(filename, variant)
+                infos.append((virtualfn, self.depends_cache[virtualfn]))
+        else:
+            logger.debug(1, "Parsing %s", filename)
+            return self.parse(filename, appends, configdata)
 
-        if fn not in self.checked:
-            self.cacheValidUpdate(fn)
+        return cached, infos
 
-        cached = self.cacheValid(fn)
-        if not cached:
-            logger.debug(1, "Parsing %s", fn)
-            datastores = self.load_bbfile(fn, appends, cfgData)
-            depends = set()
-            for variant, data in sorted(datastores.iteritems(),
-                                        key=lambda i: i[0],
-                                        reverse=True):
-                virtualfn = self.realfn2virtual(fn, variant)
-                depends |= (data.getVar("__depends", False) or set())
-                if depends and not variant:
-                    data.setVar("__depends", depends)
-                info = RecipeInfo.from_metadata(fn, data)
-                if not info.nocache:
-                    # The recipe was parsed, and is not marked as being
-                    # uncacheable, so we need to ensure that we write out the
-                    # new cache data.
-                    self.cacheclean = False
-                self.depends_cache[virtualfn] = info
+    def loadData(self, fn, appends, cfgData, cacheData):
+        """Load the recipe info for the specified filename,
+        parsing and adding to the cache if necessary, and adding
+        the recipe information to the supplied CacheData instance."""
+        skipped, virtuals = 0, 0
 
-        info = self.depends_cache[fn]
-        for variant in info.variants:
-            virtualfn = self.realfn2virtual(fn, variant)
-            vinfo = self.depends_cache[virtualfn]
-            if vinfo.skipped:
+        cached, infos = self.load(fn, appends, cfgData)
+        for virtualfn, info in infos:
+            if info.skipped:
                 logger.debug(1, "Skipping %s", virtualfn)
                 skipped += 1
             else:
-                cacheData.add_from_recipeinfo(virtualfn, vinfo)
+                self.add_info(virtualfn, info, cacheData, not cached)
                 virtuals += 1
 
         return cached, skipped, virtuals
@@ -283,6 +292,9 @@ class Cache(object):
         Is the cache valid for fn?
         Fast version, no timestamps checked.
         """
+        if fn not in self.checked:
+            self.cacheValidUpdate(fn)
+
         # Is cache enabled?
         if not self.has_cache:
             return False
@@ -412,14 +424,22 @@ class Cache(object):
     def mtime(cachefile):
         return bb.parse.cached_mtime_noerror(cachefile)
 
-    def add(self, file_name, data, cacheData):
+    def add_info(self, filename, info, cacheData, parsed=None):
+        self.depends_cache[filename] = info
+        cacheData.add_from_recipeinfo(filename, info)
+        if parsed and not info.nocache:
+            # The recipe was parsed, and is not marked as being
+            # uncacheable, so we need to ensure that we write out the
+            # new cache data.
+            self.cacheclean = False
+
+    def add(self, file_name, data, cacheData, parsed=None):
         """
         Save data we need into the cache
         """
         realfn = self.virtualfn2realfn(file_name)[0]
         info = RecipeInfo.from_metadata(realfn, data)
-        self.depends_cache[file_name] = info
-        cacheData.add_from_recipeinfo(file_name, info)
+        self.add_info(file_name, info, cacheData, parsed)
 
     @staticmethod
     def load_bbfile(bbfile, appends, config):
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 6194919e4c..0143c149b8 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -25,6 +25,8 @@ from __future__ import print_function
 import sys, os, glob, os.path, re, time
 import logging
 import sre_constants
+import multiprocessing
+import signal
 from cStringIO import StringIO
 from contextlib import closing
 import bb
@@ -976,7 +978,7 @@ class CookerExit(bb.event.Event):
     def __init__(self):
         bb.event.Event.__init__(self)
 
-class CookerParser:
+class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
         # Internal data
         self.filelist = filelist
@@ -987,49 +989,106 @@ class CookerParser:
         self.cached = 0
         self.error = 0
         self.masked = masked
-        self.total = len(filelist)
         self.skipped = 0
         self.virtuals = 0
+        self.total = len(filelist)
 
-        # Pointer to the next file to parse
-        self.pointer = 0
-
-    def parse_next(self):
-        cooker = self.cooker
-        if self.pointer < len(self.filelist):
-            f = self.filelist[self.pointer]
-
-            try:
-                fromCache, skipped, virtuals = cooker.bb_cache.loadData(f, cooker.get_file_appends(f), cooker.configuration.data, cooker.status)
-                if fromCache:
-                    self.cached += 1
-                else:
-                    self.parsed += 1
-
-                self.skipped += skipped
-                self.virtuals += virtuals
+        # current to the next file to parse
+        self.current = 0
+        self.result_queue = None
+        self.fromcache = None
 
-            except KeyboardInterrupt:
-                cooker.bb_cache.remove(f)
-                cooker.bb_cache.sync()
-                raise
-            except Exception as e:
-                self.error += 1
-                cooker.bb_cache.remove(f)
-                parselog.exception("Unable to open %s", f)
-            except:
-                cooker.bb_cache.remove(f)
-                raise
-            finally:
-                bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed, self.skipped, self.masked, self.virtuals, self.error, self.total), cooker.configuration.event_data)
+        self.launch_processes()
 
-            self.pointer += 1
+    def launch_processes(self):
+        self.task_queue = multiprocessing.Queue()
+        self.result_queue = multiprocessing.Queue()
+
+        self.fromcache = []
+        cfgdata = self.cooker.configuration.data
+        for filename in self.filelist:
+            appends = self.cooker.get_file_appends(filename)
+            if not self.cooker.bb_cache.cacheValid(filename):
+                self.task_queue.put((filename, appends))
+            else:
+                self.fromcache.append((filename, appends))
+
+        def worker(input, output, cfgdata):
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+            for filename, appends in iter(input.get, 'STOP'):
+                infos = bb.cache.Cache.parse(filename, appends, cfgdata)
+                output.put(infos)
+
+        self.processes = []
+        num_processes = int(cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
+                            multiprocessing.cpu_count())
+        for i in xrange(num_processes):
+            process = multiprocessing.Process(target=worker,
+                                              args=(self.task_queue,
+                                                    self.result_queue,
+                                                    cfgdata))
+            process.start()
+            self.processes.append(process)
+
+    def shutdown(self, clean=True):
+        self.result_queue.close()
+        for process in self.processes:
+            if clean:
+                self.task_queue.put('STOP')
+            else:
+                process.terminate()
+        self.task_queue.close()
+        for process in self.processes:
+            process.join()
+        self.cooker.bb_cache.sync()
+        bb.codeparser.parser_cache_save(self.cooker.configuration.data)
+        if self.error > 0:
+            raise ParsingErrorsFound()
+
+    def progress(self):
+        bb.event.fire(bb.event.ParseProgress(self.cached, self.parsed,
+                                             self.skipped, self.masked,
+                                             self.virtuals, self.error,
+                                             self.total),
+                      self.cooker.configuration.event_data)
 
-        if self.pointer >= self.total:
-            cooker.bb_cache.sync()
-            bb.codeparser.parser_cache_save(cooker.configuration.data)
-            if self.error > 0:
-                raise ParsingErrorsFound
+    def parse_next(self):
+        cooker = self.cooker
+        if self.current >= self.total:
+            self.shutdown()
             return False
+
+        try:
+            if self.result_queue.empty() and self.fromcache:
+                filename, appends = self.fromcache.pop()
+                _, infos = cooker.bb_cache.load(filename, appends,
+                                                self.cooker.configuration.data)
+                parsed = False
+            else:
+                infos = self.result_queue.get()
+                parsed = True
+        except KeyboardInterrupt:
+            self.shutdown(clean=False)
+            raise
+        except Exception as e:
+            self.error += 1
+            parselog.critical(str(e))
+        else:
+            if parsed:
+                self.parsed += 1
+            else:
+                self.cached += 1
+            self.virtuals += len(infos)
+
+            for virtualfn, info in infos:
+                cooker.bb_cache.add_info(virtualfn, info, cooker.status,
+                                         parsed=parsed)
+                if info.skipped:
+                    self.skipped += 1
+        finally:
+            self.progress()
+
+        self.current += 1
         return True
+
diff --git a/bitbake/lib/bb/pysh/pyshyacc.py b/bitbake/lib/bb/pysh/pyshyacc.py
index 8bb9927321..3d6f54a58c 100644
--- a/bitbake/lib/bb/pysh/pyshyacc.py
+++ b/bitbake/lib/bb/pysh/pyshyacc.py
@@ -648,6 +648,7 @@ def p_error(p):
 try:
     import pyshtables
 except ImportError:
+    import os
     outputdir = os.path.dirname(__file__)
     if not os.access(outputdir, os.W_OK):
        outputdir = ''
--
cgit v1.2.3-54-g00ecf
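
The heart of this change is a classic queue-based worker pool: launch_processes() fills a task queue with files whose cache entries are stale, spawns one worker per core (or BB_NUMBER_PARSE_THREADS), and parse_next() drains a result queue one item at a time so progress events can still fire between results; shutdown() stops the pool with one 'STOP' sentinel per worker. The following is a minimal, self-contained sketch of that pattern, not BitBake code: the run() driver, its num_workers parameter and the squaring stand-in for parse work are invented for illustration, while the SIGINT masking, the 'STOP' sentinels and the one-worker-per-core default mirror what the patch does.

import multiprocessing
import signal

def worker(task_queue, result_queue):
    # Workers ignore SIGINT so that Ctrl-C is seen only by the parent,
    # which can then shut the pool down cleanly (cf. shutdown(clean=False)).
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # iter(get, 'STOP') keeps pulling tasks until the sentinel arrives.
    for item in iter(task_queue.get, 'STOP'):
        result_queue.put(item * item)   # stand-in for the real parsing work

def run(tasks, num_workers=None):
    if num_workers is None:
        # Default mirrors the patch: one worker per processor core.
        num_workers = multiprocessing.cpu_count()

    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for task in tasks:
        task_queue.put(task)

    processes = []
    for _ in range(num_workers):
        process = multiprocessing.Process(target=worker,
                                          args=(task_queue, result_queue))
        process.start()
        processes.append(process)

    # Drain one result per submitted task; results arrive in completion
    # order, not submission order, just as parse_next() takes whatever
    # the result queue yields next.
    results = [result_queue.get() for _ in tasks]

    # Clean shutdown: one 'STOP' sentinel per worker, then join them all.
    for _ in processes:
        task_queue.put('STOP')
    for process in processes:
        process.join()
    return results

if __name__ == '__main__':
    print(run([1, 2, 3, 4, 5]))

Running it prints the squared inputs; the draining loop in run() plays the role parse_next() plays in the patch, pulling one result per step so the parent can report progress between results rather than blocking until every file is parsed.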