diff options
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 284 |
1 files changed, 106 insertions, 178 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 090d1b56a2..dd6e071c37 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -28,10 +28,17 @@ import sys | |||
28 | import signal | 28 | import signal |
29 | import stat | 29 | import stat |
30 | import fcntl | 30 | import fcntl |
31 | import errno | ||
31 | import logging | 32 | import logging |
32 | import bb | 33 | import bb |
33 | from bb import msg, data, event | 34 | from bb import msg, data, event |
34 | from bb import monitordisk | 35 | from bb import monitordisk |
36 | import subprocess | ||
37 | |||
38 | try: | ||
39 | import cPickle as pickle | ||
40 | except ImportError: | ||
41 | import pickle | ||
35 | 42 | ||
36 | bblogger = logging.getLogger("BitBake") | 43 | bblogger = logging.getLogger("BitBake") |
37 | logger = logging.getLogger("BitBake.RunQueue") | 44 | logger = logging.getLogger("BitBake.RunQueue") |
@@ -938,6 +945,10 @@ class RunQueue: | |||
938 | raise | 945 | raise |
939 | except: | 946 | except: |
940 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") | 947 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") |
948 | try: | ||
949 | self.rqexe.teardown() | ||
950 | except: | ||
951 | pass | ||
941 | self.state = runQueueComplete | 952 | self.state = runQueueComplete |
942 | raise | 953 | raise |
943 | 954 | ||
@@ -979,38 +990,41 @@ class RunQueueExecute: | |||
979 | self.runq_buildable = [] | 990 | self.runq_buildable = [] |
980 | self.runq_running = [] | 991 | self.runq_running = [] |
981 | self.runq_complete = [] | 992 | self.runq_complete = [] |
982 | self.build_pids = {} | 993 | |
983 | self.build_pipes = {} | ||
984 | self.build_stamps = {} | 994 | self.build_stamps = {} |
985 | self.failed_fnids = [] | 995 | self.failed_fnids = [] |
986 | 996 | ||
987 | self.stampcache = {} | 997 | self.stampcache = {} |
988 | 998 | ||
989 | def runqueue_process_waitpid(self): | 999 | logger.debug(1, "Starting bitbake-worker") |
990 | """ | 1000 | self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
991 | Return none is there are no processes awaiting result collection, otherwise | 1001 | bb.utils.nonblockingfd(self.worker.stdout) |
992 | collect the process exit codes and close the information pipe. | 1002 | self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self) |
993 | """ | 1003 | |
994 | pid, status = os.waitpid(-1, os.WNOHANG) | 1004 | workerdata = { |
995 | if pid == 0 or os.WIFSTOPPED(status): | 1005 | "taskdeps" : self.rqdata.dataCache.task_deps, |
996 | return None | 1006 | "fakerootenv" : self.rqdata.dataCache.fakerootenv, |
997 | 1007 | "fakerootdirs" : self.rqdata.dataCache.fakerootdirs, | |
998 | if os.WIFEXITED(status): | 1008 | "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv, |
999 | status = os.WEXITSTATUS(status) | 1009 | "hashes" : self.rqdata.hashes, |
1000 | elif os.WIFSIGNALED(status): | 1010 | "hash_deps" : self.rqdata.hash_deps, |
1001 | # Per shell conventions for $?, when a process exits due to | 1011 | "sigchecksums" : bb.parse.siggen.file_checksum_values, |
1002 | # a signal, we return an exit code of 128 + SIGNUM | 1012 | "runq_hash" : self.rqdata.runq_hash, |
1003 | status = 128 + os.WTERMSIG(status) | 1013 | "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel, |
1004 | 1014 | "logdefaultverbose" : bb.msg.loggerDefaultVerbose, | |
1005 | task = self.build_pids[pid] | 1015 | "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, |
1006 | del self.build_pids[pid] | 1016 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, |
1007 | 1017 | } | |
1008 | self.build_pipes[pid].close() | 1018 | |
1009 | del self.build_pipes[pid] | 1019 | self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>") |
1020 | self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>") | ||
1021 | self.worker.stdin.flush() | ||
1022 | |||
1023 | def runqueue_process_waitpid(self, task, status): | ||
1010 | 1024 | ||
1011 | # self.build_stamps[pid] may not exist when use shared work directory. | 1025 | # self.build_stamps[pid] may not exist when use shared work directory. |
1012 | if pid in self.build_stamps: | 1026 | if task in self.build_stamps: |
1013 | del self.build_stamps[pid] | 1027 | del self.build_stamps[task] |
1014 | 1028 | ||
1015 | if status != 0: | 1029 | if status != 0: |
1016 | self.task_fail(task, status) | 1030 | self.task_fail(task, status) |
@@ -1019,16 +1033,11 @@ class RunQueueExecute: | |||
1019 | return True | 1033 | return True |
1020 | 1034 | ||
1021 | def finish_now(self): | 1035 | def finish_now(self): |
1022 | if self.stats.active: | 1036 | |
1023 | logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active) | 1037 | self.worker.stdin.write("<finishnow></finishnow>") |
1024 | for k, v in self.build_pids.iteritems(): | 1038 | self.worker.stdin.flush() |
1025 | try: | 1039 | |
1026 | os.kill(-k, signal.SIGTERM) | 1040 | self.teardown() |
1027 | os.waitpid(-1, 0) | ||
1028 | except: | ||
1029 | pass | ||
1030 | for pipe in self.build_pipes: | ||
1031 | self.build_pipes[pipe].read() | ||
1032 | 1041 | ||
1033 | if len(self.failed_fnids) != 0: | 1042 | if len(self.failed_fnids) != 0: |
1034 | self.rq.state = runQueueFailed | 1043 | self.rq.state = runQueueFailed |
@@ -1040,14 +1049,13 @@ class RunQueueExecute: | |||
1040 | def finish(self): | 1049 | def finish(self): |
1041 | self.rq.state = runQueueCleanUp | 1050 | self.rq.state = runQueueCleanUp |
1042 | 1051 | ||
1043 | for pipe in self.build_pipes: | ||
1044 | self.build_pipes[pipe].read() | ||
1045 | |||
1046 | if self.stats.active > 0: | 1052 | if self.stats.active > 0: |
1047 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) | 1053 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) |
1048 | self.runqueue_process_waitpid() | 1054 | self.workerpipe.read() |
1049 | return | 1055 | return |
1050 | 1056 | ||
1057 | self.teardown() | ||
1058 | |||
1051 | if len(self.failed_fnids) != 0: | 1059 | if len(self.failed_fnids) != 0: |
1052 | self.rq.state = runQueueFailed | 1060 | self.rq.state = runQueueFailed |
1053 | return | 1061 | return |
@@ -1055,115 +1063,6 @@ class RunQueueExecute: | |||
1055 | self.rq.state = runQueueComplete | 1063 | self.rq.state = runQueueComplete |
1056 | return | 1064 | return |
1057 | 1065 | ||
1058 | def fork_off_task(self, fn, task, taskname, quieterrors=False): | ||
1059 | # We need to setup the environment BEFORE the fork, since | ||
1060 | # a fork() or exec*() activates PSEUDO... | ||
1061 | |||
1062 | envbackup = {} | ||
1063 | fakeenv = {} | ||
1064 | umask = None | ||
1065 | |||
1066 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1067 | if 'umask' in taskdep and taskname in taskdep['umask']: | ||
1068 | # umask might come in as a number or text string.. | ||
1069 | try: | ||
1070 | umask = int(taskdep['umask'][taskname],8) | ||
1071 | except TypeError: | ||
1072 | umask = taskdep['umask'][taskname] | ||
1073 | |||
1074 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']: | ||
1075 | envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split() | ||
1076 | for key, value in (var.split('=') for var in envvars): | ||
1077 | envbackup[key] = os.environ.get(key) | ||
1078 | os.environ[key] = value | ||
1079 | fakeenv[key] = value | ||
1080 | |||
1081 | fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split() | ||
1082 | for p in fakedirs: | ||
1083 | bb.utils.mkdirhier(p) | ||
1084 | |||
1085 | logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % | ||
1086 | (fn, taskname, ', '.join(fakedirs))) | ||
1087 | else: | ||
1088 | envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split() | ||
1089 | for key, value in (var.split('=') for var in envvars): | ||
1090 | envbackup[key] = os.environ.get(key) | ||
1091 | os.environ[key] = value | ||
1092 | fakeenv[key] = value | ||
1093 | |||
1094 | sys.stdout.flush() | ||
1095 | sys.stderr.flush() | ||
1096 | try: | ||
1097 | pipein, pipeout = os.pipe() | ||
1098 | pipein = os.fdopen(pipein, 'rb', 4096) | ||
1099 | pipeout = os.fdopen(pipeout, 'wb', 0) | ||
1100 | pid = os.fork() | ||
1101 | except OSError as e: | ||
1102 | bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror)) | ||
1103 | |||
1104 | if pid == 0: | ||
1105 | pipein.close() | ||
1106 | |||
1107 | # Save out the PID so that the event can include it the | ||
1108 | # events | ||
1109 | bb.event.worker_pid = os.getpid() | ||
1110 | bb.event.worker_pipe = pipeout | ||
1111 | |||
1112 | self.rq.state = runQueueChildProcess | ||
1113 | # Make the child the process group leader | ||
1114 | os.setpgid(0, 0) | ||
1115 | # No stdin | ||
1116 | newsi = os.open(os.devnull, os.O_RDWR) | ||
1117 | os.dup2(newsi, sys.stdin.fileno()) | ||
1118 | |||
1119 | if umask: | ||
1120 | os.umask(umask) | ||
1121 | |||
1122 | self.cooker.data.setVar("BB_WORKERCONTEXT", "1") | ||
1123 | bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps) | ||
1124 | ret = 0 | ||
1125 | try: | ||
1126 | the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data) | ||
1127 | the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task]) | ||
1128 | for h in self.rqdata.hashes: | ||
1129 | the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h]) | ||
1130 | for h in self.rqdata.hash_deps: | ||
1131 | the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h]) | ||
1132 | |||
1133 | # exported_vars() returns a generator which *cannot* be passed to os.environ.update() | ||
1134 | # successfully. We also need to unset anything from the environment which shouldn't be there | ||
1135 | exports = bb.data.exported_vars(the_data) | ||
1136 | bb.utils.empty_environment() | ||
1137 | for e, v in exports: | ||
1138 | os.environ[e] = v | ||
1139 | for e in fakeenv: | ||
1140 | os.environ[e] = fakeenv[e] | ||
1141 | the_data.setVar(e, fakeenv[e]) | ||
1142 | the_data.setVarFlag(e, 'export', "1") | ||
1143 | |||
1144 | if quieterrors: | ||
1145 | the_data.setVarFlag(taskname, "quieterrors", "1") | ||
1146 | |||
1147 | except Exception as exc: | ||
1148 | if not quieterrors: | ||
1149 | logger.critical(str(exc)) | ||
1150 | os._exit(1) | ||
1151 | try: | ||
1152 | if not self.cooker.configuration.dry_run: | ||
1153 | profile = self.cooker.configuration.profile | ||
1154 | ret = bb.build.exec_task(fn, taskname, the_data, profile) | ||
1155 | os._exit(ret) | ||
1156 | except: | ||
1157 | os._exit(1) | ||
1158 | else: | ||
1159 | for key, value in envbackup.iteritems(): | ||
1160 | if value is None: | ||
1161 | del os.environ[key] | ||
1162 | else: | ||
1163 | os.environ[key] = value | ||
1164 | |||
1165 | return pid, pipein, pipeout | ||
1166 | |||
1167 | def check_dependencies(self, task, taskdeps, setscene = False): | 1066 | def check_dependencies(self, task, taskdeps, setscene = False): |
1168 | if not self.rq.depvalidate: | 1067 | if not self.rq.depvalidate: |
1169 | return False | 1068 | return False |
@@ -1184,6 +1083,16 @@ class RunQueueExecute: | |||
1184 | valid = bb.utils.better_eval(call, locs) | 1083 | valid = bb.utils.better_eval(call, locs) |
1185 | return valid | 1084 | return valid |
1186 | 1085 | ||
1086 | def teardown(self): | ||
1087 | logger.debug(1, "Teardown for bitbake-worker") | ||
1088 | self.worker.stdin.write("<quit></quit>") | ||
1089 | self.worker.stdin.flush() | ||
1090 | while self.worker.returncode is None: | ||
1091 | self.workerpipe.read() | ||
1092 | self.worker.poll() | ||
1093 | while self.workerpipe.read(): | ||
1094 | continue | ||
1095 | |||
1187 | class RunQueueExecuteDummy(RunQueueExecute): | 1096 | class RunQueueExecuteDummy(RunQueueExecute): |
1188 | def __init__(self, rq): | 1097 | def __init__(self, rq): |
1189 | self.rq = rq | 1098 | self.rq = rq |
@@ -1275,7 +1184,6 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1275 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % | 1184 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % |
1276 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) | 1185 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) |
1277 | 1186 | ||
1278 | |||
1279 | def get_schedulers(self): | 1187 | def get_schedulers(self): |
1280 | schedulers = set(obj for obj in globals().values() | 1188 | schedulers = set(obj for obj in globals().values() |
1281 | if type(obj) is type and | 1189 | if type(obj) is type and |
@@ -1349,6 +1257,9 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1349 | Run the tasks in a queue prepared by rqdata.prepare() | 1257 | Run the tasks in a queue prepared by rqdata.prepare() |
1350 | """ | 1258 | """ |
1351 | 1259 | ||
1260 | self.workerpipe.read() | ||
1261 | |||
1262 | |||
1352 | if self.stats.total == 0: | 1263 | if self.stats.total == 0: |
1353 | # nothing to do | 1264 | # nothing to do |
1354 | self.rq.state = runQueueCleanUp | 1265 | self.rq.state = runQueueCleanUp |
@@ -1384,23 +1295,20 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1384 | startevent = runQueueTaskStarted(task, self.stats, self.rq) | 1295 | startevent = runQueueTaskStarted(task, self.stats, self.rq) |
1385 | bb.event.fire(startevent, self.cfgData) | 1296 | bb.event.fire(startevent, self.cfgData) |
1386 | 1297 | ||
1387 | pid, pipein, pipeout = self.fork_off_task(fn, task, taskname) | 1298 | self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>") |
1299 | self.worker.stdin.flush() | ||
1388 | 1300 | ||
1389 | self.build_pids[pid] = task | 1301 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) |
1390 | self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) | ||
1391 | self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | ||
1392 | self.runq_running[task] = 1 | 1302 | self.runq_running[task] = 1 |
1393 | self.stats.taskActive() | 1303 | self.stats.taskActive() |
1394 | if self.stats.active < self.number_tasks: | 1304 | if self.stats.active < self.number_tasks: |
1395 | return True | 1305 | return True |
1396 | 1306 | ||
1397 | for pipe in self.build_pipes: | ||
1398 | self.build_pipes[pipe].read() | ||
1399 | |||
1400 | if self.stats.active > 0: | 1307 | if self.stats.active > 0: |
1401 | if self.runqueue_process_waitpid() is None: | 1308 | self.workerpipe.read() |
1402 | return 0.5 | 1309 | return 0.5 |
1403 | return True | 1310 | |
1311 | self.teardown() | ||
1404 | 1312 | ||
1405 | if len(self.failed_fnids) != 0: | 1313 | if len(self.failed_fnids) != 0: |
1406 | self.rq.state = runQueueFailed | 1314 | self.rq.state = runQueueFailed |
@@ -1415,6 +1323,7 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1415 | if self.runq_complete[task] == 0: | 1323 | if self.runq_complete[task] == 0: |
1416 | logger.error("Task %s never completed!", task) | 1324 | logger.error("Task %s never completed!", task) |
1417 | self.rq.state = runQueueComplete | 1325 | self.rq.state = runQueueComplete |
1326 | |||
1418 | return True | 1327 | return True |
1419 | 1328 | ||
1420 | class RunQueueExecuteScenequeue(RunQueueExecute): | 1329 | class RunQueueExecuteScenequeue(RunQueueExecute): |
@@ -1428,6 +1337,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1428 | # If we don't have any setscene functions, skip this step | 1337 | # If we don't have any setscene functions, skip this step |
1429 | if len(self.rqdata.runq_setscene) == 0: | 1338 | if len(self.rqdata.runq_setscene) == 0: |
1430 | rq.scenequeue_covered = set() | 1339 | rq.scenequeue_covered = set() |
1340 | self.teardown() | ||
1431 | rq.state = runQueueRunInit | 1341 | rq.state = runQueueRunInit |
1432 | return | 1342 | return |
1433 | 1343 | ||
@@ -1676,6 +1586,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1676 | Run the tasks in a queue prepared by prepare_runqueue | 1586 | Run the tasks in a queue prepared by prepare_runqueue |
1677 | """ | 1587 | """ |
1678 | 1588 | ||
1589 | self.workerpipe.read() | ||
1590 | |||
1679 | task = None | 1591 | task = None |
1680 | if self.stats.active < self.number_tasks: | 1592 | if self.stats.active < self.number_tasks: |
1681 | # Find the next setscene to run | 1593 | # Find the next setscene to run |
@@ -1716,22 +1628,17 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1716 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) | 1628 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) |
1717 | bb.event.fire(startevent, self.cfgData) | 1629 | bb.event.fire(startevent, self.cfgData) |
1718 | 1630 | ||
1719 | pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) | 1631 | self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>") |
1632 | self.worker.stdin.flush() | ||
1720 | 1633 | ||
1721 | self.build_pids[pid] = task | ||
1722 | self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) | ||
1723 | self.runq_running[task] = 1 | 1634 | self.runq_running[task] = 1 |
1724 | self.stats.taskActive() | 1635 | self.stats.taskActive() |
1725 | if self.stats.active < self.number_tasks: | 1636 | if self.stats.active < self.number_tasks: |
1726 | return True | 1637 | return True |
1727 | 1638 | ||
1728 | for pipe in self.build_pipes: | ||
1729 | self.build_pipes[pipe].read() | ||
1730 | |||
1731 | if self.stats.active > 0: | 1639 | if self.stats.active > 0: |
1732 | if self.runqueue_process_waitpid() is None: | 1640 | self.workerpipe.read() |
1733 | return 0.5 | 1641 | return 0.5 |
1734 | return True | ||
1735 | 1642 | ||
1736 | # Convert scenequeue_covered task numbers into full taskgraph ids | 1643 | # Convert scenequeue_covered task numbers into full taskgraph ids |
1737 | oldcovered = self.scenequeue_covered | 1644 | oldcovered = self.scenequeue_covered |
@@ -1745,10 +1652,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1745 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) | 1652 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) |
1746 | 1653 | ||
1747 | self.rq.state = runQueueRunInit | 1654 | self.rq.state = runQueueRunInit |
1655 | self.teardown() | ||
1748 | return True | 1656 | return True |
1749 | 1657 | ||
1750 | def fork_off_task(self, fn, task, taskname): | 1658 | def runqueue_process_waitpid(self, task, status): |
1751 | return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True) | 1659 | task = self.rq.rqdata.runq_setscene.index(task) |
1660 | |||
1661 | RunQueueExecute.runqueue_process_waitpid(self, task, status) | ||
1752 | 1662 | ||
1753 | class TaskFailure(Exception): | 1663 | class TaskFailure(Exception): |
1754 | """ | 1664 | """ |
@@ -1828,25 +1738,43 @@ class runQueuePipe(): | |||
1828 | """ | 1738 | """ |
1829 | Abstraction for a pipe between a worker thread and the server | 1739 | Abstraction for a pipe between a worker thread and the server |
1830 | """ | 1740 | """ |
1831 | def __init__(self, pipein, pipeout, d): | 1741 | def __init__(self, pipein, pipeout, d, rq): |
1832 | self.input = pipein | 1742 | self.input = pipein |
1833 | pipeout.close() | 1743 | if pipeout: |
1744 | pipeout.close() | ||
1834 | bb.utils.nonblockingfd(self.input) | 1745 | bb.utils.nonblockingfd(self.input) |
1835 | self.queue = "" | 1746 | self.queue = "" |
1836 | self.d = d | 1747 | self.d = d |
1748 | self.rq = rq | ||
1749 | |||
1750 | def setrunqueue(self, rq): | ||
1751 | self.rq = rq | ||
1837 | 1752 | ||
1838 | def read(self): | 1753 | def read(self): |
1839 | start = len(self.queue) | 1754 | start = len(self.queue) |
1840 | try: | 1755 | try: |
1841 | self.queue = self.queue + self.input.read(102400) | 1756 | self.queue = self.queue + self.input.read(102400) |
1842 | except (OSError, IOError): | 1757 | except (OSError, IOError) as e: |
1843 | pass | 1758 | if e.errno != errno.EAGAIN: |
1759 | raise | ||
1844 | end = len(self.queue) | 1760 | end = len(self.queue) |
1845 | index = self.queue.find("</event>") | 1761 | found = True |
1846 | while index != -1: | 1762 | while found and len(self.queue): |
1847 | bb.event.fire_from_worker(self.queue[:index+8], self.d) | 1763 | found = False |
1848 | self.queue = self.queue[index+8:] | ||
1849 | index = self.queue.find("</event>") | 1764 | index = self.queue.find("</event>") |
1765 | while index != -1 and self.queue.startswith("<event>"): | ||
1766 | event = pickle.loads(self.queue[7:index]) | ||
1767 | bb.event.fire_from_worker(event, self.d) | ||
1768 | found = True | ||
1769 | self.queue = self.queue[index+8:] | ||
1770 | index = self.queue.find("</event>") | ||
1771 | index = self.queue.find("</exitcode>") | ||
1772 | while index != -1 and self.queue.startswith("<exitcode>"): | ||
1773 | task, status = pickle.loads(self.queue[10:index]) | ||
1774 | self.rq.runqueue_process_waitpid(task, status) | ||
1775 | found = True | ||
1776 | self.queue = self.queue[index+11:] | ||
1777 | index = self.queue.find("</exitcode>") | ||
1850 | return (end > start) | 1778 | return (end > start) |
1851 | 1779 | ||
1852 | def close(self): | 1780 | def close(self): |