#!/usr/bin/env python
#
# Worker process that bitbake starts to execute tasks on its behalf.
# Events produced here are serialised with pickle and written to stdout.

import os
import sys
import warnings
# The bb package is looked up in ../lib relative to this script, so the
# search path has to be extended before anything from bb is imported.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The magic argument "decafbadbad" (instead of plain "decafbad...") turns
# profiling on.
profiling = False
if sys.argv[1] == "decafbadbad":
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        import profile

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")

# File descriptor the worker reports on; switched to non-blocking so a
# full pipe cannot stall the worker (see worker_flush).
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Data that still has to be written to worker_pipe.
worker_queue = ""


def worker_fire(event, d):
    """Pickle *event*, frame it and queue it for sending.

    The pickled payload is wrapped in "<event>"/"</event>" markers so the
    receiving side can split the byte stream back into messages.  *d* is
    accepted for interface compatibility and is not used.
    """
    data = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(data)


def worker_fire_prepickled(event):
    """Append an already framed message to the send queue and try to flush it."""
    global worker_queue

    worker_queue = worker_queue + event
    worker_flush()


def worker_flush():
    """Write as much of worker_queue to worker_pipe as it accepts right now.

    Whatever os.write() did not take stays queued.  EAGAIN (pipe full) is
    ignored so the caller can retry later; any other I/O error is re-raised.
    """
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
        worker_queue = worker_queue[written:]
    except (IOError, OSError) as e:
        if e.errno != errno.EAGAIN:
            raise


def worker_child_fire(event, d):
    """Pickle *event*, frame it and write it straight to worker_pipe.

    Unlike worker_fire() this bypasses worker_queue and calls
    worker_pipe.write(), so worker_pipe must be a file-like object when
    this function is in use.  *d* is not used.
    """
    global worker_pipe

    data = "<event>" + pickle.dumps(event) + "</event>"
    worker_pipe.write(data)


bb.event.worker_fire = worker_fire

# Optional debug log; replace None with the open() call below to enable it.
lf = None
#lf = open("/tmp/workercommandlog", "w+")


def workerlog_write(msg):
    """Write *msg* to the debug log file, if one is enabled."""
    if lf:
        lf.write(msg)
        lf.flush()
def sigterm_handler(signum, frame):
    """Signal handler for task children: terminate the whole process group."""
    # Restore the default action first so the SIGTERM sent to our own
    # process group below does not re-enter this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()


def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """Fork a child process that executes one task.

    cfg          -- configuration object; cfg.dry_run and cfg.profile are read
    data         -- datastore handed to bb.cache.Cache.loadDataFull()
    workerdata   -- dict of per-build data (task deps, fakeroot env, hashes...)
    fn           -- recipe file name
    task         -- task id, used to index workerdata["runq_hash"]
    taskname     -- name of the task to execute
    appends      -- passed through to bb.cache.Cache.loadDataFull()
    taskdepdata  -- stored in BB_TASKDEPDATA for the task
    quieterrors  -- when true, set the "quieterrors" flag on the task and
                    do not log setup failures

    In the parent, returns (pid, pipein, pipeout): the child's pid and both
    ends of the pipe the child reports on.  The child never returns from
    this function on the paths shown here; it leaves through os._exit().
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    use_fakeroot = 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run
    envsource = "fakerootenv" if use_fakeroot else "fakerootnoenv"
    for var in (workerdata[envsource][fn] or "").split():
        # Split on the first '=' only, so values may themselves contain '='.
        key, value = var.split('=', 1)
        envbackup[key] = os.environ.get(key)
        os.environ[key] = value
        fakeenv[key] = value

    if use_fakeroot:
        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))

    # Flush before forking so buffered output is not emitted twice.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        def child():
            global worker_pipe
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # Save out the PID so that events can include it
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None: a umask of 0 is a legitimate setting.
            if umask is not None:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                # Deliberately catch everything: whatever happens, the
                # child must exit here rather than unwind any further.
                os._exit(1)

        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so that an exception inside runcall()
            # still leaves a defined exit status for os._exit() below.
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                try:
                    prof.dump_stats(profname)
                    bb.utils.process_profilelog(profname)
                finally:
                    # Always leave via os._exit(), even if writing the
                    # profile fails, so the child never falls through.
                    os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child.
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout


class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """Take the read end *pipein*; close *pipeout* if one is given."""
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Read pending data and forward every complete event.

        Each message ending in "</event>" is passed on, including its
        framing, through worker_fire_prepickled(); an incomplete tail stays
        in self.queue.  Returns True when this call read any new data.
        """
        terminator = "</event>"
        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(terminator)
        while index != -1:
            cut = index + len(terminator)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
            index = self.queue.find(terminator)
        return (end > start)

    def close(self):
        """Drain the pipe, warn about any partial message left, then close it."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()


# Set by BitbakeWorker.handle_quit() so the exit path at the bottom of the
# file can tell a requested shutdown from a failure.
normalexit = False


class BitbakeWorker(object):
    """Read framed commands from an input stream and act on them.

    Commands arrive as "<name>payload</name>"; see serve() for the names
    that are handled.
    """
    def __init__(self, din):
        """Use *din* as the (non-blocking) command stream and install signal handlers."""
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.build_pids = {}    # child pid -> task id
        self.build_pipes = {}   # child pid -> runQueueWorkerPipe

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        """SIGTERM/SIGHUP handler: stop running tasks, then die from SIGTERM."""
        if signum == signal.SIGTERM:
            bb.warn("Worker recieved SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker recieved SIGHUP, shutting down...")
        self.handle_finishnow(None)
        # Restore the default action and re-send the signal to ourselves so
        # the process terminates through the normal SIGTERM path.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: handle commands, relay child output and reap children."""
        while True:
            watched = [self.input] + [pipe.input for pipe in self.build_pipes.values()]
            (ready, _, _) = select.select(watched, [], [], 1)
            if self.input in ready or len(self.queue):
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    # Nothing readable right now; work with what is queued.
                    pass
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes.values():
                pipe.read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()

    def handle_item(self, item, func):
        """Dispatch leading "<item>...</item>" messages in the queue to *func*.

        *func* receives the payload between the tags.  The opening tag is
        re-checked for every message, so a different command that follows
        is left in the queue for its own handler.
        """
        opening = "<" + item + ">"
        closing = "</" + item + ">"
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Message not complete yet; wait for more data.
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        # NOTE: pickle.loads() must only ever see data from a trusted
        # sender; unpickling untrusted input can execute arbitrary code.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle the worker data and apply its logging and PR server settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by emitting a log message."""
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the shutdown as intentional and exit the serve loop."""
        workerlog_write("Handling quit\n")
        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork a child for it and track the child."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe and queue
        the (task, status) result for sending.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        if pid not in self.build_pids:
            # Not a child we are tracking; there is nothing to report.
            return None

        task = self.build_pids.pop(pid)
        self.build_pipes.pop(pid).close()

        # NOTE(review): "exitcode" is the tag name BitBake's runqueue reader
        # is understood to expect for task results - confirm against
        # bb/runqueue.py.
        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """Send SIGTERM to every running task's process group and drain their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # A negative pid addresses the child's process group.
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the group or child may already be gone.
                    pass
        for pipe in self.build_pipes.values():
            pipe.read()


try:
    worker = BitbakeWorker(sys.stdin)
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # serve() only returns through an exception (including SystemExit from
    # handle_quit); report it unless the shutdown was requested.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Push out whatever is still queued before exiting.
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")
sys.exit(0)