diff options
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 415 |
1 files changed, 415 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker new file mode 100755 index 0000000000..8a24161250 --- /dev/null +++ b/bitbake/bin/bitbake-worker | |||
@@ -0,0 +1,415 @@ | |||
1 | #!/usr/bin/env python | ||
2 | |||
3 | import os | ||
4 | import sys | ||
5 | import warnings | ||
6 | sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) | ||
7 | from bb import fetch2 | ||
8 | import logging | ||
9 | import bb | ||
10 | import select | ||
11 | import errno | ||
12 | import signal | ||
13 | |||
14 | # Users shouldn't be running this code directly | ||
15 | if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): | ||
16 | print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") | ||
17 | sys.exit(1) | ||
18 | |||
19 | profiling = False | ||
20 | if sys.argv[1] == "decafbadbad": | ||
21 | profiling = True | ||
22 | try: | ||
23 | import cProfile as profile | ||
24 | except: | ||
25 | import profile | ||
26 | |||
27 | logger = logging.getLogger("BitBake") | ||
28 | |||
29 | try: | ||
30 | import cPickle as pickle | ||
31 | except ImportError: | ||
32 | import pickle | ||
33 | bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.") | ||
34 | |||
35 | |||
36 | worker_pipe = sys.stdout.fileno() | ||
37 | bb.utils.nonblockingfd(worker_pipe) | ||
38 | |||
39 | handler = bb.event.LogHandler() | ||
40 | logger.addHandler(handler) | ||
41 | |||
42 | if 0: | ||
43 | # Code to write out a log file of all events passing through the worker | ||
44 | logfilename = "/tmp/workerlogfile" | ||
45 | format_str = "%(levelname)s: %(message)s" | ||
46 | conlogformat = bb.msg.BBLogFormatter(format_str) | ||
47 | consolelog = logging.FileHandler(logfilename) | ||
48 | bb.msg.addDefaultlogFilter(consolelog) | ||
49 | consolelog.setFormatter(conlogformat) | ||
50 | logger.addHandler(consolelog) | ||
51 | |||
52 | worker_queue = "" | ||
53 | |||
54 | def worker_fire(event, d): | ||
55 | data = "<event>" + pickle.dumps(event) + "</event>" | ||
56 | worker_fire_prepickled(data) | ||
57 | |||
58 | def worker_fire_prepickled(event): | ||
59 | global worker_queue | ||
60 | |||
61 | worker_queue = worker_queue + event | ||
62 | worker_flush() | ||
63 | |||
64 | def worker_flush(): | ||
65 | global worker_queue, worker_pipe | ||
66 | |||
67 | if not worker_queue: | ||
68 | return | ||
69 | |||
70 | try: | ||
71 | written = os.write(worker_pipe, worker_queue) | ||
72 | worker_queue = worker_queue[written:] | ||
73 | except (IOError, OSError) as e: | ||
74 | if e.errno != errno.EAGAIN: | ||
75 | raise | ||
76 | |||
77 | def worker_child_fire(event, d): | ||
78 | global worker_pipe | ||
79 | |||
80 | data = "<event>" + pickle.dumps(event) + "</event>" | ||
81 | worker_pipe.write(data) | ||
82 | |||
83 | bb.event.worker_fire = worker_fire | ||
84 | |||
85 | lf = None | ||
86 | #lf = open("/tmp/workercommandlog", "w+") | ||
87 | def workerlog_write(msg): | ||
88 | if lf: | ||
89 | lf.write(msg) | ||
90 | lf.flush() | ||
91 | |||
92 | def sigterm_handler(signum, frame): | ||
93 | signal.signal(signal.SIGTERM, signal.SIG_DFL) | ||
94 | os.killpg(0, signal.SIGTERM) | ||
95 | sys.exit() | ||
96 | |||
97 | def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False): | ||
98 | # We need to setup the environment BEFORE the fork, since | ||
99 | # a fork() or exec*() activates PSEUDO... | ||
100 | |||
101 | envbackup = {} | ||
102 | fakeenv = {} | ||
103 | umask = None | ||
104 | |||
105 | taskdep = workerdata["taskdeps"][fn] | ||
106 | if 'umask' in taskdep and taskname in taskdep['umask']: | ||
107 | # umask might come in as a number or text string.. | ||
108 | try: | ||
109 | umask = int(taskdep['umask'][taskname],8) | ||
110 | except TypeError: | ||
111 | umask = taskdep['umask'][taskname] | ||
112 | |||
113 | # We can't use the fakeroot environment in a dry run as it possibly hasn't been built | ||
114 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run: | ||
115 | envvars = (workerdata["fakerootenv"][fn] or "").split() | ||
116 | for key, value in (var.split('=') for var in envvars): | ||
117 | envbackup[key] = os.environ.get(key) | ||
118 | os.environ[key] = value | ||
119 | fakeenv[key] = value | ||
120 | |||
121 | fakedirs = (workerdata["fakerootdirs"][fn] or "").split() | ||
122 | for p in fakedirs: | ||
123 | bb.utils.mkdirhier(p) | ||
124 | logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % | ||
125 | (fn, taskname, ', '.join(fakedirs))) | ||
126 | else: | ||
127 | envvars = (workerdata["fakerootnoenv"][fn] or "").split() | ||
128 | for key, value in (var.split('=') for var in envvars): | ||
129 | envbackup[key] = os.environ.get(key) | ||
130 | os.environ[key] = value | ||
131 | fakeenv[key] = value | ||
132 | |||
133 | sys.stdout.flush() | ||
134 | sys.stderr.flush() | ||
135 | |||
136 | try: | ||
137 | pipein, pipeout = os.pipe() | ||
138 | pipein = os.fdopen(pipein, 'rb', 4096) | ||
139 | pipeout = os.fdopen(pipeout, 'wb', 0) | ||
140 | pid = os.fork() | ||
141 | except OSError as e: | ||
142 | bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror)) | ||
143 | |||
144 | if pid == 0: | ||
145 | def child(): | ||
146 | global worker_pipe | ||
147 | pipein.close() | ||
148 | |||
149 | signal.signal(signal.SIGTERM, sigterm_handler) | ||
150 | # Let SIGHUP exit as SIGTERM | ||
151 | signal.signal(signal.SIGHUP, sigterm_handler) | ||
152 | |||
153 | # Save out the PID so that the event can include it the | ||
154 | # events | ||
155 | bb.event.worker_pid = os.getpid() | ||
156 | bb.event.worker_fire = worker_child_fire | ||
157 | worker_pipe = pipeout | ||
158 | |||
159 | # Make the child the process group leader and ensure no | ||
160 | # child process will be controlled by the current terminal | ||
161 | # This ensures signals sent to the controlling terminal like Ctrl+C | ||
162 | # don't stop the child processes. | ||
163 | os.setsid() | ||
164 | # No stdin | ||
165 | newsi = os.open(os.devnull, os.O_RDWR) | ||
166 | os.dup2(newsi, sys.stdin.fileno()) | ||
167 | |||
168 | if umask: | ||
169 | os.umask(umask) | ||
170 | |||
171 | data.setVar("BB_WORKERCONTEXT", "1") | ||
172 | data.setVar("BB_TASKDEPDATA", taskdepdata) | ||
173 | data.setVar("BUILDNAME", workerdata["buildname"]) | ||
174 | data.setVar("DATE", workerdata["date"]) | ||
175 | data.setVar("TIME", workerdata["time"]) | ||
176 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) | ||
177 | ret = 0 | ||
178 | try: | ||
179 | the_data = bb.cache.Cache.loadDataFull(fn, appends, data) | ||
180 | the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task]) | ||
181 | |||
182 | # exported_vars() returns a generator which *cannot* be passed to os.environ.update() | ||
183 | # successfully. We also need to unset anything from the environment which shouldn't be there | ||
184 | exports = bb.data.exported_vars(the_data) | ||
185 | bb.utils.empty_environment() | ||
186 | for e, v in exports: | ||
187 | os.environ[e] = v | ||
188 | for e in fakeenv: | ||
189 | os.environ[e] = fakeenv[e] | ||
190 | the_data.setVar(e, fakeenv[e]) | ||
191 | the_data.setVarFlag(e, 'export', "1") | ||
192 | |||
193 | if quieterrors: | ||
194 | the_data.setVarFlag(taskname, "quieterrors", "1") | ||
195 | |||
196 | except Exception as exc: | ||
197 | if not quieterrors: | ||
198 | logger.critical(str(exc)) | ||
199 | os._exit(1) | ||
200 | try: | ||
201 | if cfg.dry_run: | ||
202 | return 0 | ||
203 | return bb.build.exec_task(fn, taskname, the_data, cfg.profile) | ||
204 | except: | ||
205 | os._exit(1) | ||
206 | if not profiling: | ||
207 | os._exit(child()) | ||
208 | else: | ||
209 | profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) | ||
210 | prof = profile.Profile() | ||
211 | try: | ||
212 | ret = profile.Profile.runcall(prof, child) | ||
213 | finally: | ||
214 | prof.dump_stats(profname) | ||
215 | bb.utils.process_profilelog(profname) | ||
216 | os._exit(ret) | ||
217 | else: | ||
218 | for key, value in envbackup.iteritems(): | ||
219 | if value is None: | ||
220 | del os.environ[key] | ||
221 | else: | ||
222 | os.environ[key] = value | ||
223 | |||
224 | return pid, pipein, pipeout | ||
225 | |||
226 | class runQueueWorkerPipe(): | ||
227 | """ | ||
228 | Abstraction for a pipe between a worker thread and the worker server | ||
229 | """ | ||
230 | def __init__(self, pipein, pipeout): | ||
231 | self.input = pipein | ||
232 | if pipeout: | ||
233 | pipeout.close() | ||
234 | bb.utils.nonblockingfd(self.input) | ||
235 | self.queue = "" | ||
236 | |||
237 | def read(self): | ||
238 | start = len(self.queue) | ||
239 | try: | ||
240 | self.queue = self.queue + self.input.read(102400) | ||
241 | except (OSError, IOError) as e: | ||
242 | if e.errno != errno.EAGAIN: | ||
243 | raise | ||
244 | |||
245 | end = len(self.queue) | ||
246 | index = self.queue.find("</event>") | ||
247 | while index != -1: | ||
248 | worker_fire_prepickled(self.queue[:index+8]) | ||
249 | self.queue = self.queue[index+8:] | ||
250 | index = self.queue.find("</event>") | ||
251 | return (end > start) | ||
252 | |||
253 | def close(self): | ||
254 | while self.read(): | ||
255 | continue | ||
256 | if len(self.queue) > 0: | ||
257 | print("Warning, worker child left partial message: %s" % self.queue) | ||
258 | self.input.close() | ||
259 | |||
260 | normalexit = False | ||
261 | |||
262 | class BitbakeWorker(object): | ||
263 | def __init__(self, din): | ||
264 | self.input = din | ||
265 | bb.utils.nonblockingfd(self.input) | ||
266 | self.queue = "" | ||
267 | self.cookercfg = None | ||
268 | self.databuilder = None | ||
269 | self.data = None | ||
270 | self.build_pids = {} | ||
271 | self.build_pipes = {} | ||
272 | |||
273 | signal.signal(signal.SIGTERM, self.sigterm_exception) | ||
274 | # Let SIGHUP exit as SIGTERM | ||
275 | signal.signal(signal.SIGHUP, self.sigterm_exception) | ||
276 | |||
277 | def sigterm_exception(self, signum, stackframe): | ||
278 | if signum == signal.SIGTERM: | ||
279 | bb.warn("Worker recieved SIGTERM, shutting down...") | ||
280 | elif signum == signal.SIGHUP: | ||
281 | bb.warn("Worker recieved SIGHUP, shutting down...") | ||
282 | self.handle_finishnow(None) | ||
283 | signal.signal(signal.SIGTERM, signal.SIG_DFL) | ||
284 | os.kill(os.getpid(), signal.SIGTERM) | ||
285 | |||
286 | def serve(self): | ||
287 | while True: | ||
288 | (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) | ||
289 | if self.input in ready or len(self.queue): | ||
290 | start = len(self.queue) | ||
291 | try: | ||
292 | self.queue = self.queue + self.input.read() | ||
293 | except (OSError, IOError): | ||
294 | pass | ||
295 | end = len(self.queue) | ||
296 | self.handle_item("cookerconfig", self.handle_cookercfg) | ||
297 | self.handle_item("workerdata", self.handle_workerdata) | ||
298 | self.handle_item("runtask", self.handle_runtask) | ||
299 | self.handle_item("finishnow", self.handle_finishnow) | ||
300 | self.handle_item("ping", self.handle_ping) | ||
301 | self.handle_item("quit", self.handle_quit) | ||
302 | |||
303 | for pipe in self.build_pipes: | ||
304 | self.build_pipes[pipe].read() | ||
305 | if len(self.build_pids): | ||
306 | self.process_waitpid() | ||
307 | worker_flush() | ||
308 | |||
309 | |||
310 | def handle_item(self, item, func): | ||
311 | if self.queue.startswith("<" + item + ">"): | ||
312 | index = self.queue.find("</" + item + ">") | ||
313 | while index != -1: | ||
314 | func(self.queue[(len(item) + 2):index]) | ||
315 | self.queue = self.queue[(index + len(item) + 3):] | ||
316 | index = self.queue.find("</" + item + ">") | ||
317 | |||
318 | def handle_cookercfg(self, data): | ||
319 | self.cookercfg = pickle.loads(data) | ||
320 | self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) | ||
321 | self.databuilder.parseBaseConfiguration() | ||
322 | self.data = self.databuilder.data | ||
323 | |||
324 | def handle_workerdata(self, data): | ||
325 | self.workerdata = pickle.loads(data) | ||
326 | bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"] | ||
327 | bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"] | ||
328 | bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"] | ||
329 | bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] | ||
330 | self.data.setVar("PRSERV_HOST", self.workerdata["prhost"]) | ||
331 | |||
332 | def handle_ping(self, _): | ||
333 | workerlog_write("Handling ping\n") | ||
334 | |||
335 | logger.warn("Pong from bitbake-worker!") | ||
336 | |||
337 | def handle_quit(self, data): | ||
338 | workerlog_write("Handling quit\n") | ||
339 | |||
340 | global normalexit | ||
341 | normalexit = True | ||
342 | sys.exit(0) | ||
343 | |||
344 | def handle_runtask(self, data): | ||
345 | fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data) | ||
346 | workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) | ||
347 | |||
348 | pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors) | ||
349 | |||
350 | self.build_pids[pid] = task | ||
351 | self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) | ||
352 | |||
353 | def process_waitpid(self): | ||
354 | """ | ||
355 | Return none is there are no processes awaiting result collection, otherwise | ||
356 | collect the process exit codes and close the information pipe. | ||
357 | """ | ||
358 | try: | ||
359 | pid, status = os.waitpid(-1, os.WNOHANG) | ||
360 | if pid == 0 or os.WIFSTOPPED(status): | ||
361 | return None | ||
362 | except OSError: | ||
363 | return None | ||
364 | |||
365 | workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) | ||
366 | |||
367 | if os.WIFEXITED(status): | ||
368 | status = os.WEXITSTATUS(status) | ||
369 | elif os.WIFSIGNALED(status): | ||
370 | # Per shell conventions for $?, when a process exits due to | ||
371 | # a signal, we return an exit code of 128 + SIGNUM | ||
372 | status = 128 + os.WTERMSIG(status) | ||
373 | |||
374 | task = self.build_pids[pid] | ||
375 | del self.build_pids[pid] | ||
376 | |||
377 | self.build_pipes[pid].close() | ||
378 | del self.build_pipes[pid] | ||
379 | |||
380 | worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>") | ||
381 | |||
382 | def handle_finishnow(self, _): | ||
383 | if self.build_pids: | ||
384 | logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) | ||
385 | for k, v in self.build_pids.iteritems(): | ||
386 | try: | ||
387 | os.kill(-k, signal.SIGTERM) | ||
388 | os.waitpid(-1, 0) | ||
389 | except: | ||
390 | pass | ||
391 | for pipe in self.build_pipes: | ||
392 | self.build_pipes[pipe].read() | ||
393 | |||
394 | try: | ||
395 | worker = BitbakeWorker(sys.stdin) | ||
396 | if not profiling: | ||
397 | worker.serve() | ||
398 | else: | ||
399 | profname = "profile-worker.log" | ||
400 | prof = profile.Profile() | ||
401 | try: | ||
402 | profile.Profile.runcall(prof, worker.serve) | ||
403 | finally: | ||
404 | prof.dump_stats(profname) | ||
405 | bb.utils.process_profilelog(profname) | ||
406 | except BaseException as e: | ||
407 | if not normalexit: | ||
408 | import traceback | ||
409 | sys.stderr.write(traceback.format_exc()) | ||
410 | sys.stderr.write(str(e)) | ||
411 | while len(worker_queue): | ||
412 | worker_flush() | ||
413 | workerlog_write("exitting") | ||
414 | sys.exit(0) | ||
415 | |||