summaryrefslogtreecommitdiffstats
path: root/bitbake/lib
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2013-03-06 15:32:35 +0000
committerRichard Purdie <richard.purdie@linuxfoundation.org>2013-03-06 15:40:56 +0000
commit164a4cb2fc64e76836182ad2d412343a7b26b964 (patch)
treec4246277b4148eec0b77ba8b89ac234b114cad6b /bitbake/lib
parented76a48e6832c4a2015b9675e758f8c31229938e (diff)
downloadpoky-164a4cb2fc64e76836182ad2d412343a7b26b964.tar.gz
bitbake: Revert "cooker: parse using bb.compat.Pool"
Reverting the pool changes, terminate does not work reliably on bb.compat.Pool :( [YOCTO #3978] This reverts commit 8af519a49a3374bd9004864ef31ca8aa328e9f34. Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib')
-rw-r--r--bitbake/lib/bb/cooker.py161
1 files changed, 134 insertions, 27 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index 9f7121fefc..9d051fa30f 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -34,7 +34,7 @@ from cStringIO import StringIO
34from contextlib import closing 34from contextlib import closing
35from functools import wraps 35from functools import wraps
36from collections import defaultdict 36from collections import defaultdict
37import bb, bb.exceptions, bb.command, bb.compat 37import bb, bb.exceptions, bb.command
38from bb import utils, data, parse, event, cache, providers, taskdata, runqueue 38from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
39import Queue 39import Queue
40import prserv.serv 40import prserv.serv
@@ -1556,19 +1556,87 @@ class ParsingFailure(Exception):
1556 self.recipe = recipe 1556 self.recipe = recipe
1557 Exception.__init__(self, realexception, recipe) 1557 Exception.__init__(self, realexception, recipe)
1558 1558
1559def parse_file((filename, appends, caches_array)): 1559class Feeder(multiprocessing.Process):
1560 try: 1560 def __init__(self, jobs, to_parsers, quit):
1561 return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array) 1561 self.quit = quit
1562 except Exception as exc: 1562 self.jobs = jobs
1563 tb = sys.exc_info()[2] 1563 self.to_parsers = to_parsers
1564 exc.recipe = filename 1564 multiprocessing.Process.__init__(self)
1565 exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) 1565
1566 return True, exc 1566 def run(self):
1567 # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown 1567 while True:
1568 # and for example a worker thread doesn't just exit on its own in response to 1568 try:
1569 # a SystemExit event for example. 1569 quit = self.quit.get_nowait()
1570 except BaseException as exc: 1570 except Queue.Empty:
1571 return True, ParsingFailure(exc, filename) 1571 pass
1572 else:
1573 if quit == 'cancel':
1574 self.to_parsers.cancel_join_thread()
1575 break
1576
1577 try:
1578 job = self.jobs.pop()
1579 except IndexError:
1580 break
1581
1582 try:
1583 self.to_parsers.put(job, timeout=0.5)
1584 except Queue.Full:
1585 self.jobs.insert(0, job)
1586 continue
1587
1588class Parser(multiprocessing.Process):
1589 def __init__(self, jobs, results, quit, init):
1590 self.jobs = jobs
1591 self.results = results
1592 self.quit = quit
1593 self.init = init
1594 multiprocessing.Process.__init__(self)
1595
1596 def run(self):
1597 if self.init:
1598 self.init()
1599
1600 pending = []
1601 while True:
1602 try:
1603 self.quit.get_nowait()
1604 except Queue.Empty:
1605 pass
1606 else:
1607 self.results.cancel_join_thread()
1608 break
1609
1610 if pending:
1611 result = pending.pop()
1612 else:
1613 try:
1614 job = self.jobs.get(timeout=0.25)
1615 except Queue.Empty:
1616 continue
1617
1618 if job is None:
1619 break
1620 result = self.parse(*job)
1621
1622 try:
1623 self.results.put(result, timeout=0.25)
1624 except Queue.Full:
1625 pending.append(result)
1626
1627 def parse(self, filename, appends, caches_array):
1628 try:
1629 return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
1630 except Exception as exc:
1631 tb = sys.exc_info()[2]
1632 exc.recipe = filename
1633 exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
1634 return True, exc
1635 # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
1636 # and for example a worker thread doesn't just exit on its own in response to
1637 # a SystemExit event for example.
1638 except BaseException as exc:
1639 return True, ParsingFailure(exc, filename)
1572 1640
1573class CookerParser(object): 1641class CookerParser(object):
1574 def __init__(self, cooker, filelist, masked): 1642 def __init__(self, cooker, filelist, masked):
@@ -1602,25 +1670,32 @@ class CookerParser(object):
1602 self.fromcache.append((filename, appends)) 1670 self.fromcache.append((filename, appends))
1603 self.toparse = self.total - len(self.fromcache) 1671 self.toparse = self.total - len(self.fromcache)
1604 self.progress_chunk = max(self.toparse / 100, 1) 1672 self.progress_chunk = max(self.toparse / 100, 1)
1605 self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
1606 1673
1607 self.start() 1674 self.start()
1608 self.haveshutdown = False 1675 self.haveshutdown = False
1609 1676
1610 def start(self): 1677 def start(self):
1611 self.results = self.load_cached() 1678 self.results = self.load_cached()
1679 self.processes = []
1612 if self.toparse: 1680 if self.toparse:
1613 def process_init():
1614 parse_file.cfg = self.cfgdata
1615 multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
1616 multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
1617
1618 bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) 1681 bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
1619 1682 def init():
1620 self.pool = bb.compat.Pool(self.num_processes, process_init) 1683 Parser.cfg = self.cfgdata
1621 parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk) 1684 multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
1622 self.pool.close() 1685 multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1)
1623 self.results = itertools.chain(self.results, parsed) 1686
1687 self.feeder_quit = multiprocessing.Queue(maxsize=1)
1688 self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
1689 self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
1690 self.result_queue = multiprocessing.Queue()
1691 self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
1692 self.feeder.start()
1693 for i in range(0, self.num_processes):
1694 parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
1695 parser.start()
1696 self.processes.append(parser)
1697
1698 self.results = itertools.chain(self.results, self.parse_generator())
1624 1699
1625 def shutdown(self, clean=True, force=False): 1700 def shutdown(self, clean=True, force=False):
1626 if not self.toparse: 1701 if not self.toparse:
@@ -1636,9 +1711,25 @@ class CookerParser(object):
1636 self.total) 1711 self.total)
1637 1712
1638 bb.event.fire(event, self.cfgdata) 1713 bb.event.fire(event, self.cfgdata)
1714 self.feeder_quit.put(None)
1715 for process in self.processes:
1716 self.jobs.put(None)
1639 else: 1717 else:
1640 self.pool.terminate() 1718 self.feeder_quit.put('cancel')
1641 self.pool.join() 1719
1720 self.parser_quit.cancel_join_thread()
1721 for process in self.processes:
1722 self.parser_quit.put(None)
1723
1724 self.jobs.cancel_join_thread()
1725
1726 for process in self.processes:
1727 if force:
1728 process.join(.1)
1729 process.terminate()
1730 else:
1731 process.join()
1732 self.feeder.join()
1642 1733
1643 sync = threading.Thread(target=self.bb_cache.sync) 1734 sync = threading.Thread(target=self.bb_cache.sync)
1644 sync.start() 1735 sync.start()
@@ -1651,6 +1742,22 @@ class CookerParser(object):
1651 cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) 1742 cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
1652 yield not cached, infos 1743 yield not cached, infos
1653 1744
1745 def parse_generator(self):
1746 while True:
1747 if self.parsed >= self.toparse:
1748 break
1749
1750 try:
1751 result = self.result_queue.get(timeout=0.25)
1752 except Queue.Empty:
1753 pass
1754 else:
1755 value = result[1]
1756 if isinstance(value, BaseException):
1757 raise value
1758 else:
1759 yield result
1760
1654 def parse_next(self): 1761 def parse_next(self):
1655 result = [] 1762 result = []
1656 parsed = None 1763 parsed = None