diff options
| author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-03-06 15:32:35 +0000 |
|---|---|---|
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-03-06 15:40:56 +0000 |
| commit | 164a4cb2fc64e76836182ad2d412343a7b26b964 (patch) | |
| tree | c4246277b4148eec0b77ba8b89ac234b114cad6b /bitbake | |
| parent | ed76a48e6832c4a2015b9675e758f8c31229938e (diff) | |
| download | poky-164a4cb2fc64e76836182ad2d412343a7b26b964.tar.gz | |
bitbake: Revert "cooker: parse using bb.compat.Pool"
Reverting the pool changes, terminate does not work reliably on
bb.compat.Pool :(
[YOCTO #3978]
This reverts commit 8af519a49a3374bd9004864ef31ca8aa328e9f34.
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
| -rw-r--r-- | bitbake/lib/bb/cooker.py | 161 |
1 files changed, 134 insertions, 27 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index 9f7121fefc..9d051fa30f 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py | |||
| @@ -34,7 +34,7 @@ from cStringIO import StringIO | |||
| 34 | from contextlib import closing | 34 | from contextlib import closing |
| 35 | from functools import wraps | 35 | from functools import wraps |
| 36 | from collections import defaultdict | 36 | from collections import defaultdict |
| 37 | import bb, bb.exceptions, bb.command, bb.compat | 37 | import bb, bb.exceptions, bb.command |
| 38 | from bb import utils, data, parse, event, cache, providers, taskdata, runqueue | 38 | from bb import utils, data, parse, event, cache, providers, taskdata, runqueue |
| 39 | import Queue | 39 | import Queue |
| 40 | import prserv.serv | 40 | import prserv.serv |
| @@ -1556,19 +1556,87 @@ class ParsingFailure(Exception): | |||
| 1556 | self.recipe = recipe | 1556 | self.recipe = recipe |
| 1557 | Exception.__init__(self, realexception, recipe) | 1557 | Exception.__init__(self, realexception, recipe) |
| 1558 | 1558 | ||
| 1559 | def parse_file((filename, appends, caches_array)): | 1559 | class Feeder(multiprocessing.Process): |
| 1560 | try: | 1560 | def __init__(self, jobs, to_parsers, quit): |
| 1561 | return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array) | 1561 | self.quit = quit |
| 1562 | except Exception as exc: | 1562 | self.jobs = jobs |
| 1563 | tb = sys.exc_info()[2] | 1563 | self.to_parsers = to_parsers |
| 1564 | exc.recipe = filename | 1564 | multiprocessing.Process.__init__(self) |
| 1565 | exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) | 1565 | |
| 1566 | return True, exc | 1566 | def run(self): |
| 1567 | # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown | 1567 | while True: |
| 1568 | # and for example a worker thread doesn't just exit on its own in response to | 1568 | try: |
| 1569 | # a SystemExit event for example. | 1569 | quit = self.quit.get_nowait() |
| 1570 | except BaseException as exc: | 1570 | except Queue.Empty: |
| 1571 | return True, ParsingFailure(exc, filename) | 1571 | pass |
| 1572 | else: | ||
| 1573 | if quit == 'cancel': | ||
| 1574 | self.to_parsers.cancel_join_thread() | ||
| 1575 | break | ||
| 1576 | |||
| 1577 | try: | ||
| 1578 | job = self.jobs.pop() | ||
| 1579 | except IndexError: | ||
| 1580 | break | ||
| 1581 | |||
| 1582 | try: | ||
| 1583 | self.to_parsers.put(job, timeout=0.5) | ||
| 1584 | except Queue.Full: | ||
| 1585 | self.jobs.insert(0, job) | ||
| 1586 | continue | ||
| 1587 | |||
| 1588 | class Parser(multiprocessing.Process): | ||
| 1589 | def __init__(self, jobs, results, quit, init): | ||
| 1590 | self.jobs = jobs | ||
| 1591 | self.results = results | ||
| 1592 | self.quit = quit | ||
| 1593 | self.init = init | ||
| 1594 | multiprocessing.Process.__init__(self) | ||
| 1595 | |||
| 1596 | def run(self): | ||
| 1597 | if self.init: | ||
| 1598 | self.init() | ||
| 1599 | |||
| 1600 | pending = [] | ||
| 1601 | while True: | ||
| 1602 | try: | ||
| 1603 | self.quit.get_nowait() | ||
| 1604 | except Queue.Empty: | ||
| 1605 | pass | ||
| 1606 | else: | ||
| 1607 | self.results.cancel_join_thread() | ||
| 1608 | break | ||
| 1609 | |||
| 1610 | if pending: | ||
| 1611 | result = pending.pop() | ||
| 1612 | else: | ||
| 1613 | try: | ||
| 1614 | job = self.jobs.get(timeout=0.25) | ||
| 1615 | except Queue.Empty: | ||
| 1616 | continue | ||
| 1617 | |||
| 1618 | if job is None: | ||
| 1619 | break | ||
| 1620 | result = self.parse(*job) | ||
| 1621 | |||
| 1622 | try: | ||
| 1623 | self.results.put(result, timeout=0.25) | ||
| 1624 | except Queue.Full: | ||
| 1625 | pending.append(result) | ||
| 1626 | |||
| 1627 | def parse(self, filename, appends, caches_array): | ||
| 1628 | try: | ||
| 1629 | return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array) | ||
| 1630 | except Exception as exc: | ||
| 1631 | tb = sys.exc_info()[2] | ||
| 1632 | exc.recipe = filename | ||
| 1633 | exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) | ||
| 1634 | return True, exc | ||
| 1635 | # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown | ||
| 1636 | # and for example a worker thread doesn't just exit on its own in response to | ||
| 1637 | # a SystemExit event for example. | ||
| 1638 | except BaseException as exc: | ||
| 1639 | return True, ParsingFailure(exc, filename) | ||
| 1572 | 1640 | ||
| 1573 | class CookerParser(object): | 1641 | class CookerParser(object): |
| 1574 | def __init__(self, cooker, filelist, masked): | 1642 | def __init__(self, cooker, filelist, masked): |
| @@ -1602,25 +1670,32 @@ class CookerParser(object): | |||
| 1602 | self.fromcache.append((filename, appends)) | 1670 | self.fromcache.append((filename, appends)) |
| 1603 | self.toparse = self.total - len(self.fromcache) | 1671 | self.toparse = self.total - len(self.fromcache) |
| 1604 | self.progress_chunk = max(self.toparse / 100, 1) | 1672 | self.progress_chunk = max(self.toparse / 100, 1) |
| 1605 | self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1) | ||
| 1606 | 1673 | ||
| 1607 | self.start() | 1674 | self.start() |
| 1608 | self.haveshutdown = False | 1675 | self.haveshutdown = False |
| 1609 | 1676 | ||
| 1610 | def start(self): | 1677 | def start(self): |
| 1611 | self.results = self.load_cached() | 1678 | self.results = self.load_cached() |
| 1679 | self.processes = [] | ||
| 1612 | if self.toparse: | 1680 | if self.toparse: |
| 1613 | def process_init(): | ||
| 1614 | parse_file.cfg = self.cfgdata | ||
| 1615 | multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1) | ||
| 1616 | multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1) | ||
| 1617 | |||
| 1618 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) | 1681 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) |
| 1619 | 1682 | def init(): | |
| 1620 | self.pool = bb.compat.Pool(self.num_processes, process_init) | 1683 | Parser.cfg = self.cfgdata |
| 1621 | parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk) | 1684 | multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1) |
| 1622 | self.pool.close() | 1685 | multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1) |
| 1623 | self.results = itertools.chain(self.results, parsed) | 1686 | |
| 1687 | self.feeder_quit = multiprocessing.Queue(maxsize=1) | ||
| 1688 | self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes) | ||
| 1689 | self.jobs = multiprocessing.Queue(maxsize=self.num_processes) | ||
| 1690 | self.result_queue = multiprocessing.Queue() | ||
| 1691 | self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit) | ||
| 1692 | self.feeder.start() | ||
| 1693 | for i in range(0, self.num_processes): | ||
| 1694 | parser = Parser(self.jobs, self.result_queue, self.parser_quit, init) | ||
| 1695 | parser.start() | ||
| 1696 | self.processes.append(parser) | ||
| 1697 | |||
| 1698 | self.results = itertools.chain(self.results, self.parse_generator()) | ||
| 1624 | 1699 | ||
| 1625 | def shutdown(self, clean=True, force=False): | 1700 | def shutdown(self, clean=True, force=False): |
| 1626 | if not self.toparse: | 1701 | if not self.toparse: |
| @@ -1636,9 +1711,25 @@ class CookerParser(object): | |||
| 1636 | self.total) | 1711 | self.total) |
| 1637 | 1712 | ||
| 1638 | bb.event.fire(event, self.cfgdata) | 1713 | bb.event.fire(event, self.cfgdata) |
| 1714 | self.feeder_quit.put(None) | ||
| 1715 | for process in self.processes: | ||
| 1716 | self.jobs.put(None) | ||
| 1639 | else: | 1717 | else: |
| 1640 | self.pool.terminate() | 1718 | self.feeder_quit.put('cancel') |
| 1641 | self.pool.join() | 1719 | |
| 1720 | self.parser_quit.cancel_join_thread() | ||
| 1721 | for process in self.processes: | ||
| 1722 | self.parser_quit.put(None) | ||
| 1723 | |||
| 1724 | self.jobs.cancel_join_thread() | ||
| 1725 | |||
| 1726 | for process in self.processes: | ||
| 1727 | if force: | ||
| 1728 | process.join(.1) | ||
| 1729 | process.terminate() | ||
| 1730 | else: | ||
| 1731 | process.join() | ||
| 1732 | self.feeder.join() | ||
| 1642 | 1733 | ||
| 1643 | sync = threading.Thread(target=self.bb_cache.sync) | 1734 | sync = threading.Thread(target=self.bb_cache.sync) |
| 1644 | sync.start() | 1735 | sync.start() |
| @@ -1651,6 +1742,22 @@ class CookerParser(object): | |||
| 1651 | cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) | 1742 | cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) |
| 1652 | yield not cached, infos | 1743 | yield not cached, infos |
| 1653 | 1744 | ||
| 1745 | def parse_generator(self): | ||
| 1746 | while True: | ||
| 1747 | if self.parsed >= self.toparse: | ||
| 1748 | break | ||
| 1749 | |||
| 1750 | try: | ||
| 1751 | result = self.result_queue.get(timeout=0.25) | ||
| 1752 | except Queue.Empty: | ||
| 1753 | pass | ||
| 1754 | else: | ||
| 1755 | value = result[1] | ||
| 1756 | if isinstance(value, BaseException): | ||
| 1757 | raise value | ||
| 1758 | else: | ||
| 1759 | yield result | ||
| 1760 | |||
| 1654 | def parse_next(self): | 1761 | def parse_next(self): |
| 1655 | result = [] | 1762 | result = [] |
| 1656 | parsed = None | 1763 | parsed = None |
