diff options
| author | Adrian Dudau <adrian.dudau@enea.com> | 2013-12-12 13:38:32 +0100 |
|---|---|---|
| committer | Adrian Dudau <adrian.dudau@enea.com> | 2013-12-12 13:50:20 +0100 |
| commit | e2e6f6fe07049f33cb6348780fa975162752e421 (patch) | |
| tree | b1813295411235d1297a0ed642b1346b24fdfb12 /bitbake/bin/bitbake-worker | |
| download | poky-e2e6f6fe07049f33cb6348780fa975162752e421.tar.gz | |
initial commit of Enea Linux 3.1
Migrated from the internal git server on the dora-enea branch
Signed-off-by: Adrian Dudau <adrian.dudau@enea.com>
Diffstat (limited to 'bitbake/bin/bitbake-worker')
| -rwxr-xr-x | bitbake/bin/bitbake-worker | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker new file mode 100755 index 0000000000..66b6aabfdb --- /dev/null +++ b/bitbake/bin/bitbake-worker | |||
| @@ -0,0 +1,363 @@ | |||
#!/usr/bin/env python

import os
import sys
import warnings
# Make the sibling 'lib' directory importable so the bundled bb modules resolve
# regardless of where this script was invoked from.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal

# Users shouldn't be running this code directly
# "decafbad" is a magic sentinel argument supplied by bitbake when it spawns
# the worker; its absence means someone ran the script by hand.
if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

logger = logging.getLogger("BitBake")

# Prefer the C pickle implementation when available (Python 2); fall back to
# the pure-Python module with a warning.
try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


# Events are streamed back to the bitbake server over our stdout; the fd is
# made non-blocking so a slow reader cannot stall the worker.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Buffer of serialised event data not yet successfully written to the server pipe.
worker_queue = ""
| 45 | |||
def worker_fire(event, d):
    """Serialise *event* and queue it for delivery to the bitbake server.

    The datastore argument *d* is unused; it is present to satisfy the
    bb.event.worker_fire callback signature.
    """
    payload = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(payload)
| 49 | |||
def worker_fire_prepickled(event):
    """Append an already-serialised event string to the outgoing queue and
    attempt to flush it to the server pipe immediately."""
    global worker_queue

    worker_queue += event
    worker_flush()
| 55 | |||
def worker_flush():
    """Perform one non-blocking write of the pending event queue to the
    server pipe, keeping whatever could not be written for later."""
    global worker_queue, worker_pipe

    if worker_queue:
        try:
            sent = os.write(worker_pipe, worker_queue)
        except (OSError, IOError) as err:
            # EAGAIN is expected on a non-blocking fd when the pipe is full;
            # anything else is a real error.
            if err.errno != errno.EAGAIN:
                raise
        else:
            worker_queue = worker_queue[sent:]
| 68 | |||
def worker_child_fire(event, d):
    """Event sink installed in the forked task child: serialise *event* and
    write it straight to the pipe back to the worker parent.

    In the child, worker_pipe is a file object (the write end of the pipe
    created in fork_off_task), not a raw fd.
    """
    global worker_pipe

    worker_pipe.write("<event>" + pickle.dumps(event) + "</event>")
