diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-06-07 18:11:09 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-06-14 12:52:56 +0100 |
commit | d0f0e5d9e69cc22f0c6635c7e416de93660c6bca (patch) | |
tree | 1877f962137e31bbf93be5c13763488fb2321886 /bitbake/lib/bb/runqueue.py | |
parent | cd7b7de91a98e712e796a8d6a3a8e3741950396e (diff) | |
download | poky-d0f0e5d9e69cc22f0c6635c7e416de93660c6bca.tar.gz |
bitbake: runqueue: Split runqueue to use bitbake-worker
This is a pretty fundamental change to the way bitbake operates. It
splits out the task execution part of runqueue into a completely
separately exec'd process called bitbake-worker.
This means that the separate process has to build its own datastore and
that configuration needs to be passed from the cooker over to the
bitbake worker process.
Known issues:
* Hob is broken with this patch since it writes to the configuration
and that configuration isn't preserved in bitbake-worker.
* We create a worker for setscene, then a new worker for the main task
execution. This is wasteful but shouldn't be hard to fix.
* We probably send too much data over to bitbake-worker, need to
see if we can streamline it.
These are issues which will be followed up in subsequent patches.
This patch sets the groundwork for the removal of the double bitbake
execution for pseudo which will be in a follow on patch.
(Bitbake rev: b2e26f1db28d74f2dd9df8ab4ed3b472503b9a5c)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 284 |
1 file changed, 106 insertions, 178 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 090d1b56a2..dd6e071c37 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -28,10 +28,17 @@ import sys | |||
28 | import signal | 28 | import signal |
29 | import stat | 29 | import stat |
30 | import fcntl | 30 | import fcntl |
31 | import errno | ||
31 | import logging | 32 | import logging |
32 | import bb | 33 | import bb |
33 | from bb import msg, data, event | 34 | from bb import msg, data, event |
34 | from bb import monitordisk | 35 | from bb import monitordisk |
36 | import subprocess | ||
37 | |||
38 | try: | ||
39 | import cPickle as pickle | ||
40 | except ImportError: | ||
41 | import pickle | ||
35 | 42 | ||
36 | bblogger = logging.getLogger("BitBake") | 43 | bblogger = logging.getLogger("BitBake") |
37 | logger = logging.getLogger("BitBake.RunQueue") | 44 | logger = logging.getLogger("BitBake.RunQueue") |
@@ -938,6 +945,10 @@ class RunQueue: | |||
938 | raise | 945 | raise |
939 | except: | 946 | except: |
940 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") | 947 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") |
948 | try: | ||
949 | self.rqexe.teardown() | ||
950 | except: | ||
951 | pass | ||
941 | self.state = runQueueComplete | 952 | self.state = runQueueComplete |
942 | raise | 953 | raise |
943 | 954 | ||
@@ -979,38 +990,41 @@ class RunQueueExecute: | |||
979 | self.runq_buildable = [] | 990 | self.runq_buildable = [] |
980 | self.runq_running = [] | 991 | self.runq_running = [] |
981 | self.runq_complete = [] | 992 | self.runq_complete = [] |
982 | self.build_pids = {} | 993 | |
983 | self.build_pipes = {} | ||
984 | self.build_stamps = {} | 994 | self.build_stamps = {} |
985 | self.failed_fnids = [] | 995 | self.failed_fnids = [] |
986 | 996 | ||
987 | self.stampcache = {} | 997 | self.stampcache = {} |
988 | 998 | ||
989 | def runqueue_process_waitpid(self): | 999 | logger.debug(1, "Starting bitbake-worker") |
990 | """ | 1000 | self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
991 | Return none is there are no processes awaiting result collection, otherwise | 1001 | bb.utils.nonblockingfd(self.worker.stdout) |
992 | collect the process exit codes and close the information pipe. | 1002 | self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self) |
993 | """ | 1003 | |
994 | pid, status = os.waitpid(-1, os.WNOHANG) | 1004 | workerdata = { |
995 | if pid == 0 or os.WIFSTOPPED(status): | 1005 | "taskdeps" : self.rqdata.dataCache.task_deps, |
996 | return None | 1006 | "fakerootenv" : self.rqdata.dataCache.fakerootenv, |
997 | 1007 | "fakerootdirs" : self.rqdata.dataCache.fakerootdirs, | |
998 | if os.WIFEXITED(status): | 1008 | "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv, |
999 | status = os.WEXITSTATUS(status) | 1009 | "hashes" : self.rqdata.hashes, |
1000 | elif os.WIFSIGNALED(status): | 1010 | "hash_deps" : self.rqdata.hash_deps, |
1001 | # Per shell conventions for $?, when a process exits due to | 1011 | "sigchecksums" : bb.parse.siggen.file_checksum_values, |
1002 | # a signal, we return an exit code of 128 + SIGNUM | 1012 | "runq_hash" : self.rqdata.runq_hash, |
1003 | status = 128 + os.WTERMSIG(status) | 1013 | "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel, |
1004 | 1014 | "logdefaultverbose" : bb.msg.loggerDefaultVerbose, | |
1005 | task = self.build_pids[pid] | 1015 | "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, |
1006 | del self.build_pids[pid] | 1016 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, |
1007 | 1017 | } | |
1008 | self.build_pipes[pid].close() | 1018 | |
1009 | del self.build_pipes[pid] | 1019 | self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>") |
1020 | self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>") | ||
1021 | self.worker.stdin.flush() | ||
1022 | |||
1023 | def runqueue_process_waitpid(self, task, status): | ||
1010 | 1024 | ||
1011 | # self.build_stamps[pid] may not exist when use shared work directory. | 1025 | # self.build_stamps[pid] may not exist when use shared work directory. |
1012 | if pid in self.build_stamps: | 1026 | if task in self.build_stamps: |
1013 | del self.build_stamps[pid] | 1027 | del self.build_stamps[task] |
1014 | 1028 | ||
1015 | if status != 0: | 1029 | if status != 0: |
1016 | self.task_fail(task, status) | 1030 | self.task_fail(task, status) |
@@ -1019,16 +1033,11 @@ class RunQueueExecute: | |||
1019 | return True | 1033 | return True |
1020 | 1034 | ||
1021 | def finish_now(self): | 1035 | def finish_now(self): |
1022 | if self.stats.active: | 1036 | |
1023 | logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active) | 1037 | self.worker.stdin.write("<finishnow></finishnow>") |
1024 | for k, v in self.build_pids.iteritems(): | 1038 | self.worker.stdin.flush() |
1025 | try: | 1039 | |
1026 | os.kill(-k, signal.SIGTERM) | 1040 | self.teardown() |
1027 | os.waitpid(-1, 0) | ||
1028 | except: | ||
1029 | pass | ||
1030 | for pipe in self.build_pipes: | ||
1031 | self.build_pipes[pipe].read() | ||
1032 | 1041 | ||
1033 | if len(self.failed_fnids) != 0: | 1042 | if len(self.failed_fnids) != 0: |
1034 | self.rq.state = runQueueFailed | 1043 | self.rq.state = runQueueFailed |
@@ -1040,14 +1049,13 @@ class RunQueueExecute: | |||
1040 | def finish(self): | 1049 | def finish(self): |
1041 | self.rq.state = runQueueCleanUp | 1050 | self.rq.state = runQueueCleanUp |
1042 | 1051 | ||
1043 | for pipe in self.build_pipes: | ||
1044 | self.build_pipes[pipe].read() | ||
1045 | |||
1046 | if self.stats.active > 0: | 1052 | if self.stats.active > 0: |
1047 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) | 1053 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) |
1048 | self.runqueue_process_waitpid() | 1054 | self.workerpipe.read() |
1049 | return | 1055 | return |
1050 | 1056 | ||
1057 | self.teardown() | ||
1058 | |||
1051 | if len(self.failed_fnids) != 0: | 1059 | if len(self.failed_fnids) != 0: |
1052 | self.rq.state = runQueueFailed | 1060 | self.rq.state = runQueueFailed |
1053 | return | 1061 | return |
@@ -1055,115 +1063,6 @@ class RunQueueExecute: | |||
1055 | self.rq.state = runQueueComplete | 1063 | self.rq.state = runQueueComplete |
1056 | return | 1064 | return |
1057 | 1065 | ||
1058 | def fork_off_task(self, fn, task, taskname, quieterrors=False): | ||
1059 | # We need to setup the environment BEFORE the fork, since | ||
1060 | # a fork() or exec*() activates PSEUDO... | ||
1061 | |||
1062 | envbackup = {} | ||
1063 | fakeenv = {} | ||
1064 | umask = None | ||
1065 | |||
1066 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1067 | if 'umask' in taskdep and taskname in taskdep['umask']: | ||
1068 | # umask might come in as a number or text string.. | ||
1069 | try: | ||
1070 | umask = int(taskdep['umask'][taskname],8) | ||
1071 | except TypeError: | ||
1072 | umask = taskdep['umask'][taskname] | ||
1073 | |||
1074 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']: | ||
1075 | envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split() | ||
1076 | for key, value in (var.split('=') for var in envvars): | ||
1077 | envbackup[key] = os.environ.get(key) | ||
1078 | os.environ[key] = value | ||
1079 | fakeenv[key] = value | ||
1080 | |||
1081 | fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split() | ||
1082 | for p in fakedirs: | ||
1083 | bb.utils.mkdirhier(p) | ||
1084 | |||
1085 | logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % | ||
1086 | (fn, taskname, ', '.join(fakedirs))) | ||
1087 | else: | ||
1088 | envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split() | ||
1089 | for key, value in (var.split('=') for var in envvars): | ||
1090 | envbackup[key] = os.environ.get(key) | ||
1091 | os.environ[key] = value | ||
1092 | fakeenv[key] = value | ||
1093 | |||
1094 | sys.stdout.flush() | ||
1095 | sys.stderr.flush() | ||
1096 | try: | ||
1097 | pipein, pipeout = os.pipe() | ||
1098 | pipein = os.fdopen(pipein, 'rb', 4096) | ||
1099 | pipeout = os.fdopen(pipeout, 'wb', 0) | ||
1100 | pid = os.fork() | ||
1101 | except OSError as e: | ||
1102 | bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror)) | ||
1103 | |||
1104 | if pid == 0: | ||
1105 | pipein.close() | ||
1106 | |||
1107 | # Save out the PID so that the event can include it the | ||
1108 | # events | ||
1109 | bb.event.worker_pid = os.getpid() | ||
1110 | bb.event.worker_pipe = pipeout | ||
1111 | |||
1112 | self.rq.state = runQueueChildProcess | ||
1113 | # Make the child the process group leader | ||
1114 | os.setpgid(0, 0) | ||
1115 | # No stdin | ||
1116 | newsi = os.open(os.devnull, os.O_RDWR) | ||
1117 | os.dup2(newsi, sys.stdin.fileno()) | ||
1118 | |||
1119 | if umask: | ||
1120 | os.umask(umask) | ||
1121 | |||
1122 | self.cooker.data.setVar("BB_WORKERCONTEXT", "1") | ||
1123 | bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps) | ||
1124 | ret = 0 | ||
1125 | try: | ||
1126 | the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data) | ||
1127 | the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task]) | ||
1128 | for h in self.rqdata.hashes: | ||
1129 | the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h]) | ||
1130 | for h in self.rqdata.hash_deps: | ||
1131 | the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h]) | ||
1132 | |||
1133 | # exported_vars() returns a generator which *cannot* be passed to os.environ.update() | ||
1134 | # successfully. We also need to unset anything from the environment which shouldn't be there | ||
1135 | exports = bb.data.exported_vars(the_data) | ||
1136 | bb.utils.empty_environment() | ||
1137 | for e, v in exports: | ||
1138 | os.environ[e] = v | ||
1139 | for e in fakeenv: | ||
1140 | os.environ[e] = fakeenv[e] | ||
1141 | the_data.setVar(e, fakeenv[e]) | ||
1142 | the_data.setVarFlag(e, 'export', "1") | ||
1143 | |||
1144 | if quieterrors: | ||
1145 | the_data.setVarFlag(taskname, "quieterrors", "1") | ||
1146 | |||
1147 | except Exception as exc: | ||
1148 | if not quieterrors: | ||
1149 | logger.critical(str(exc)) | ||
1150 | os._exit(1) | ||
1151 | try: | ||
1152 | if not self.cooker.configuration.dry_run: | ||
1153 | profile = self.cooker.configuration.profile | ||
1154 | ret = bb.build.exec_task(fn, taskname, the_data, profile) | ||
1155 | os._exit(ret) | ||
1156 | except: | ||
1157 | os._exit(1) | ||
1158 | else: | ||
1159 | for key, value in envbackup.iteritems(): | ||
1160 | if value is None: | ||
1161 | del os.environ[key] | ||
1162 | else: | ||
1163 | os.environ[key] = value | ||
1164 | |||
1165 | return pid, pipein, pipeout | ||
1166 | |||
1167 | def check_dependencies(self, task, taskdeps, setscene = False): | 1066 | def check_dependencies(self, task, taskdeps, setscene = False): |
1168 | if not self.rq.depvalidate: | 1067 | if not self.rq.depvalidate: |
1169 | return False | 1068 | return False |
@@ -1184,6 +1083,16 @@ class RunQueueExecute: | |||
1184 | valid = bb.utils.better_eval(call, locs) | 1083 | valid = bb.utils.better_eval(call, locs) |
1185 | return valid | 1084 | return valid |
1186 | 1085 | ||
1086 | def teardown(self): | ||
1087 | logger.debug(1, "Teardown for bitbake-worker") | ||
1088 | self.worker.stdin.write("<quit></quit>") | ||
1089 | self.worker.stdin.flush() | ||
1090 | while self.worker.returncode is None: | ||
1091 | self.workerpipe.read() | ||
1092 | self.worker.poll() | ||
1093 | while self.workerpipe.read(): | ||
1094 | continue | ||
1095 | |||
1187 | class RunQueueExecuteDummy(RunQueueExecute): | 1096 | class RunQueueExecuteDummy(RunQueueExecute): |
1188 | def __init__(self, rq): | 1097 | def __init__(self, rq): |
1189 | self.rq = rq | 1098 | self.rq = rq |
@@ -1275,7 +1184,6 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1275 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % | 1184 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % |
1276 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) | 1185 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) |
1277 | 1186 | ||
1278 | |||
1279 | def get_schedulers(self): | 1187 | def get_schedulers(self): |
1280 | schedulers = set(obj for obj in globals().values() | 1188 | schedulers = set(obj for obj in globals().values() |
1281 | if type(obj) is type and | 1189 | if type(obj) is type and |
@@ -1349,6 +1257,9 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1349 | Run the tasks in a queue prepared by rqdata.prepare() | 1257 | Run the tasks in a queue prepared by rqdata.prepare() |
1350 | """ | 1258 | """ |
1351 | 1259 | ||
1260 | self.workerpipe.read() | ||
1261 | |||
1262 | |||
1352 | if self.stats.total == 0: | 1263 | if self.stats.total == 0: |
1353 | # nothing to do | 1264 | # nothing to do |
1354 | self.rq.state = runQueueCleanUp | 1265 | self.rq.state = runQueueCleanUp |
@@ -1384,23 +1295,20 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1384 | startevent = runQueueTaskStarted(task, self.stats, self.rq) | 1295 | startevent = runQueueTaskStarted(task, self.stats, self.rq) |
1385 | bb.event.fire(startevent, self.cfgData) | 1296 | bb.event.fire(startevent, self.cfgData) |
1386 | 1297 | ||
1387 | pid, pipein, pipeout = self.fork_off_task(fn, task, taskname) | 1298 | self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>") |
1299 | self.worker.stdin.flush() | ||
1388 | 1300 | ||
1389 | self.build_pids[pid] = task | 1301 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) |
1390 | self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) | ||
1391 | self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | ||
1392 | self.runq_running[task] = 1 | 1302 | self.runq_running[task] = 1 |
1393 | self.stats.taskActive() | 1303 | self.stats.taskActive() |
1394 | if self.stats.active < self.number_tasks: | 1304 | if self.stats.active < self.number_tasks: |
1395 | return True | 1305 | return True |
1396 | 1306 | ||
1397 | for pipe in self.build_pipes: | ||
1398 | self.build_pipes[pipe].read() | ||
1399 | |||
1400 | if self.stats.active > 0: | 1307 | if self.stats.active > 0: |
1401 | if self.runqueue_process_waitpid() is None: | 1308 | self.workerpipe.read() |
1402 | return 0.5 | 1309 | return 0.5 |
1403 | return True | 1310 | |
1311 | self.teardown() | ||
1404 | 1312 | ||
1405 | if len(self.failed_fnids) != 0: | 1313 | if len(self.failed_fnids) != 0: |
1406 | self.rq.state = runQueueFailed | 1314 | self.rq.state = runQueueFailed |
@@ -1415,6 +1323,7 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1415 | if self.runq_complete[task] == 0: | 1323 | if self.runq_complete[task] == 0: |
1416 | logger.error("Task %s never completed!", task) | 1324 | logger.error("Task %s never completed!", task) |
1417 | self.rq.state = runQueueComplete | 1325 | self.rq.state = runQueueComplete |
1326 | |||
1418 | return True | 1327 | return True |
1419 | 1328 | ||
1420 | class RunQueueExecuteScenequeue(RunQueueExecute): | 1329 | class RunQueueExecuteScenequeue(RunQueueExecute): |
@@ -1428,6 +1337,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1428 | # If we don't have any setscene functions, skip this step | 1337 | # If we don't have any setscene functions, skip this step |
1429 | if len(self.rqdata.runq_setscene) == 0: | 1338 | if len(self.rqdata.runq_setscene) == 0: |
1430 | rq.scenequeue_covered = set() | 1339 | rq.scenequeue_covered = set() |
1340 | self.teardown() | ||
1431 | rq.state = runQueueRunInit | 1341 | rq.state = runQueueRunInit |
1432 | return | 1342 | return |
1433 | 1343 | ||
@@ -1676,6 +1586,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1676 | Run the tasks in a queue prepared by prepare_runqueue | 1586 | Run the tasks in a queue prepared by prepare_runqueue |
1677 | """ | 1587 | """ |
1678 | 1588 | ||
1589 | self.workerpipe.read() | ||
1590 | |||
1679 | task = None | 1591 | task = None |
1680 | if self.stats.active < self.number_tasks: | 1592 | if self.stats.active < self.number_tasks: |
1681 | # Find the next setscene to run | 1593 | # Find the next setscene to run |
@@ -1716,22 +1628,17 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1716 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) | 1628 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) |
1717 | bb.event.fire(startevent, self.cfgData) | 1629 | bb.event.fire(startevent, self.cfgData) |
1718 | 1630 | ||
1719 | pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) | 1631 | self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>") |
1632 | self.worker.stdin.flush() | ||
1720 | 1633 | ||
1721 | self.build_pids[pid] = task | ||
1722 | self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) | ||
1723 | self.runq_running[task] = 1 | 1634 | self.runq_running[task] = 1 |
1724 | self.stats.taskActive() | 1635 | self.stats.taskActive() |
1725 | if self.stats.active < self.number_tasks: | 1636 | if self.stats.active < self.number_tasks: |
1726 | return True | 1637 | return True |
1727 | 1638 | ||
1728 | for pipe in self.build_pipes: | ||
1729 | self.build_pipes[pipe].read() | ||
1730 | |||
1731 | if self.stats.active > 0: | 1639 | if self.stats.active > 0: |
1732 | if self.runqueue_process_waitpid() is None: | 1640 | self.workerpipe.read() |
1733 | return 0.5 | 1641 | return 0.5 |
1734 | return True | ||
1735 | 1642 | ||
1736 | # Convert scenequeue_covered task numbers into full taskgraph ids | 1643 | # Convert scenequeue_covered task numbers into full taskgraph ids |
1737 | oldcovered = self.scenequeue_covered | 1644 | oldcovered = self.scenequeue_covered |
@@ -1745,10 +1652,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1745 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) | 1652 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) |
1746 | 1653 | ||
1747 | self.rq.state = runQueueRunInit | 1654 | self.rq.state = runQueueRunInit |
1655 | self.teardown() | ||
1748 | return True | 1656 | return True |
1749 | 1657 | ||
1750 | def fork_off_task(self, fn, task, taskname): | 1658 | def runqueue_process_waitpid(self, task, status): |
1751 | return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True) | 1659 | task = self.rq.rqdata.runq_setscene.index(task) |
1660 | |||
1661 | RunQueueExecute.runqueue_process_waitpid(self, task, status) | ||
1752 | 1662 | ||
1753 | class TaskFailure(Exception): | 1663 | class TaskFailure(Exception): |
1754 | """ | 1664 | """ |
@@ -1828,25 +1738,43 @@ class runQueuePipe(): | |||
1828 | """ | 1738 | """ |
1829 | Abstraction for a pipe between a worker thread and the server | 1739 | Abstraction for a pipe between a worker thread and the server |
1830 | """ | 1740 | """ |
1831 | def __init__(self, pipein, pipeout, d): | 1741 | def __init__(self, pipein, pipeout, d, rq): |
1832 | self.input = pipein | 1742 | self.input = pipein |
1833 | pipeout.close() | 1743 | if pipeout: |
1744 | pipeout.close() | ||
1834 | bb.utils.nonblockingfd(self.input) | 1745 | bb.utils.nonblockingfd(self.input) |
1835 | self.queue = "" | 1746 | self.queue = "" |
1836 | self.d = d | 1747 | self.d = d |
1748 | self.rq = rq | ||
1749 | |||
1750 | def setrunqueue(self, rq): | ||
1751 | self.rq = rq | ||
1837 | 1752 | ||
1838 | def read(self): | 1753 | def read(self): |
1839 | start = len(self.queue) | 1754 | start = len(self.queue) |
1840 | try: | 1755 | try: |
1841 | self.queue = self.queue + self.input.read(102400) | 1756 | self.queue = self.queue + self.input.read(102400) |
1842 | except (OSError, IOError): | 1757 | except (OSError, IOError) as e: |
1843 | pass | 1758 | if e.errno != errno.EAGAIN: |
1759 | raise | ||
1844 | end = len(self.queue) | 1760 | end = len(self.queue) |
1845 | index = self.queue.find("</event>") | 1761 | found = True |
1846 | while index != -1: | 1762 | while found and len(self.queue): |
1847 | bb.event.fire_from_worker(self.queue[:index+8], self.d) | 1763 | found = False |
1848 | self.queue = self.queue[index+8:] | ||
1849 | index = self.queue.find("</event>") | 1764 | index = self.queue.find("</event>") |
1765 | while index != -1 and self.queue.startswith("<event>"): | ||
1766 | event = pickle.loads(self.queue[7:index]) | ||
1767 | bb.event.fire_from_worker(event, self.d) | ||
1768 | found = True | ||
1769 | self.queue = self.queue[index+8:] | ||
1770 | index = self.queue.find("</event>") | ||
1771 | index = self.queue.find("</exitcode>") | ||
1772 | while index != -1 and self.queue.startswith("<exitcode>"): | ||
1773 | task, status = pickle.loads(self.queue[10:index]) | ||
1774 | self.rq.runqueue_process_waitpid(task, status) | ||
1775 | found = True | ||
1776 | self.queue = self.queue[index+11:] | ||
1777 | index = self.queue.find("</exitcode>") | ||
1850 | return (end > start) | 1778 | return (end > start) |
1851 | 1779 | ||
1852 | def close(self): | 1780 | def close(self): |