summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-07 18:11:09 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-14 12:52:56 +0100
commitd0f0e5d9e69cc22f0c6635c7e416de93660c6bca (patch)
tree1877f962137e31bbf93be5c13763488fb2321886 /bitbake/lib/bb/runqueue.py
parentcd7b7de91a98e712e796a8d6a3a8e3741950396e (diff)
downloadpoky-d0f0e5d9e69cc22f0c6635c7e416de93660c6bca.tar.gz
bitbake: runqueue: Split runqueue to use bitbake-worker
This is a pretty fundamental change to the way bitbake operates. It splits out the task execution part of runqueue into a completely separately exec'd process called bitbake-worker. This means that the separate process has to build its own datastore and that configuration needs to be passed from the cooker over to the bitbake worker process. Known issues: * Hob is broken with this patch since it writes to the configuration and that configuration isn't preserved in bitbake-worker. * We create a worker for setscene, then a new worker for the main task execution. This is wasteful but shouldn't be hard to fix. * We probably send too much data over to bitbake-worker, need to see if we can streamline it. These are issues which will be followed up in subsequent patches. This patch sets the groundwork for the removal of the double bitbake execution for psuedo which will be in a follow on patch. (Bitbake rev: b2e26f1db28d74f2dd9df8ab4ed3b472503b9a5c) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py284
1 files changed, 106 insertions, 178 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 090d1b56a2..dd6e071c37 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -28,10 +28,17 @@ import sys
28import signal 28import signal
29import stat 29import stat
30import fcntl 30import fcntl
31import errno
31import logging 32import logging
32import bb 33import bb
33from bb import msg, data, event 34from bb import msg, data, event
34from bb import monitordisk 35from bb import monitordisk
36import subprocess
37
38try:
39 import cPickle as pickle
40except ImportError:
41 import pickle
35 42
36bblogger = logging.getLogger("BitBake") 43bblogger = logging.getLogger("BitBake")
37logger = logging.getLogger("BitBake.RunQueue") 44logger = logging.getLogger("BitBake.RunQueue")
@@ -938,6 +945,10 @@ class RunQueue:
938 raise 945 raise
939 except: 946 except:
940 logger.error("An uncaught exception occured in runqueue, please see the failure below:") 947 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
948 try:
949 self.rqexe.teardown()
950 except:
951 pass
941 self.state = runQueueComplete 952 self.state = runQueueComplete
942 raise 953 raise
943 954
@@ -979,38 +990,41 @@ class RunQueueExecute:
979 self.runq_buildable = [] 990 self.runq_buildable = []
980 self.runq_running = [] 991 self.runq_running = []
981 self.runq_complete = [] 992 self.runq_complete = []
982 self.build_pids = {} 993
983 self.build_pipes = {}
984 self.build_stamps = {} 994 self.build_stamps = {}
985 self.failed_fnids = [] 995 self.failed_fnids = []
986 996
987 self.stampcache = {} 997 self.stampcache = {}
988 998
989 def runqueue_process_waitpid(self): 999 logger.debug(1, "Starting bitbake-worker")
990 """ 1000 self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
991 Return none is there are no processes awaiting result collection, otherwise 1001 bb.utils.nonblockingfd(self.worker.stdout)
992 collect the process exit codes and close the information pipe. 1002 self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self)
993 """ 1003
994 pid, status = os.waitpid(-1, os.WNOHANG) 1004 workerdata = {
995 if pid == 0 or os.WIFSTOPPED(status): 1005 "taskdeps" : self.rqdata.dataCache.task_deps,
996 return None 1006 "fakerootenv" : self.rqdata.dataCache.fakerootenv,
997 1007 "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
998 if os.WIFEXITED(status): 1008 "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
999 status = os.WEXITSTATUS(status) 1009 "hashes" : self.rqdata.hashes,
1000 elif os.WIFSIGNALED(status): 1010 "hash_deps" : self.rqdata.hash_deps,
1001 # Per shell conventions for $?, when a process exits due to 1011 "sigchecksums" : bb.parse.siggen.file_checksum_values,
1002 # a signal, we return an exit code of 128 + SIGNUM 1012 "runq_hash" : self.rqdata.runq_hash,
1003 status = 128 + os.WTERMSIG(status) 1013 "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
1004 1014 "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
1005 task = self.build_pids[pid] 1015 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
1006 del self.build_pids[pid] 1016 "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1007 1017 }
1008 self.build_pipes[pid].close() 1018
1009 del self.build_pipes[pid] 1019 self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
1020 self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
1021 self.worker.stdin.flush()
1022
1023 def runqueue_process_waitpid(self, task, status):
1010 1024
1011 # self.build_stamps[pid] may not exist when use shared work directory. 1025 # self.build_stamps[pid] may not exist when use shared work directory.
1012 if pid in self.build_stamps: 1026 if task in self.build_stamps:
1013 del self.build_stamps[pid] 1027 del self.build_stamps[task]
1014 1028
1015 if status != 0: 1029 if status != 0:
1016 self.task_fail(task, status) 1030 self.task_fail(task, status)
@@ -1019,16 +1033,11 @@ class RunQueueExecute:
1019 return True 1033 return True
1020 1034
1021 def finish_now(self): 1035 def finish_now(self):
1022 if self.stats.active: 1036
1023 logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active) 1037 self.worker.stdin.write("<finishnow></finishnow>")
1024 for k, v in self.build_pids.iteritems(): 1038 self.worker.stdin.flush()
1025 try: 1039
1026 os.kill(-k, signal.SIGTERM) 1040 self.teardown()
1027 os.waitpid(-1, 0)
1028 except:
1029 pass
1030 for pipe in self.build_pipes:
1031 self.build_pipes[pipe].read()
1032 1041
1033 if len(self.failed_fnids) != 0: 1042 if len(self.failed_fnids) != 0:
1034 self.rq.state = runQueueFailed 1043 self.rq.state = runQueueFailed
@@ -1040,14 +1049,13 @@ class RunQueueExecute:
1040 def finish(self): 1049 def finish(self):
1041 self.rq.state = runQueueCleanUp 1050 self.rq.state = runQueueCleanUp
1042 1051
1043 for pipe in self.build_pipes:
1044 self.build_pipes[pipe].read()
1045
1046 if self.stats.active > 0: 1052 if self.stats.active > 0:
1047 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) 1053 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1048 self.runqueue_process_waitpid() 1054 self.workerpipe.read()
1049 return 1055 return
1050 1056
1057 self.teardown()
1058
1051 if len(self.failed_fnids) != 0: 1059 if len(self.failed_fnids) != 0:
1052 self.rq.state = runQueueFailed 1060 self.rq.state = runQueueFailed
1053 return 1061 return
@@ -1055,115 +1063,6 @@ class RunQueueExecute:
1055 self.rq.state = runQueueComplete 1063 self.rq.state = runQueueComplete
1056 return 1064 return
1057 1065
1058 def fork_off_task(self, fn, task, taskname, quieterrors=False):
1059 # We need to setup the environment BEFORE the fork, since
1060 # a fork() or exec*() activates PSEUDO...
1061
1062 envbackup = {}
1063 fakeenv = {}
1064 umask = None
1065
1066 taskdep = self.rqdata.dataCache.task_deps[fn]
1067 if 'umask' in taskdep and taskname in taskdep['umask']:
1068 # umask might come in as a number or text string..
1069 try:
1070 umask = int(taskdep['umask'][taskname],8)
1071 except TypeError:
1072 umask = taskdep['umask'][taskname]
1073
1074 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1075 envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
1076 for key, value in (var.split('=') for var in envvars):
1077 envbackup[key] = os.environ.get(key)
1078 os.environ[key] = value
1079 fakeenv[key] = value
1080
1081 fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
1082 for p in fakedirs:
1083 bb.utils.mkdirhier(p)
1084
1085 logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
1086 (fn, taskname, ', '.join(fakedirs)))
1087 else:
1088 envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
1089 for key, value in (var.split('=') for var in envvars):
1090 envbackup[key] = os.environ.get(key)
1091 os.environ[key] = value
1092 fakeenv[key] = value
1093
1094 sys.stdout.flush()
1095 sys.stderr.flush()
1096 try:
1097 pipein, pipeout = os.pipe()
1098 pipein = os.fdopen(pipein, 'rb', 4096)
1099 pipeout = os.fdopen(pipeout, 'wb', 0)
1100 pid = os.fork()
1101 except OSError as e:
1102 bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
1103
1104 if pid == 0:
1105 pipein.close()
1106
1107 # Save out the PID so that the event can include it the
1108 # events
1109 bb.event.worker_pid = os.getpid()
1110 bb.event.worker_pipe = pipeout
1111
1112 self.rq.state = runQueueChildProcess
1113 # Make the child the process group leader
1114 os.setpgid(0, 0)
1115 # No stdin
1116 newsi = os.open(os.devnull, os.O_RDWR)
1117 os.dup2(newsi, sys.stdin.fileno())
1118
1119 if umask:
1120 os.umask(umask)
1121
1122 self.cooker.data.setVar("BB_WORKERCONTEXT", "1")
1123 bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
1124 ret = 0
1125 try:
1126 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1127 the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
1128 for h in self.rqdata.hashes:
1129 the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
1130 for h in self.rqdata.hash_deps:
1131 the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
1132
1133 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
1134 # successfully. We also need to unset anything from the environment which shouldn't be there
1135 exports = bb.data.exported_vars(the_data)
1136 bb.utils.empty_environment()
1137 for e, v in exports:
1138 os.environ[e] = v
1139 for e in fakeenv:
1140 os.environ[e] = fakeenv[e]
1141 the_data.setVar(e, fakeenv[e])
1142 the_data.setVarFlag(e, 'export', "1")
1143
1144 if quieterrors:
1145 the_data.setVarFlag(taskname, "quieterrors", "1")
1146
1147 except Exception as exc:
1148 if not quieterrors:
1149 logger.critical(str(exc))
1150 os._exit(1)
1151 try:
1152 if not self.cooker.configuration.dry_run:
1153 profile = self.cooker.configuration.profile
1154 ret = bb.build.exec_task(fn, taskname, the_data, profile)
1155 os._exit(ret)
1156 except:
1157 os._exit(1)
1158 else:
1159 for key, value in envbackup.iteritems():
1160 if value is None:
1161 del os.environ[key]
1162 else:
1163 os.environ[key] = value
1164
1165 return pid, pipein, pipeout
1166
1167 def check_dependencies(self, task, taskdeps, setscene = False): 1066 def check_dependencies(self, task, taskdeps, setscene = False):
1168 if not self.rq.depvalidate: 1067 if not self.rq.depvalidate:
1169 return False 1068 return False
@@ -1184,6 +1083,16 @@ class RunQueueExecute:
1184 valid = bb.utils.better_eval(call, locs) 1083 valid = bb.utils.better_eval(call, locs)
1185 return valid 1084 return valid
1186 1085
1086 def teardown(self):
1087 logger.debug(1, "Teardown for bitbake-worker")
1088 self.worker.stdin.write("<quit></quit>")
1089 self.worker.stdin.flush()
1090 while self.worker.returncode is None:
1091 self.workerpipe.read()
1092 self.worker.poll()
1093 while self.workerpipe.read():
1094 continue
1095
1187class RunQueueExecuteDummy(RunQueueExecute): 1096class RunQueueExecuteDummy(RunQueueExecute):
1188 def __init__(self, rq): 1097 def __init__(self, rq):
1189 self.rq = rq 1098 self.rq = rq
@@ -1275,7 +1184,6 @@ class RunQueueExecuteTasks(RunQueueExecute):
1275 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1184 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
1276 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1185 (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1277 1186
1278
1279 def get_schedulers(self): 1187 def get_schedulers(self):
1280 schedulers = set(obj for obj in globals().values() 1188 schedulers = set(obj for obj in globals().values()
1281 if type(obj) is type and 1189 if type(obj) is type and
@@ -1349,6 +1257,9 @@ class RunQueueExecuteTasks(RunQueueExecute):
1349 Run the tasks in a queue prepared by rqdata.prepare() 1257 Run the tasks in a queue prepared by rqdata.prepare()
1350 """ 1258 """
1351 1259
1260 self.workerpipe.read()
1261
1262
1352 if self.stats.total == 0: 1263 if self.stats.total == 0:
1353 # nothing to do 1264 # nothing to do
1354 self.rq.state = runQueueCleanUp 1265 self.rq.state = runQueueCleanUp
@@ -1384,23 +1295,20 @@ class RunQueueExecuteTasks(RunQueueExecute):
1384 startevent = runQueueTaskStarted(task, self.stats, self.rq) 1295 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1385 bb.event.fire(startevent, self.cfgData) 1296 bb.event.fire(startevent, self.cfgData)
1386 1297
1387 pid, pipein, pipeout = self.fork_off_task(fn, task, taskname) 1298 self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1299 self.worker.stdin.flush()
1388 1300
1389 self.build_pids[pid] = task 1301 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1390 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1391 self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1392 self.runq_running[task] = 1 1302 self.runq_running[task] = 1
1393 self.stats.taskActive() 1303 self.stats.taskActive()
1394 if self.stats.active < self.number_tasks: 1304 if self.stats.active < self.number_tasks:
1395 return True 1305 return True
1396 1306
1397 for pipe in self.build_pipes:
1398 self.build_pipes[pipe].read()
1399
1400 if self.stats.active > 0: 1307 if self.stats.active > 0:
1401 if self.runqueue_process_waitpid() is None: 1308 self.workerpipe.read()
1402 return 0.5 1309 return 0.5
1403 return True 1310
1311 self.teardown()
1404 1312
1405 if len(self.failed_fnids) != 0: 1313 if len(self.failed_fnids) != 0:
1406 self.rq.state = runQueueFailed 1314 self.rq.state = runQueueFailed
@@ -1415,6 +1323,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
1415 if self.runq_complete[task] == 0: 1323 if self.runq_complete[task] == 0:
1416 logger.error("Task %s never completed!", task) 1324 logger.error("Task %s never completed!", task)
1417 self.rq.state = runQueueComplete 1325 self.rq.state = runQueueComplete
1326
1418 return True 1327 return True
1419 1328
1420class RunQueueExecuteScenequeue(RunQueueExecute): 1329class RunQueueExecuteScenequeue(RunQueueExecute):
@@ -1428,6 +1337,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1428 # If we don't have any setscene functions, skip this step 1337 # If we don't have any setscene functions, skip this step
1429 if len(self.rqdata.runq_setscene) == 0: 1338 if len(self.rqdata.runq_setscene) == 0:
1430 rq.scenequeue_covered = set() 1339 rq.scenequeue_covered = set()
1340 self.teardown()
1431 rq.state = runQueueRunInit 1341 rq.state = runQueueRunInit
1432 return 1342 return
1433 1343
@@ -1676,6 +1586,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1676 Run the tasks in a queue prepared by prepare_runqueue 1586 Run the tasks in a queue prepared by prepare_runqueue
1677 """ 1587 """
1678 1588
1589 self.workerpipe.read()
1590
1679 task = None 1591 task = None
1680 if self.stats.active < self.number_tasks: 1592 if self.stats.active < self.number_tasks:
1681 # Find the next setscene to run 1593 # Find the next setscene to run
@@ -1716,22 +1628,17 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1716 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 1628 startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
1717 bb.event.fire(startevent, self.cfgData) 1629 bb.event.fire(startevent, self.cfgData)
1718 1630
1719 pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) 1631 self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1632 self.worker.stdin.flush()
1720 1633
1721 self.build_pids[pid] = task
1722 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
1723 self.runq_running[task] = 1 1634 self.runq_running[task] = 1
1724 self.stats.taskActive() 1635 self.stats.taskActive()
1725 if self.stats.active < self.number_tasks: 1636 if self.stats.active < self.number_tasks:
1726 return True 1637 return True
1727 1638
1728 for pipe in self.build_pipes:
1729 self.build_pipes[pipe].read()
1730
1731 if self.stats.active > 0: 1639 if self.stats.active > 0:
1732 if self.runqueue_process_waitpid() is None: 1640 self.workerpipe.read()
1733 return 0.5 1641 return 0.5
1734 return True
1735 1642
1736 # Convert scenequeue_covered task numbers into full taskgraph ids 1643 # Convert scenequeue_covered task numbers into full taskgraph ids
1737 oldcovered = self.scenequeue_covered 1644 oldcovered = self.scenequeue_covered
@@ -1745,10 +1652,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1745 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) 1652 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1746 1653
1747 self.rq.state = runQueueRunInit 1654 self.rq.state = runQueueRunInit
1655 self.teardown()
1748 return True 1656 return True
1749 1657
1750 def fork_off_task(self, fn, task, taskname): 1658 def runqueue_process_waitpid(self, task, status):
1751 return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True) 1659 task = self.rq.rqdata.runq_setscene.index(task)
1660
1661 RunQueueExecute.runqueue_process_waitpid(self, task, status)
1752 1662
1753class TaskFailure(Exception): 1663class TaskFailure(Exception):
1754 """ 1664 """
@@ -1828,25 +1738,43 @@ class runQueuePipe():
1828 """ 1738 """
1829 Abstraction for a pipe between a worker thread and the server 1739 Abstraction for a pipe between a worker thread and the server
1830 """ 1740 """
1831 def __init__(self, pipein, pipeout, d): 1741 def __init__(self, pipein, pipeout, d, rq):
1832 self.input = pipein 1742 self.input = pipein
1833 pipeout.close() 1743 if pipeout:
1744 pipeout.close()
1834 bb.utils.nonblockingfd(self.input) 1745 bb.utils.nonblockingfd(self.input)
1835 self.queue = "" 1746 self.queue = ""
1836 self.d = d 1747 self.d = d
1748 self.rq = rq
1749
1750 def setrunqueue(self, rq):
1751 self.rq = rq
1837 1752
1838 def read(self): 1753 def read(self):
1839 start = len(self.queue) 1754 start = len(self.queue)
1840 try: 1755 try:
1841 self.queue = self.queue + self.input.read(102400) 1756 self.queue = self.queue + self.input.read(102400)
1842 except (OSError, IOError): 1757 except (OSError, IOError) as e:
1843 pass 1758 if e.errno != errno.EAGAIN:
1759 raise
1844 end = len(self.queue) 1760 end = len(self.queue)
1845 index = self.queue.find("</event>") 1761 found = True
1846 while index != -1: 1762 while found and len(self.queue):
1847 bb.event.fire_from_worker(self.queue[:index+8], self.d) 1763 found = False
1848 self.queue = self.queue[index+8:]
1849 index = self.queue.find("</event>") 1764 index = self.queue.find("</event>")
1765 while index != -1 and self.queue.startswith("<event>"):
1766 event = pickle.loads(self.queue[7:index])
1767 bb.event.fire_from_worker(event, self.d)
1768 found = True
1769 self.queue = self.queue[index+8:]
1770 index = self.queue.find("</event>")
1771 index = self.queue.find("</exitcode>")
1772 while index != -1 and self.queue.startswith("<exitcode>"):
1773 task, status = pickle.loads(self.queue[10:index])
1774 self.rq.runqueue_process_waitpid(task, status)
1775 found = True
1776 self.queue = self.queue[index+11:]
1777 index = self.queue.find("</exitcode>")
1850 return (end > start) 1778 return (end > start)
1851 1779
1852 def close(self): 1780 def close(self):