summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristopher Larson <chris_larson@mentor.com>2013-02-12 12:28:47 -0500
committerRichard Purdie <richard.purdie@linuxfoundation.org>2013-02-19 08:47:36 -0800
commit325410e2b20920a9903d0cafcb992b3452dcdb15 (patch)
treecdaa461da446b301c8c88051718deab4943da1b7
parentdde7a481354d5b0539762109bdfaaba6f85f879b (diff)
downloadpoky-325410e2b20920a9903d0cafcb992b3452dcdb15.tar.gz
bitbake: cooker: parse using bb.compat.Pool
(Bitbake rev: 8af519a49a3374bd9004864ef31ca8aa328e9f34) Signed-off-by: Christopher Larson <chris_larson@mentor.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--bitbake/lib/bb/cooker.py161
1 files changed, 27 insertions, 134 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 9d051fa30f..9f7121fefc 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -34,7 +34,7 @@ from cStringIO import StringIO
34from contextlib import closing 34from contextlib import closing
35from functools import wraps 35from functools import wraps
36from collections import defaultdict 36from collections import defaultdict
37import bb, bb.exceptions, bb.command 37import bb, bb.exceptions, bb.command, bb.compat
38from bb import utils, data, parse, event, cache, providers, taskdata, runqueue 38from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
39import Queue 39import Queue
40import prserv.serv 40import prserv.serv
@@ -1556,87 +1556,19 @@ class ParsingFailure(Exception):
1556 self.recipe = recipe 1556 self.recipe = recipe
1557 Exception.__init__(self, realexception, recipe) 1557 Exception.__init__(self, realexception, recipe)
1558 1558
1559class Feeder(multiprocessing.Process): 1559def parse_file((filename, appends, caches_array)):
1560 def __init__(self, jobs, to_parsers, quit): 1560 try:
1561 self.quit = quit 1561 return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
1562 self.jobs = jobs 1562 except Exception as exc:
1563 self.to_parsers = to_parsers 1563 tb = sys.exc_info()[2]
1564 multiprocessing.Process.__init__(self) 1564 exc.recipe = filename
1565 1565 exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
1566 def run(self): 1566 return True, exc
1567 while True: 1567 # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
1568 try: 1568 # and for example a worker thread doesn't just exit on its own in response to
1569 quit = self.quit.get_nowait() 1569 # a SystemExit event for example.
1570 except Queue.Empty: 1570 except BaseException as exc:
1571 pass 1571 return True, ParsingFailure(exc, filename)
1572 else:
1573 if quit == 'cancel':
1574 self.to_parsers.cancel_join_thread()
1575 break
1576
1577 try:
1578 job = self.jobs.pop()
1579 except IndexError:
1580 break
1581
1582 try:
1583 self.to_parsers.put(job, timeout=0.5)
1584 except Queue.Full:
1585 self.jobs.insert(0, job)
1586 continue
1587
1588class Parser(multiprocessing.Process):
1589 def __init__(self, jobs, results, quit, init):
1590 self.jobs = jobs
1591 self.results = results
1592 self.quit = quit
1593 self.init = init
1594 multiprocessing.Process.__init__(self)
1595
1596 def run(self):
1597 if self.init:
1598 self.init()
1599
1600 pending = []
1601 while True:
1602 try:
1603 self.quit.get_nowait()
1604 except Queue.Empty:
1605 pass
1606 else:
1607 self.results.cancel_join_thread()
1608 break
1609
1610 if pending:
1611 result = pending.pop()
1612 else:
1613 try:
1614 job = self.jobs.get(timeout=0.25)
1615 except Queue.Empty:
1616 continue
1617
1618 if job is None:
1619 break
1620 result = self.parse(*job)
1621
1622 try:
1623 self.results.put(result, timeout=0.25)
1624 except Queue.Full:
1625 pending.append(result)
1626
1627 def parse(self, filename, appends, caches_array):
1628 try:
1629 return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
1630 except Exception as exc:
1631 tb = sys.exc_info()[2]
1632 exc.recipe = filename
1633 exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
1634 return True, exc
1635 # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
1636 # and for example a worker thread doesn't just exit on its own in response to
1637 # a SystemExit event for example.
1638 except BaseException as exc:
1639 return True, ParsingFailure(exc, filename)
1640 1572
1641class CookerParser(object): 1573class CookerParser(object):
1642 def __init__(self, cooker, filelist, masked): 1574 def __init__(self, cooker, filelist, masked):
@@ -1670,32 +1602,25 @@ class CookerParser(object):
1670 self.fromcache.append((filename, appends)) 1602 self.fromcache.append((filename, appends))
1671 self.toparse = self.total - len(self.fromcache) 1603 self.toparse = self.total - len(self.fromcache)
1672 self.progress_chunk = max(self.toparse / 100, 1) 1604 self.progress_chunk = max(self.toparse / 100, 1)
1605 self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
1673 1606
1674 self.start() 1607 self.start()
1675 self.haveshutdown = False 1608 self.haveshutdown = False
1676 1609
1677 def start(self): 1610 def start(self):
1678 self.results = self.load_cached() 1611 self.results = self.load_cached()
1679 self.processes = []
1680 if self.toparse: 1612 if self.toparse:
1613 def process_init():
1614 parse_file.cfg = self.cfgdata
1615 multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
1616 multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
1617
1681 bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) 1618 bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
1682 def init(): 1619
1683 Parser.cfg = self.cfgdata 1620 self.pool = bb.compat.Pool(self.num_processes, process_init)
1684 multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1) 1621 parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk)
1685 multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1) 1622 self.pool.close()
1686 1623 self.results = itertools.chain(self.results, parsed)
1687 self.feeder_quit = multiprocessing.Queue(maxsize=1)
1688 self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
1689 self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
1690 self.result_queue = multiprocessing.Queue()
1691 self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
1692 self.feeder.start()
1693 for i in range(0, self.num_processes):
1694 parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
1695 parser.start()
1696 self.processes.append(parser)
1697
1698 self.results = itertools.chain(self.results, self.parse_generator())
1699 1624
1700 def shutdown(self, clean=True, force=False): 1625 def shutdown(self, clean=True, force=False):
1701 if not self.toparse: 1626 if not self.toparse:
@@ -1711,25 +1636,9 @@ class CookerParser(object):
1711 self.total) 1636 self.total)
1712 1637
1713 bb.event.fire(event, self.cfgdata) 1638 bb.event.fire(event, self.cfgdata)
1714 self.feeder_quit.put(None)
1715 for process in self.processes:
1716 self.jobs.put(None)
1717 else: 1639 else:
1718 self.feeder_quit.put('cancel') 1640 self.pool.terminate()
1719 1641 self.pool.join()
1720 self.parser_quit.cancel_join_thread()
1721 for process in self.processes:
1722 self.parser_quit.put(None)
1723
1724 self.jobs.cancel_join_thread()
1725
1726 for process in self.processes:
1727 if force:
1728 process.join(.1)
1729 process.terminate()
1730 else:
1731 process.join()
1732 self.feeder.join()
1733 1642
1734 sync = threading.Thread(target=self.bb_cache.sync) 1643 sync = threading.Thread(target=self.bb_cache.sync)
1735 sync.start() 1644 sync.start()
@@ -1742,22 +1651,6 @@ class CookerParser(object):
1742 cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) 1651 cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
1743 yield not cached, infos 1652 yield not cached, infos
1744 1653
1745 def parse_generator(self):
1746 while True:
1747 if self.parsed >= self.toparse:
1748 break
1749
1750 try:
1751 result = self.result_queue.get(timeout=0.25)
1752 except Queue.Empty:
1753 pass
1754 else:
1755 value = result[1]
1756 if isinstance(value, BaseException):
1757 raise value
1758 else:
1759 yield result
1760
1761 def parse_next(self): 1654 def parse_next(self):
1762 result = [] 1655 result = []
1763 parsed = None 1656 parsed = None