diff options
author | Christopher Larson <chris_larson@mentor.com> | 2012-01-09 12:31:32 -0600 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2012-02-22 20:25:29 +0000 |
commit | cf60f95d9fc53828c1aa95198f24917aae2617e3 (patch) | |
tree | c96ba1e539f8a9b3d30e61bbd279086409c9f928 /bitbake | |
parent | 87e32edb88c30ac116fa396148ac26357051f93a (diff) | |
download | poky-cf60f95d9fc53828c1aa95198f24917aae2617e3.tar.gz |
cooker: roll our own process pool
This fixes the hang issue encountered with parse errors. The underlying issue
seems to have been the pool.terminate(). This sends SIGTERM to each of the
multiprocessing pool's processes, however, a python process terminating in
this fashion can corrupt any queues it's interacting with, causing a number of
problems for us (e.g. the queue that sends events to the UI).
So instead of using multiprocessing's pool, we roll our own, with the ability
to cancel the work. In the very long term, the python concurrent.futures
module introduced in python 3.2 could be used to resolve this as well.
(Bitbake rev: 7c39cfd8e060cca8753ac4114775447b18e13067)
Signed-off-by: Christopher Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r-- | bitbake/lib/bb/cooker.py | 161 |
1 files changed, 132 insertions, 29 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py index bb09dff82f..8188aaef34 100644 --- a/bitbake/lib/bb/cooker.py +++ b/bitbake/lib/bb/cooker.py | |||
@@ -36,6 +36,7 @@ from functools import wraps | |||
36 | from collections import defaultdict | 36 | from collections import defaultdict |
37 | import bb, bb.exceptions, bb.command | 37 | import bb, bb.exceptions, bb.command |
38 | from bb import utils, data, parse, event, cache, providers, taskdata, runqueue | 38 | from bb import utils, data, parse, event, cache, providers, taskdata, runqueue |
39 | import Queue | ||
39 | import prserv.serv | 40 | import prserv.serv |
40 | 41 | ||
41 | logger = logging.getLogger("BitBake") | 42 | logger = logging.getLogger("BitBake") |
@@ -1402,20 +1403,87 @@ class ParsingFailure(Exception): | |||
1402 | self.recipe = recipe | 1403 | self.recipe = recipe |
1403 | Exception.__init__(self, realexception, recipe) | 1404 | Exception.__init__(self, realexception, recipe) |
1404 | 1405 | ||
1405 | def parse_file(task): | 1406 | class Feeder(multiprocessing.Process): |
1406 | filename, appends, caches_array = task | 1407 | def __init__(self, jobs, to_parsers, quit): |
1407 | try: | 1408 | self.quit = quit |
1408 | return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array) | 1409 | self.jobs = jobs |
1409 | except Exception as exc: | 1410 | self.to_parsers = to_parsers |
1410 | tb = sys.exc_info()[2] | 1411 | multiprocessing.Process.__init__(self) |
1411 | exc.recipe = filename | 1412 | |
1412 | exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) | 1413 | def run(self): |
1413 | raise exc | 1414 | while True: |
1414 | # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown | 1415 | try: |
1415 | # and for example a worker thread doesn't just exit on its own in response to | 1416 | quit = self.quit.get_nowait() |
1416 | # a SystemExit event for example. | 1417 | except Queue.Empty: |
1417 | except BaseException as exc: | 1418 | pass |
1418 | raise ParsingFailure(exc, filename) | 1419 | else: |
1420 | if quit == 'cancel': | ||
1421 | self.to_parsers.cancel_join_thread() | ||
1422 | break | ||
1423 | |||
1424 | try: | ||
1425 | job = self.jobs.pop() | ||
1426 | except IndexError: | ||
1427 | break | ||
1428 | |||
1429 | try: | ||
1430 | self.to_parsers.put(job, timeout=0.5) | ||
1431 | except Queue.Full: | ||
1432 | self.jobs.insert(0, job) | ||
1433 | continue | ||
1434 | |||
1435 | class Parser(multiprocessing.Process): | ||
1436 | def __init__(self, jobs, results, quit, init): | ||
1437 | self.jobs = jobs | ||
1438 | self.results = results | ||
1439 | self.quit = quit | ||
1440 | self.init = init | ||
1441 | multiprocessing.Process.__init__(self) | ||
1442 | |||
1443 | def run(self): | ||
1444 | if self.init: | ||
1445 | self.init() | ||
1446 | |||
1447 | pending = [] | ||
1448 | while True: | ||
1449 | try: | ||
1450 | self.quit.get_nowait() | ||
1451 | except Queue.Empty: | ||
1452 | pass | ||
1453 | else: | ||
1454 | self.results.cancel_join_thread() | ||
1455 | break | ||
1456 | |||
1457 | if pending: | ||
1458 | result = pending.pop() | ||
1459 | else: | ||
1460 | try: | ||
1461 | job = self.jobs.get(timeout=0.25) | ||
1462 | except Queue.Empty: | ||
1463 | continue | ||
1464 | |||
1465 | if job is None: | ||
1466 | break | ||
1467 | result = self.parse(*job) | ||
1468 | |||
1469 | try: | ||
1470 | self.results.put(result, timeout=0.25) | ||
1471 | except Queue.Full: | ||
1472 | pending.append(result) | ||
1473 | |||
1474 | def parse(self, filename, appends, caches_array): | ||
1475 | try: | ||
1476 | return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array) | ||
1477 | except Exception as exc: | ||
1478 | tb = sys.exc_info()[2] | ||
1479 | exc.recipe = filename | ||
1480 | exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3)) | ||
1481 | return True, exc | ||
1482 | # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown | ||
1483 | # and for example a worker thread doesn't just exit on its own in response to | ||
1484 | # a SystemExit event for example. | ||
1485 | except BaseException as exc: | ||
1486 | return True, ParsingFailure(exc, filename) | ||
1419 | 1487 | ||
1420 | class CookerParser(object): | 1488 | class CookerParser(object): |
1421 | def __init__(self, cooker, filelist, masked): | 1489 | def __init__(self, cooker, filelist, masked): |
@@ -1452,22 +1520,28 @@ class CookerParser(object): | |||
1452 | self.start() | 1520 | self.start() |
1453 | 1521 | ||
1454 | def start(self): | 1522 | def start(self): |
1455 | def init(cfg): | ||
1456 | parse_file.cfg = cfg | ||
1457 | multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cooker.configuration.data, ), exitpriority=1) | ||
1458 | |||
1459 | self.results = self.load_cached() | 1523 | self.results = self.load_cached() |
1460 | 1524 | self.processes = [] | |
1461 | if self.toparse: | 1525 | if self.toparse: |
1462 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) | 1526 | bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata) |
1463 | 1527 | def init(): | |
1464 | self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata]) | 1528 | Parser.cfg = self.cfgdata |
1465 | parsed = self.pool.imap(parse_file, self.willparse) | 1529 | multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1) |
1466 | self.pool.close() | 1530 | |
1467 | 1531 | self.feeder_quit = multiprocessing.Queue(maxsize=1) | |
1468 | self.results = itertools.chain(self.results, parsed) | 1532 | self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes) |
1469 | 1533 | self.jobs = multiprocessing.Queue(maxsize=self.num_processes) | |
1470 | def shutdown(self, clean=True): | 1534 | self.result_queue = multiprocessing.Queue() |
1535 | self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit) | ||
1536 | self.feeder.start() | ||
1537 | for i in range(1, self.num_processes): | ||
1538 | parser = Parser(self.jobs, self.result_queue, self.parser_quit, init) | ||
1539 | parser.start() | ||
1540 | self.processes.append(parser) | ||
1541 | |||
1542 | self.results = itertools.chain(self.results, self.parse_generator()) | ||
1543 | |||
1544 | def shutdown(self, clean=True, force=False): | ||
1471 | if not self.toparse: | 1545 | if not self.toparse: |
1472 | return | 1546 | return |
1473 | 1547 | ||
@@ -1477,9 +1551,22 @@ class CookerParser(object): | |||
1477 | self.virtuals, self.error, | 1551 | self.virtuals, self.error, |
1478 | self.total) | 1552 | self.total) |
1479 | bb.event.fire(event, self.cfgdata) | 1553 | bb.event.fire(event, self.cfgdata) |
1554 | self.feeder_quit.put(None) | ||
1555 | for process in self.processes: | ||
1556 | self.jobs.put(None) | ||
1480 | else: | 1557 | else: |
1481 | self.pool.terminate() | 1558 | self.feeder_quit.put('cancel') |
1482 | self.pool.join() | 1559 | |
1560 | self.parser_quit.cancel_join_thread() | ||
1561 | for process in self.processes: | ||
1562 | self.parser_quit.put(None) | ||
1563 | |||
1564 | self.jobs.cancel_join_thread() | ||
1565 | sys.exit(1) | ||
1566 | |||
1567 | for process in self.processes: | ||
1568 | process.join() | ||
1569 | self.feeder.join() | ||
1483 | 1570 | ||
1484 | sync = threading.Thread(target=self.bb_cache.sync) | 1571 | sync = threading.Thread(target=self.bb_cache.sync) |
1485 | sync.start() | 1572 | sync.start() |
@@ -1491,6 +1578,22 @@ class CookerParser(object): | |||
1491 | cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) | 1578 | cached, infos = self.bb_cache.load(filename, appends, self.cfgdata) |
1492 | yield not cached, infos | 1579 | yield not cached, infos |
1493 | 1580 | ||
1581 | def parse_generator(self): | ||
1582 | while True: | ||
1583 | if self.parsed >= self.toparse: | ||
1584 | break | ||
1585 | |||
1586 | try: | ||
1587 | result = self.result_queue.get(timeout=0.25) | ||
1588 | except Queue.Empty: | ||
1589 | pass | ||
1590 | else: | ||
1591 | value = result[1] | ||
1592 | if isinstance(value, BaseException): | ||
1593 | raise value | ||
1594 | else: | ||
1595 | yield result | ||
1596 | |||
1494 | def parse_next(self): | 1597 | def parse_next(self): |
1495 | try: | 1598 | try: |
1496 | parsed, result = self.results.next() | 1599 | parsed, result = self.results.next() |