| 74 | |||
| 75 | bb.event.worker_fire = worker_fire | ||
| 76 | |||
# Optional debug command log; enable by opening a file here.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug command log, flushing immediately.

    A no-op unless the module-level log file `lf` has been opened.
    """
    if not lf:
        return
    lf.write(msg)
    lf.flush()
| 83 | |||
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
    """Fork a child process to execute one task.

    cfg         -- cooker configuration (provides dry_run and profile flags)
    data        -- base datastore, used to load the recipe's full data
    workerdata  -- dict of state sent by the server (taskdeps, hashes, fakeroot
                   environment, build name/date/time, ...)
    fn          -- recipe filename the task belongs to
    task        -- task id used to look up the task hash
    taskname    -- e.g. "do_compile"
    appends     -- bbappend files to apply when loading the recipe data
    quieterrors -- if True, suppress error logging in the child

    Returns (pid, pipein, pipeout): the child's pid plus both ends of the pipe
    the child reports events over. The child itself never returns; it exits
    via os._exit() with the task's status.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}   # original values of env vars we override (None = was unset)
    fakeenv = {}     # fakeroot-related vars to re-apply inside the child
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname],8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
        # Task runs under fakeroot: export its environment and pre-create the
        # directories fakeroot needs, remembering what we clobbered.
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Non-fakeroot task: apply the "noenv" variable set instead.
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush before fork so buffered output is not duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)   # unbuffered: events must not linger
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        # --- child process ---
        global worker_pipe
        pipein.close()

        # Save out the PID so that the event can include it the
        # events
        bb.event.worker_pid = os.getpid()
        # Redirect event firing to the pipe back to the worker parent.
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout

        # Make the child the process group leader
        # (lets the parent signal the whole task via kill(-pid)).
        os.setpgid(0, 0)
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        # NOTE(review): a umask of 0 is falsy and would be skipped here —
        # confirm whether "umask 0" tasks are expected.
        if umask:
            os.umask(umask)

        data.setVar("BB_WORKERCONTEXT", "1")
        data.setVar("BUILDNAME", workerdata["buildname"])
        data.setVar("DATE", workerdata["date"])
        data.setVar("TIME", workerdata["time"])
        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
        ret = 0
        try:
            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
            for h in workerdata["hashes"]:
                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
            for h in workerdata["hash_deps"]:
                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
            # successfully. We also need to unset anything from the environment which shouldn't be there
            exports = bb.data.exported_vars(the_data)
            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v
            # Re-apply the fakeroot vars last so they win over exported vars,
            # and mirror them into the datastore as exported.
            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception as exc:
            if not quieterrors:
                logger.critical(str(exc))
            os._exit(1)
        try:
            if not cfg.dry_run:
                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            # Exit the child with the task's status; never return to caller code.
            os._exit(ret)
        except:
            os._exit(1)
    else:
        # --- parent process: restore the environment we modified pre-fork ---
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
| 195 | |||
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server

    Wraps the read end of the pipe created in fork_off_task: buffers data
    from the task child and forwards complete <event>...</event> messages
    to the server via worker_fire_prepickled().
    """
    def __init__(self, pipein, pipeout):
        # pipein: read end (kept); pipeout: write end, belongs to the child,
        # so close our copy of it if given.
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Drain available data from the pipe, forwarding any complete
        events. Returns True if new data was read."""
        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            # EAGAIN just means no data right now on the non-blocking fd.
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        # Forward each complete event (8 == len("</event>")).
        index = self.queue.find("</event>")
        while index != -1:
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find("</event>")
        return (end > start)

    def close(self):
        """Drain any remaining data, warn about a truncated trailing
        message, and close the pipe."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
| 229 | |||
normalexit = False  # set True by handle_quit; suppresses the traceback dump on exit
| 231 | |||
class BitbakeWorker(object):
    """Main worker loop: reads <item>...</item> framed commands from the
    bitbake server on *din* (normally stdin), forks task children, and
    relays their events and exit codes back over stdout."""

    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""            # unparsed bytes received from the server
        self.cookercfg = None      # set by handle_cookercfg
        self.databuilder = None
        self.data = None
        self.build_pids = {}       # child pid -> task id
        self.build_pipes = {}      # child pid -> runQueueWorkerPipe

    def serve(self):
        """Run forever (until handle_quit calls sys.exit): multiplex the
        server command stream and the task children's event pipes."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready or len(self.queue):
                start = len(self.queue)
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    # Non-blocking read with nothing available; try again later.
                    pass
                end = len(self.queue)
                # Dispatch every command type; each handler consumes its own
                # complete messages from the front of the queue.
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """If the queue starts with <item>, pass each complete
        <item>...</item> payload to *func* and strip it from the queue."""
        if self.queue.startswith("<" + item + ">"):
            index = self.queue.find("</" + item + ">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find("</" + item + ">")

    def handle_cookercfg(self, data):
        # Unpickle the cooker configuration and build the base datastore.
        # NOTE: pickle.loads on the command stream is trusted input from the
        # bitbake server only.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        # Receive per-build state (hashes, log levels, PR service host).
        # NOTE(review): handle_runtask assumes this arrives before any
        # runtask command — the server is expected to guarantee ordering.
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        # Mark this as a clean shutdown so the top-level handler does not
        # print a traceback for the SystemExit.
        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child for one task and track its pid and event pipe."""
        fn, task, taskname, quieterrors, appends = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        # Drain and close the child's event pipe before reporting the result.
        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """Terminate all running task children (by process group) and drain
        whatever events they already produced."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in self.build_pids.iteritems():
                try:
                    # -k signals the child's whole process group
                    # (the child called setpgid(0, 0) after fork).
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
| 350 | |||
# Entry point: serve() only returns by raising (sys.exit from handle_quit,
# or an unexpected error). Dump a traceback unless this is a normal exit.
try:
    worker = BitbakeWorker(sys.stdin)
    worker.serve()
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Busy-loop until every queued event has been written to the (non-blocking)
# server pipe so nothing is lost on shutdown.
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")  # (sic) log message kept byte-identical
sys.exit(0)
| 363 | |||
