diff options
Diffstat (limited to 'bitbake/lib/bb')
-rw-r--r-- | bitbake/lib/bb/cooker.py | 34 |
1 files changed, 16 insertions, 18 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 90022f3c35..dc131939ed 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py | |||
@@ -26,6 +26,7 @@ import json | |||
26 | import pickle | 26 | import pickle |
27 | import codecs | 27 | import codecs |
28 | import hashserv | 28 | import hashserv |
29 | import ctypes | ||
29 | 30 | ||
30 | logger = logging.getLogger("BitBake") | 31 | logger = logging.getLogger("BitBake") |
31 | collectlog = logging.getLogger("BitBake.Collection") | 32 | collectlog = logging.getLogger("BitBake.Collection") |
@@ -1998,9 +1999,9 @@ class ParsingFailure(Exception): | |||
1998 | Exception.__init__(self, realexception, recipe) | 1999 | Exception.__init__(self, realexception, recipe) |
1999 | 2000 | ||
2000 | class Parser(multiprocessing.Process): | 2001 | class Parser(multiprocessing.Process): |
2001 | def __init__(self, jobs, jobid_queue, results, quit, profile): | 2002 | def __init__(self, jobs, next_job_id, results, quit, profile): |
2002 | self.jobs = jobs | 2003 | self.jobs = jobs |
2003 | self.jobid_queue = jobid_queue | 2004 | self.next_job_id = next_job_id |
2004 | self.results = results | 2005 | self.results = results |
2005 | self.quit = quit | 2006 | self.quit = quit |
2006 | multiprocessing.Process.__init__(self) | 2007 | multiprocessing.Process.__init__(self) |
@@ -2059,20 +2060,22 @@ class Parser(multiprocessing.Process): | |||
2059 | multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, exitpriority=1) | 2060 | multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, exitpriority=1) |
2060 | 2061 | ||
2061 | pending = [] | 2062 | pending = [] |
2063 | havejobs = True | ||
2062 | try: | 2064 | try: |
2063 | while pending or not self.exit: | 2065 | while (havejobs or pending) and not self.exit: |
2064 | if self.quit.is_set(): | 2066 | if self.quit.is_set(): |
2065 | break | 2067 | break |
2066 | 2068 | ||
2067 | jobid = None | 2069 | job = None |
2068 | try: | 2070 | if havejobs: |
2069 | # Have to wait for all parsers to have forked | 2071 | with self.next_job_id.get_lock(): |
2070 | jobid = self.jobid_queue.get(True, 0.1) | 2072 | if self.next_job_id.value < len(self.jobs): |
2071 | except (ValueError, OSError, queue.Empty) as e: | 2073 | job = self.jobs[self.next_job_id.value] |
2072 | pass | 2074 | self.next_job_id.value += 1 |
2075 | else: | ||
2076 | havejobs = False | ||
2073 | 2077 | ||
2074 | if jobid is not None: | 2078 | if job: |
2075 | job = self.jobs[jobid] | ||
2076 | result = self.parse(*job) | 2079 | result = self.parse(*job) |
2077 | # Clear the siggen cache after parsing to control memory usage, its huge | 2080 | # Clear the siggen cache after parsing to control memory usage, its huge |
2078 | bb.parse.siggen.postparsing_clean_cache() | 2081 | bb.parse.siggen.postparsing_clean_cache() |
@@ -2085,7 +2088,6 @@ class Parser(multiprocessing.Process): | |||
2085 | except queue.Full: | 2088 | except queue.Full: |
2086 | pending.append(result) | 2089 | pending.append(result) |
2087 | finally: | 2090 | finally: |
2088 | self.jobs.close() | ||
2089 | self.results.close() | 2091 | self.results.close() |
2090 | self.results.join_thread() | 2092 | self.results.join_thread() |
2091 | 2093 | ||
@@ -2167,22 +2169,18 @@ class CookerParser(object): | |||
2167 | if self.toparse: | 2169 | if self.toparse: |
2168 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) | 2170 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) |
2169 | 2171 | ||
2170 | self.toparse_queue = multiprocessing.Queue(len(self.willparse)) | 2172 | next_job_id = multiprocessing.Value(ctypes.c_int, 0) |
2171 | self.parser_quit = multiprocessing.Event() | 2173 | self.parser_quit = multiprocessing.Event() |
2172 | self.result_queue = multiprocessing.Queue() | 2174 | self.result_queue = multiprocessing.Queue() |
2173 | 2175 | ||
2174 | # Have to pass in willparse at fork time so all parsing processes have the unpickleable data | 2176 | # Have to pass in willparse at fork time so all parsing processes have the unpickleable data |
2175 | # then access it by index from the parse queue. | 2177 | # then access it by index from the parse queue. |
2176 | for i in range(0, self.num_processes): | 2178 | for i in range(0, self.num_processes): |
2177 | parser = Parser(self.willparse, self.toparse_queue, self.result_queue, self.parser_quit, self.cooker.configuration.profile) | 2179 | parser = Parser(self.willparse, next_job_id, self.result_queue, self.parser_quit, self.cooker.configuration.profile) |
2178 | parser.start() | 2180 | parser.start() |
2179 | self.process_names.append(parser.name) | 2181 | self.process_names.append(parser.name) |
2180 | self.processes.append(parser) | 2182 | self.processes.append(parser) |
2181 | 2183 | ||
2182 | for jobid in range(len(self.willparse)): | ||
2183 | self.toparse_queue.put(jobid) | ||
2184 | self.toparse_queue.close() | ||
2185 | |||
2186 | self.results = itertools.chain(self.results, self.parse_generator()) | 2184 | self.results = itertools.chain(self.results, self.parse_generator()) |
2187 | 2185 | ||
2188 | def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"): | 2186 | def shutdown(self, clean=True, eventmsg="Parsing halted due to errors"): |