author    Christopher Larson <chris_larson@mentor.com>    2012-01-09 12:31:32 -0600
committer Richard Purdie <richard.purdie@linuxfoundation.org>    2012-02-22 20:25:29 +0000
commit    cf60f95d9fc53828c1aa95198f24917aae2617e3 (patch)
tree      c96ba1e539f8a9b3d30e61bbd279086409c9f928 /bitbake
parent    87e32edb88c30ac116fa396148ac26357051f93a (diff)
download  poky-cf60f95d9fc53828c1aa95198f24917aae2617e3.tar.gz
cooker: roll our own process pool
This fixes the hang issue encountered with parse errors. The underlying issue
seems to have been the pool.terminate(). This sends SIGTERM to each of the
multiprocessing pool's processes; however, a Python process terminating in
this fashion can corrupt any queues it is interacting with, causing a number
of problems for us (e.g. the queue that sends events to the UI).

So instead of using multiprocessing's pool, we roll our own, with the ability
to cancel the work. In the very long term, the python concurrent.futures
module introduced in Python 3.2 could be used to resolve this as well.

(Bitbake rev: 7c39cfd8e060cca8753ac4114775447b18e13067)

Signed-off-by: Christopher Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
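[Editor's note, not part of the commit: the message above points at concurrent.futures
as a possible long-term replacement. A rough sketch of that idea on Python 3.2+ follows;
parse_one() is a hypothetical stand-in for bb.cache.Cache.parse, and the error handling
is deliberately simplified. The point is that pending work is cancelled rather than the
workers being SIGTERM'd mid-queue-write.]

    import concurrent.futures

    def parse_one(recipe):
        # Stand-in for the real per-recipe parse; it would raise on a parse error.
        return recipe

    def parse_all(recipes, num_processes=4):
        with concurrent.futures.ProcessPoolExecutor(max_workers=num_processes) as pool:
            futures = {pool.submit(parse_one, recipe): recipe for recipe in recipes}
            try:
                for future in concurrent.futures.as_completed(futures):
                    yield futures[future], future.result()
            except Exception:
                # Cancel work that has not started yet instead of terminating
                # workers, which is the queue corruption described above.
                for future in futures:
                    future.cancel()
                raise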
Diffstat (limited to 'bitbake')
-rw-r--r--  bitbake/lib/bb/cooker.py | 161
1 files changed, 132 insertions, 29 deletions
diff --git a/bitbake/lib/bb/cooker.py b/bitbake/lib/bb/cooker.py
index bb09dff82f..8188aaef34 100644
--- a/bitbake/lib/bb/cooker.py
+++ b/bitbake/lib/bb/cooker.py
@@ -36,6 +36,7 @@ from functools import wraps
 from collections import defaultdict
 import bb, bb.exceptions, bb.command
 from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
+import Queue
 import prserv.serv
 
 logger = logging.getLogger("BitBake")
@@ -1402,20 +1403,87 @@ class ParsingFailure(Exception):
         self.recipe = recipe
         Exception.__init__(self, realexception, recipe)
 
-def parse_file(task):
-    filename, appends, caches_array = task
-    try:
-        return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
-    except Exception as exc:
-        tb = sys.exc_info()[2]
-        exc.recipe = filename
-        exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
-        raise exc
-    # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
-    # and for example a worker thread doesn't just exit on its own in response to
-    # a SystemExit event for example.
-    except BaseException as exc:
-        raise ParsingFailure(exc, filename)
+class Feeder(multiprocessing.Process):
+    def __init__(self, jobs, to_parsers, quit):
+        self.quit = quit
+        self.jobs = jobs
+        self.to_parsers = to_parsers
+        multiprocessing.Process.__init__(self)
+
+    def run(self):
+        while True:
+            try:
+                quit = self.quit.get_nowait()
+            except Queue.Empty:
+                pass
+            else:
+                if quit == 'cancel':
+                    self.to_parsers.cancel_join_thread()
+                break
+
+            try:
+                job = self.jobs.pop()
+            except IndexError:
+                break
+
+            try:
+                self.to_parsers.put(job, timeout=0.5)
+            except Queue.Full:
+                self.jobs.insert(0, job)
+                continue
+
+class Parser(multiprocessing.Process):
+    def __init__(self, jobs, results, quit, init):
+        self.jobs = jobs
+        self.results = results
+        self.quit = quit
+        self.init = init
+        multiprocessing.Process.__init__(self)
+
+    def run(self):
+        if self.init:
+            self.init()
+
+        pending = []
+        while True:
+            try:
+                self.quit.get_nowait()
+            except Queue.Empty:
+                pass
+            else:
+                self.results.cancel_join_thread()
+                break
+
+            if pending:
+                result = pending.pop()
+            else:
+                try:
+                    job = self.jobs.get(timeout=0.25)
+                except Queue.Empty:
+                    continue
+
+                if job is None:
+                    break
+                result = self.parse(*job)
+
+            try:
+                self.results.put(result, timeout=0.25)
+            except Queue.Full:
+                pending.append(result)
+
+    def parse(self, filename, appends, caches_array):
+        try:
+            return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
+        except Exception as exc:
+            tb = sys.exc_info()[2]
+            exc.recipe = filename
+            exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
+            return True, exc
+        # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
+        # and for example a worker thread doesn't just exit on its own in response to
+        # a SystemExit event for example.
+        except BaseException as exc:
+            return True, ParsingFailure(exc, filename)
 
 class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
@@ -1452,22 +1520,28 @@ class CookerParser(object):
         self.start()
 
     def start(self):
-        def init(cfg):
-            parse_file.cfg = cfg
-            multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cooker.configuration.data, ), exitpriority=1)
-
         self.results = self.load_cached()
-
+        self.processes = []
         if self.toparse:
             bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
-
-            self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata])
-            parsed = self.pool.imap(parse_file, self.willparse)
-            self.pool.close()
-
-            self.results = itertools.chain(self.results, parsed)
-
-    def shutdown(self, clean=True):
+            def init():
+                Parser.cfg = self.cfgdata
+                multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
+
+            self.feeder_quit = multiprocessing.Queue(maxsize=1)
+            self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
+            self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
+            self.result_queue = multiprocessing.Queue()
+            self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
+            self.feeder.start()
+            for i in range(1, self.num_processes):
+                parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
+                parser.start()
+                self.processes.append(parser)
+
+            self.results = itertools.chain(self.results, self.parse_generator())
+
+    def shutdown(self, clean=True, force=False):
         if not self.toparse:
             return
 
@@ -1477,9 +1551,22 @@ class CookerParser(object):
                                             self.virtuals, self.error,
                                             self.total)
             bb.event.fire(event, self.cfgdata)
+            self.feeder_quit.put(None)
+            for process in self.processes:
+                self.jobs.put(None)
         else:
-            self.pool.terminate()
-        self.pool.join()
+            self.feeder_quit.put('cancel')
+
+            self.parser_quit.cancel_join_thread()
+            for process in self.processes:
+                self.parser_quit.put(None)
+
+            self.jobs.cancel_join_thread()
+            sys.exit(1)
+
+        for process in self.processes:
+            process.join()
+        self.feeder.join()
 
         sync = threading.Thread(target=self.bb_cache.sync)
         sync.start()
@@ -1491,6 +1578,22 @@ class CookerParser(object):
             cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
             yield not cached, infos
 
+    def parse_generator(self):
+        while True:
+            if self.parsed >= self.toparse:
+                break
+
+            try:
+                result = self.result_queue.get(timeout=0.25)
+            except Queue.Empty:
+                pass
+            else:
+                value = result[1]
+                if isinstance(value, BaseException):
+                    raise value
+                else:
+                    yield result
+
     def parse_next(self):
         try:
             parsed, result = self.results.next()
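[Editor's note, not part of the commit: a standalone distillation of the worker pattern
the hunks above introduce, not BitBake code, written with Python 3 spellings (queue rather
than Queue). Each worker pulls jobs from a shared queue, pushes results to another, and
polls a separate quit queue so in-flight work can be cancelled without terminate().]

    import multiprocessing
    import queue

    class Worker(multiprocessing.Process):
        def __init__(self, jobs, results, quit_q):
            multiprocessing.Process.__init__(self)
            self.jobs = jobs
            self.results = results
            self.quit_q = quit_q

        def run(self):
            while True:
                try:
                    self.quit_q.get_nowait()      # cancellation requested
                except queue.Empty:
                    pass
                else:
                    self.results.cancel_join_thread()
                    break
                try:
                    job = self.jobs.get(timeout=0.25)
                except queue.Empty:
                    continue
                if job is None:                   # sentinel: clean shutdown
                    break
                self.results.put(job * job)       # stand-in for the real parse

    if __name__ == '__main__':
        jobs = multiprocessing.Queue()
        results = multiprocessing.Queue()
        quit_q = multiprocessing.Queue()
        workers = [Worker(jobs, results, quit_q) for _ in range(2)]
        for w in workers:
            w.start()
        for n in range(10):
            jobs.put(n)
        for _ in workers:
            jobs.put(None)                        # one sentinel per worker
        for _ in range(10):
            print(results.get())
        for w in workers:
            w.join()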