diff options
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker new file mode 100755 index 0000000000..68e2bf4571 --- /dev/null +++ b/bitbake/bin/bitbake-worker | |||
@@ -0,0 +1,375 @@ | |||
1 | #!/usr/bin/env python | ||
2 | |||
3 | import os | ||
4 | import sys | ||
5 | import warnings | ||
6 | sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) | ||
7 | from bb import fetch2 | ||
8 | import logging | ||
9 | import bb | ||
10 | import select | ||
11 | import errno | ||
12 | import signal | ||
13 | |||
14 | # Users shouldn't be running this code directly | ||
15 | if len(sys.argv) != 2 or sys.argv[1] != "decafbad": | ||
16 | print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") | ||
17 | sys.exit(1) | ||
18 | |||
19 | logger = logging.getLogger("BitBake") | ||
20 | |||
21 | try: | ||
22 | import cPickle as pickle | ||
23 | except ImportError: | ||
24 | import pickle | ||
25 | bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.") | ||
26 | |||
27 | |||
28 | worker_pipe = sys.stdout.fileno() | ||
29 | bb.utils.nonblockingfd(worker_pipe) | ||
30 | |||
31 | handler = bb.event.LogHandler() | ||
32 | logger.addHandler(handler) | ||
33 | |||
34 | if 0: | ||
35 | # Code to write out a log file of all events passing through the worker | ||
36 | logfilename = "/tmp/workerlogfile" | ||
37 | format_str = "%(levelname)s: %(message)s" | ||
38 | conlogformat = bb.msg.BBLogFormatter(format_str) | ||
39 | consolelog = logging.FileHandler(logfilename) | ||
40 | bb.msg.addDefaultlogFilter(consolelog) | ||
41 | consolelog.setFormatter(conlogformat) | ||
42 | logger.addHandler(consolelog) | ||
43 | |||
44 | worker_queue = "" | ||
45 | |||
46 | def worker_fire(event, d): | ||
47 | data = "<event>" + pickle.dumps(event) + "</event>" | ||
48 | worker_fire_prepickled(data) | ||
49 | |||
50 | def worker_fire_prepickled(event): | ||
51 | global worker_queue | ||
52 | |||
53 | worker_queue = worker_queue + event | ||
54 | worker_flush() | ||
55 | |||
56 | def worker_flush(): | ||
57 | global worker_queue, worker_pipe | ||
58 | |||
59 | if not worker_queue: | ||
60 | return | ||
61 | |||
62 | try: | ||
63 | written = os.write(worker_pipe, worker_queue) | ||
64 | worker_queue = worker_queue[written:] | ||
65 | except (IOError, OSError) as e: | ||
66 | if e.errno != errno.EAGAIN: | ||
67 | raise | ||
68 | |||
69 | def worker_child_fire(event, d): | ||
70 | global worker_pipe | ||
71 | |||
72 | data = "<event>" + pickle.dumps(event) + "</event>" | ||
73 | worker_pipe.write(data) | ||
74 | |||
75 | bb.event.worker_fire = worker_fire | ||
76 | |||
77 | lf = None | ||
78 | #lf = open("/tmp/workercommandlog", "w+") | ||
79 | def workerlog_write(msg): | ||
80 | if lf: | ||
81 | lf.write(msg) | ||
82 | lf.flush() | ||
83 | |||
84 | def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False): | ||
85 | # We need to setup the environment BEFORE the fork, since | ||
86 | # a fork() or exec*() activates PSEUDO... | ||
87 | |||
88 | envbackup = {} | ||
89 | fakeenv = {} | ||
90 | umask = None | ||
91 | |||
92 | taskdep = workerdata["taskdeps"][fn] | ||
93 | if 'umask' in taskdep and taskname in taskdep['umask']: | ||
94 | # umask might come in as a number or text string.. | ||
95 | try: | ||
96 | umask = int(taskdep['umask'][taskname],8) | ||
97 | except TypeError: | ||
98 | umask = taskdep['umask'][taskname] | ||
99 | |||
100 | # We can't use the fakeroot environment in a dry run as it possibly hasn't been built | ||
101 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run: | ||
102 | envvars = (workerdata["fakerootenv"][fn] or "").split() | ||
103 | for key, value in (var.split('=') for var in envvars): | ||
104 | envbackup[key] = os.environ.get(key) | ||
105 | os.environ[key] = value | ||
106 | fakeenv[key] = value | ||
107 | |||
108 | fakedirs = (workerdata["fakerootdirs"][fn] or "").split() | ||
109 | for p in fakedirs: | ||
110 | bb.utils.mkdirhier(p) | ||
111 | logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % | ||
112 | (fn, taskname, ', '.join(fakedirs))) | ||
113 | else: | ||
114 | envvars = (workerdata["fakerootnoenv"][fn] or "").split() | ||
115 | for key, value in (var.split('=') for var in envvars): | ||
116 | envbackup[key] = os.environ.get(key) | ||
117 | os.environ[key] = value | ||
118 | fakeenv[key] = value | ||
119 | |||
120 | sys.stdout.flush() | ||
121 | sys.stderr.flush() | ||
122 | |||
123 | try: | ||
124 | pipein, pipeout = os.pipe() | ||
125 | pipein = os.fdopen(pipein, 'rb', 4096) | ||
126 | pipeout = os.fdopen(pipeout, 'wb', 0) | ||
127 | pid = os.fork() | ||
128 | except OSError as e: | ||
129 | bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror)) | ||
130 | |||
131 | if pid == 0: | ||
132 | global worker_pipe | ||
133 | pipein.close() | ||
134 | |||
135 | signal.signal(signal.SIGTERM, signal.SIG_DFL) | ||
136 | |||
137 | # Save out the PID so that the event can include it the | ||
138 | # events | ||
139 | bb.event.worker_pid = os.getpid() | ||
140 | bb.event.worker_fire = worker_child_fire | ||
141 | worker_pipe = pipeout | ||
142 | |||
143 | # Make the child the process group leader | ||
144 | os.setpgid(0, 0) | ||
145 | # No stdin | ||
146 | newsi = os.open(os.devnull, os.O_RDWR) | ||
147 | os.dup2(newsi, sys.stdin.fileno()) | ||
148 | |||
149 | if umask: | ||
150 | os.umask(umask) | ||
151 | |||
152 | data.setVar("BB_WORKERCONTEXT", "1") | ||
153 | data.setVar("BB_TASKDEPDATA", taskdepdata) | ||
154 | data.setVar("BUILDNAME", workerdata["buildname"]) | ||
155 | data.setVar("DATE", workerdata["date"]) | ||
156 | data.setVar("TIME", workerdata["time"]) | ||
157 | bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"]) | ||
158 | ret = 0 | ||
159 | try: | ||
160 | the_data = bb.cache.Cache.loadDataFull(fn, appends, data) | ||
161 | the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task]) | ||
162 | for h in workerdata["hashes"]: | ||
163 | the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h]) | ||
164 | for h in workerdata["hash_deps"]: | ||
165 | the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h]) | ||
166 | |||
167 | # exported_vars() returns a generator which *cannot* be passed to os.environ.update() | ||
168 | # successfully. We also need to unset anything from the environment which shouldn't be there | ||
169 | exports = bb.data.exported_vars(the_data) | ||
170 | bb.utils.empty_environment() | ||
171 | for e, v in exports: | ||
172 | os.environ[e] = v | ||
173 | for e in fakeenv: | ||
174 | os.environ[e] = fakeenv[e] | ||
175 | the_data.setVar(e, fakeenv[e]) | ||
176 | the_data.setVarFlag(e, 'export', "1") | ||
177 | |||
178 | if quieterrors: | ||
179 | the_data.setVarFlag(taskname, "quieterrors", "1") | ||
180 | |||
181 | except Exception as exc: | ||
182 | if not quieterrors: | ||
183 | logger.critical(str(exc)) | ||
184 | os._exit(1) | ||
185 | try: | ||
186 | if not cfg.dry_run: | ||
187 | ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) | ||
188 | os._exit(ret) | ||
189 | except: | ||
190 | os._exit(1) | ||
191 | else: | ||
192 | for key, value in envbackup.iteritems(): | ||
193 | if value is None: | ||
194 | del os.environ[key] | ||
195 | else: | ||
196 | os.environ[key] = value | ||
197 | |||
198 | return pid, pipein, pipeout | ||
199 | |||
200 | class runQueueWorkerPipe(): | ||
201 | """ | ||
202 | Abstraction for a pipe between a worker thread and the worker server | ||
203 | """ | ||
204 | def __init__(self, pipein, pipeout): | ||
205 | self.input = pipein | ||
206 | if pipeout: | ||
207 | pipeout.close() | ||
208 | bb.utils.nonblockingfd(self.input) | ||
209 | self.queue = "" | ||
210 | |||
211 | def read(self): | ||
212 | start = len(self.queue) | ||
213 | try: | ||
214 | self.queue = self.queue + self.input.read(102400) | ||
215 | except (OSError, IOError) as e: | ||
216 | if e.errno != errno.EAGAIN: | ||
217 | raise | ||
218 | |||
219 | end = len(self.queue) | ||
220 | index = self.queue.find("</event>") | ||
221 | while index != -1: | ||
222 | worker_fire_prepickled(self.queue[:index+8]) | ||
223 | self.queue = self.queue[index+8:] | ||
224 | index = self.queue.find("</event>") | ||
225 | return (end > start) | ||
226 | |||
227 | def close(self): | ||
228 | while self.read(): | ||
229 | continue | ||
230 | if len(self.queue) > 0: | ||
231 | print("Warning, worker child left partial message: %s" % self.queue) | ||
232 | self.input.close() | ||
233 | |||
234 | normalexit = False | ||
235 | |||
236 | class BitbakeWorker(object): | ||
237 | def __init__(self, din): | ||
238 | self.input = din | ||
239 | bb.utils.nonblockingfd(self.input) | ||
240 | self.queue = "" | ||
241 | self.cookercfg = None | ||
242 | self.databuilder = None | ||
243 | self.data = None | ||
244 | self.build_pids = {} | ||
245 | self.build_pipes = {} | ||
246 | |||
247 | signal.signal(signal.SIGTERM, self.sigterm_exception) | ||
248 | |||
249 | def sigterm_exception(self, signum, stackframe): | ||
250 | bb.warn("Worker recieved SIGTERM, shutting down...") | ||
251 | self.handle_finishnow(None) | ||
252 | signal.signal(signal.SIGTERM, signal.SIG_DFL) | ||
253 | os.kill(os.getpid(), signal.SIGTERM) | ||
254 | |||
255 | def serve(self): | ||
256 | while True: | ||
257 | (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) | ||
258 | if self.input in ready or len(self.queue): | ||
259 | start = len(self.queue) | ||
260 | try: | ||
261 | self.queue = self.queue + self.input.read() | ||
262 | except (OSError, IOError): | ||
263 | pass | ||
264 | end = len(self.queue) | ||
265 | self.handle_item("cookerconfig", self.handle_cookercfg) | ||
266 | self.handle_item("workerdata", self.handle_workerdata) | ||
267 | self.handle_item("runtask", self.handle_runtask) | ||
268 | self.handle_item("finishnow", self.handle_finishnow) | ||
269 | self.handle_item("ping", self.handle_ping) | ||
270 | self.handle_item("quit", self.handle_quit) | ||
271 | |||
272 | for pipe in self.build_pipes: | ||
273 | self.build_pipes[pipe].read() | ||
274 | if len(self.build_pids): | ||
275 | self.process_waitpid() | ||
276 | worker_flush() | ||
277 | |||
278 | |||
279 | def handle_item(self, item, func): | ||
280 | if self.queue.startswith("<" + item + ">"): | ||
281 | index = self.queue.find("</" + item + ">") | ||
282 | while index != -1: | ||
283 | func(self.queue[(len(item) + 2):index]) | ||
284 | self.queue = self.queue[(index + len(item) + 3):] | ||
285 | index = self.queue.find("</" + item + ">") | ||
286 | |||
287 | def handle_cookercfg(self, data): | ||
288 | self.cookercfg = pickle.loads(data) | ||
289 | self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) | ||
290 | self.databuilder.parseBaseConfiguration() | ||
291 | self.data = self.databuilder.data | ||
292 | |||
293 | def handle_workerdata(self, data): | ||
294 | self.workerdata = pickle.loads(data) | ||
295 | bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"] | ||
296 | bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"] | ||
297 | bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"] | ||
298 | bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] | ||
299 | self.data.setVar("PRSERV_HOST", self.workerdata["prhost"]) | ||
300 | |||
301 | def handle_ping(self, _): | ||
302 | workerlog_write("Handling ping\n") | ||
303 | |||
304 | logger.warn("Pong from bitbake-worker!") | ||
305 | |||
306 | def handle_quit(self, data): | ||
307 | workerlog_write("Handling quit\n") | ||
308 | |||
309 | global normalexit | ||
310 | normalexit = True | ||
311 | sys.exit(0) | ||
312 | |||
313 | def handle_runtask(self, data): | ||
314 | fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data) | ||
315 | workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) | ||
316 | |||
317 | pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors) | ||
318 | |||
319 | self.build_pids[pid] = task | ||
320 | self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) | ||
321 | |||
322 | def process_waitpid(self): | ||
323 | """ | ||
324 | Return none is there are no processes awaiting result collection, otherwise | ||
325 | collect the process exit codes and close the information pipe. | ||
326 | """ | ||
327 | try: | ||
328 | pid, status = os.waitpid(-1, os.WNOHANG) | ||
329 | if pid == 0 or os.WIFSTOPPED(status): | ||
330 | return None | ||
331 | except OSError: | ||
332 | return None | ||
333 | |||
334 | workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) | ||
335 | |||
336 | if os.WIFEXITED(status): | ||
337 | status = os.WEXITSTATUS(status) | ||
338 | elif os.WIFSIGNALED(status): | ||
339 | # Per shell conventions for $?, when a process exits due to | ||
340 | # a signal, we return an exit code of 128 + SIGNUM | ||
341 | status = 128 + os.WTERMSIG(status) | ||
342 | |||
343 | task = self.build_pids[pid] | ||
344 | del self.build_pids[pid] | ||
345 | |||
346 | self.build_pipes[pid].close() | ||
347 | del self.build_pipes[pid] | ||
348 | |||
349 | worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>") | ||
350 | |||
351 | def handle_finishnow(self, _): | ||
352 | if self.build_pids: | ||
353 | logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) | ||
354 | for k, v in self.build_pids.iteritems(): | ||
355 | try: | ||
356 | os.kill(-k, signal.SIGTERM) | ||
357 | os.waitpid(-1, 0) | ||
358 | except: | ||
359 | pass | ||
360 | for pipe in self.build_pipes: | ||
361 | self.build_pipes[pipe].read() | ||
362 | |||
363 | try: | ||
364 | worker = BitbakeWorker(sys.stdin) | ||
365 | worker.serve() | ||
366 | except BaseException as e: | ||
367 | if not normalexit: | ||
368 | import traceback | ||
369 | sys.stderr.write(traceback.format_exc()) | ||
370 | sys.stderr.write(str(e)) | ||
371 | while len(worker_queue): | ||
372 | worker_flush() | ||
373 | workerlog_write("exitting") | ||
374 | sys.exit(0) | ||
375 | |||