From c527fd1f14c27855a37f2e8ac5346ce8d940ced2 Mon Sep 17 00:00:00 2001 From: Tudor Florea Date: Thu, 16 Oct 2014 03:05:19 +0200 Subject: initial commit for Enea Linux 4.0-140929 Migrated from the internal git server on the daisy-enea-point-release branch Signed-off-by: Tudor Florea --- bitbake/bin/bitbake-worker | 375 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 375 insertions(+) create mode 100755 bitbake/bin/bitbake-worker (limited to 'bitbake/bin/bitbake-worker') diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker new file mode 100755 index 0000000000..68e2bf4571 --- /dev/null +++ b/bitbake/bin/bitbake-worker @@ -0,0 +1,375 @@ +#!/usr/bin/env python + +import os +import sys +import warnings +sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) +from bb import fetch2 +import logging +import bb +import select +import errno +import signal + +# Users shouldn't be running this code directly +if len(sys.argv) != 2 or sys.argv[1] != "decafbad": + print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") + sys.exit(1) + +logger = logging.getLogger("BitBake") + +try: + import cPickle as pickle +except ImportError: + import pickle + bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.") + + +worker_pipe = sys.stdout.fileno() +bb.utils.nonblockingfd(worker_pipe) + +handler = bb.event.LogHandler() +logger.addHandler(handler) + +if 0: + # Code to write out a log file of all events passing through the worker + logfilename = "/tmp/workerlogfile" + format_str = "%(levelname)s: %(message)s" + conlogformat = bb.msg.BBLogFormatter(format_str) + consolelog = logging.FileHandler(logfilename) + bb.msg.addDefaultlogFilter(consolelog) + consolelog.setFormatter(conlogformat) + logger.addHandler(consolelog) + +worker_queue = "" + +def worker_fire(event, d): + data = "" + pickle.dumps(event) + "" + worker_fire_prepickled(data) + +def worker_fire_prepickled(event): + global worker_queue + + worker_queue = worker_queue + event + worker_flush() + +def worker_flush(): + global worker_queue, worker_pipe + + if not worker_queue: + return + + try: + written = os.write(worker_pipe, worker_queue) + worker_queue = worker_queue[written:] + except (IOError, OSError) as e: + if e.errno != errno.EAGAIN: + raise + +def worker_child_fire(event, d): + global worker_pipe + + data = "" + pickle.dumps(event) + "" + worker_pipe.write(data) + +bb.event.worker_fire = worker_fire + +lf = None +#lf = open("/tmp/workercommandlog", "w+") +def workerlog_write(msg): + if lf: + lf.write(msg) + lf.flush() + +def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False): + # We need to setup the environment BEFORE the fork, since + # a fork() or exec*() activates PSEUDO... + + envbackup = {} + fakeenv = {} + umask = None + + taskdep = workerdata["taskdeps"][fn] + if 'umask' in taskdep and taskname in taskdep['umask']: + # umask might come in as a number or text string.. + try: + umask = int(taskdep['umask'][taskname],8) + except TypeError: + umask = taskdep['umask'][taskname] + + # We can't use the fakeroot environment in a dry run as it possibly hasn't been built + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run: + envvars = (workerdata["fakerootenv"][fn] or "").split() + for key, value in (var.split('=') for var in envvars): + envbackup[key] = os.environ.get(key) + os.environ[key] = value + fakeenv[key] = value + + fakedirs = (workerdata["fakerootdirs"][fn] or "").split() + for p in fakedirs: + bb.utils.mkdirhier(p) + logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % + (fn, taskname, ', '.join(fakedirs))) + else: + envvars = (workerdata["fakerootnoenv"][fn] or "").split() + for key, value in (var.split('=') for var in envvars): + envbackup[key] = os.environ.get(key) + os.environ[key] = value + fakeenv[key] = value + + sys.stdout.flush() + sys.stderr.flush() + + try: + pipein, pipeout = os.pipe() + pipein = os.fdopen(pipein, 'rb', 4096) + pipeout = os.fdopen(pipeout, 'wb', 0) + pid = os.fork() + except OSError as e: + bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror)) + + if pid == 0: + global worker_pipe + pipein.close() + + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + # Save out the PID so that the event can include it the + # events + bb.event.worker_pid = os.getpid() + bb.event.worker_fire = worker_child_fire + worker_pipe = pipeout + + # Make the child the process group leader + os.setpgid(0, 0) + # No stdin + newsi = os.open(os.devnull, os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + + if umask: + os.umask(umask) + + data.setVar("BB_WORKERCONTEXT", "1") + data.setVar("BB_TASKDEPDATA", taskdepdata) + data.setVar("BUILDNAME", workerdata["buildname"]) + data.setVar("DATE", workerdata["date"]) + data.setVar("TIME", workerdata["time"]) + bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"]) + ret = 0 + try: + the_data = bb.cache.Cache.loadDataFull(fn, appends, data) + the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task]) + for h in workerdata["hashes"]: + the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h]) + for h in workerdata["hash_deps"]: + the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h]) + + # exported_vars() returns a generator which *cannot* be passed to os.environ.update() + # successfully. We also need to unset anything from the environment which shouldn't be there + exports = bb.data.exported_vars(the_data) + bb.utils.empty_environment() + for e, v in exports: + os.environ[e] = v + for e in fakeenv: + os.environ[e] = fakeenv[e] + the_data.setVar(e, fakeenv[e]) + the_data.setVarFlag(e, 'export', "1") + + if quieterrors: + the_data.setVarFlag(taskname, "quieterrors", "1") + + except Exception as exc: + if not quieterrors: + logger.critical(str(exc)) + os._exit(1) + try: + if not cfg.dry_run: + ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) + os._exit(ret) + except: + os._exit(1) + else: + for key, value in envbackup.iteritems(): + if value is None: + del os.environ[key] + else: + os.environ[key] = value + + return pid, pipein, pipeout + +class runQueueWorkerPipe(): + """ + Abstraction for a pipe between a worker thread and the worker server + """ + def __init__(self, pipein, pipeout): + self.input = pipein + if pipeout: + pipeout.close() + bb.utils.nonblockingfd(self.input) + self.queue = "" + + def read(self): + start = len(self.queue) + try: + self.queue = self.queue + self.input.read(102400) + except (OSError, IOError) as e: + if e.errno != errno.EAGAIN: + raise + + end = len(self.queue) + index = self.queue.find("") + while index != -1: + worker_fire_prepickled(self.queue[:index+8]) + self.queue = self.queue[index+8:] + index = self.queue.find("") + return (end > start) + + def close(self): + while self.read(): + continue + if len(self.queue) > 0: + print("Warning, worker child left partial message: %s" % self.queue) + self.input.close() + +normalexit = False + +class BitbakeWorker(object): + def __init__(self, din): + self.input = din + bb.utils.nonblockingfd(self.input) + self.queue = "" + self.cookercfg = None + self.databuilder = None + self.data = None + self.build_pids = {} + self.build_pipes = {} + + signal.signal(signal.SIGTERM, self.sigterm_exception) + + def sigterm_exception(self, signum, stackframe): + bb.warn("Worker recieved SIGTERM, shutting down...") + self.handle_finishnow(None) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.kill(os.getpid(), signal.SIGTERM) + + def serve(self): + while True: + (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) + if self.input in ready or len(self.queue): + start = len(self.queue) + try: + self.queue = self.queue + self.input.read() + except (OSError, IOError): + pass + end = len(self.queue) + self.handle_item("cookerconfig", self.handle_cookercfg) + self.handle_item("workerdata", self.handle_workerdata) + self.handle_item("runtask", self.handle_runtask) + self.handle_item("finishnow", self.handle_finishnow) + self.handle_item("ping", self.handle_ping) + self.handle_item("quit", self.handle_quit) + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + if len(self.build_pids): + self.process_waitpid() + worker_flush() + + + def handle_item(self, item, func): + if self.queue.startswith("<" + item + ">"): + index = self.queue.find("") + while index != -1: + func(self.queue[(len(item) + 2):index]) + self.queue = self.queue[(index + len(item) + 3):] + index = self.queue.find("") + + def handle_cookercfg(self, data): + self.cookercfg = pickle.loads(data) + self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) + self.databuilder.parseBaseConfiguration() + self.data = self.databuilder.data + + def handle_workerdata(self, data): + self.workerdata = pickle.loads(data) + bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"] + bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"] + bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"] + bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] + self.data.setVar("PRSERV_HOST", self.workerdata["prhost"]) + + def handle_ping(self, _): + workerlog_write("Handling ping\n") + + logger.warn("Pong from bitbake-worker!") + + def handle_quit(self, data): + workerlog_write("Handling quit\n") + + global normalexit + normalexit = True + sys.exit(0) + + def handle_runtask(self, data): + fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data) + workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) + + pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors) + + self.build_pids[pid] = task + self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) + + def process_waitpid(self): + """ + Return none is there are no processes awaiting result collection, otherwise + collect the process exit codes and close the information pipe. + """ + try: + pid, status = os.waitpid(-1, os.WNOHANG) + if pid == 0 or os.WIFSTOPPED(status): + return None + except OSError: + return None + + workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) + + if os.WIFEXITED(status): + status = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + # Per shell conventions for $?, when a process exits due to + # a signal, we return an exit code of 128 + SIGNUM + status = 128 + os.WTERMSIG(status) + + task = self.build_pids[pid] + del self.build_pids[pid] + + self.build_pipes[pid].close() + del self.build_pipes[pid] + + worker_fire_prepickled("" + pickle.dumps((task, status)) + "") + + def handle_finishnow(self, _): + if self.build_pids: + logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) + for k, v in self.build_pids.iteritems(): + try: + os.kill(-k, signal.SIGTERM) + os.waitpid(-1, 0) + except: + pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + +try: + worker = BitbakeWorker(sys.stdin) + worker.serve() +except BaseException as e: + if not normalexit: + import traceback + sys.stderr.write(traceback.format_exc()) + sys.stderr.write(str(e)) +while len(worker_queue): + worker_flush() +workerlog_write("exitting") +sys.exit(0) + -- cgit v1.2.3-54-g00ecf