diff options
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker new file mode 100755 index 0000000000..8edf8dd658 --- /dev/null +++ b/bitbake/bin/bitbake-worker | |||
@@ -0,0 +1,358 @@ | |||
1 | #!/usr/bin/env python | ||
2 | |||
import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
12 | |||
# Users shouldn't be running this code directly
# The magic token "decafbad" is passed by the bitbake server when it spawns
# this helper; its absence means someone ran the script by hand.
if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

logger = logging.getLogger("BitBake")

# Prefer the C pickle implementation (Python 2); fall back to the pure-Python
# module and warn, since event serialisation is on the hot path.
try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
26 | |||
# All traffic back to the bitbake server goes over stdout; it is made
# non-blocking so worker_flush() can do partial writes without stalling.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

# Route log records through the event system so they reach the server.
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    # (debug aid, deliberately disabled; flip the 0 to 1 to enable)
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Outgoing bytes not yet accepted by the non-blocking server pipe.
worker_queue = ""
44 | |||
def worker_fire(event, d):
    """Serialise *event* and queue it for delivery to the bitbake server.

    The datastore argument *d* is accepted only for signature
    compatibility with bb.event.worker_fire; it is not used.
    """
    payload = "".join(["<event>", pickle.dumps(event), "</event>"])
    worker_fire_prepickled(payload)
48 | |||
def worker_fire_prepickled(event):
    """Append an already-serialised event to the outgoing queue and try to
    flush it to the server pipe straight away."""
    global worker_queue

    worker_queue += event
    worker_flush()
54 | |||
def worker_flush():
    """Write as much of the pending queue as the server pipe will accept.

    The pipe is non-blocking, so a partial write simply leaves the
    remainder queued for a later call. EAGAIN is swallowed (nothing could
    be written right now); any other OS error propagates.
    """
    global worker_queue, worker_pipe

    if worker_queue:
        try:
            sent = os.write(worker_pipe, worker_queue)
        except (IOError, OSError) as exc:
            if exc.errno == errno.EAGAIN:
                return
            raise
        else:
            worker_queue = worker_queue[sent:]
67 | |||
def worker_child_fire(event, d):
    """Event sender installed in forked task children.

    After the fork, worker_pipe is rebound to a file object wrapping the
    write end of the child->worker pipe, so events are written directly
    instead of going through the parent's queue. *d* is unused (kept for
    signature compatibility with bb.event.worker_fire).
    """
    global worker_pipe

    worker_pipe.write("<event>" + pickle.dumps(event) + "</event>")
73 | |||
# Install the queued, non-blocking sender as the process-wide event sink.
bb.event.worker_fire = worker_fire

# Optional debug log of every command this worker handles; disabled by default.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the optional command debug log; no-op when the log
    file (module global ``lf``) is disabled."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
82 | |||
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
    """Fork a child process to run one task and return its handles.

    Returns (pid, pipein, pipeout) to the parent: pipein is the read end
    used to collect the child's events, pipeout is closed by the caller's
    runQueueWorkerPipe wrapper. The child never returns - it execs the
    task via bb.build.exec_task and os._exit()s with its status.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}   # original values of env vars we override (None = was unset)
    fakeenv = {}     # fakeroot-related vars to force into the task datastore
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname],8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
        # Task runs under fakeroot: export its env vars and pre-create dirs.
        # NOTE(review): split('=') breaks on values containing '=' - looks
        # like it should be split('=', 1); confirm against server side.
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Non-fakeroot task: apply the "noenv" variable set instead.
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush before fork so buffered output isn't duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)  # unbuffered: child events must not be delayed
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        # --- child process ---
        global worker_pipe
        pipein.close()

        # Save out the PID so that the event can include it the
        # events
        bb.event.worker_pid = os.getpid()
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout

        # Make the child the process group leader
        os.setpgid(0, 0)
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        # NOTE(review): a umask of 0 is falsy and would be skipped here.
        if umask:
            os.umask(umask)

        data.setVar("BB_WORKERCONTEXT", "1")
        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
        ret = 0
        try:
            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
            for h in workerdata["hashes"]:
                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
            for h in workerdata["hash_deps"]:
                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
            # successfully. We also need to unset anything from the environment which shouldn't be there
            exports = bb.data.exported_vars(the_data)
            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v
            # fakeroot vars win over exported ones and are re-exported.
            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception as exc:
            if not quieterrors:
                logger.critical(str(exc))
            os._exit(1)
        try:
            if not cfg.dry_run:
                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            os._exit(ret)
        except:
            # Bare except is deliberate: the child must always _exit and
            # never fall back into the parent's control flow.
            os._exit(1)
    else:
        # --- parent process: undo the temporary environment changes ---
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
191 | |||
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        # We only ever read from this end; the child owns the write end,
        # so our copy of it (if any) is closed immediately.
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Pull any available data from the child, forward every complete
        <event>...</event> frame to the server, and report whether new
        bytes arrived."""
        before = len(self.queue)
        try:
            fresh = self.input.read(102400)
        except (OSError, IOError) as exc:
            if exc.errno != errno.EAGAIN:
                raise
        else:
            self.queue = self.queue + fresh
        after = len(self.queue)

        while True:
            pos = self.queue.find("</event>")
            if pos == -1:
                break
            cut = pos + len("</event>")
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
        return after > before

    def close(self):
        """Drain remaining data, warn about a trailing partial frame, then
        close the read end."""
        while self.read():
            pass
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
225 | |||
226 | normalexit = False | ||
227 | |||
class BitbakeWorker(object):
    """Main loop of the worker process.

    Reads framed, pickled commands (<cookerconfig>, <workerdata>,
    <runtask>, <finishnow>, <ping>, <quit>) from the bitbake server on
    *din*, forks off build tasks, and relays their events and exit codes
    back to the server over the worker pipe (stdout).

    Requires the module-level ``import signal`` for handle_finishnow().
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # Populated by handle_workerdata(); initialised here so a stray
        # <runtask> before <workerdata> fails cleanly rather than with
        # AttributeError.
        self.workerdata = None
        self.build_pids = {}   # pid -> task id of in-flight forked tasks
        self.build_pipes = {}  # pid -> runQueueWorkerPipe carrying child events

    def serve(self):
        """Multiplex the server command stream and all child event pipes
        until handle_quit() exits the process."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready or len(self.queue):
                start = len(self.queue)
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    # Non-blocking read with nothing available; try again later.
                    pass
                end = len(self.queue)
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            # Forward any events the children have produced, reap finished
            # children, and push pending output to the server.
            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """If the queue starts with an <item> frame, invoke *func* on the
        payload of each complete <item>...</item> frame and consume it."""
        if self.queue.startswith("<" + item + ">"):
            index = self.queue.find("</" + item + ">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find("</" + item + ">")

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle per-run worker data (task deps, hashes, log levels)."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]

    def handle_ping(self, _):
        """Liveness check from the server; reply via the logging channel."""
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Orderly shutdown requested by the server."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child to execute one task and start tracking it."""
        fn, task, taskname, quieterrors, appends = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection,
        otherwise collect the process exit codes and close the information
        pipe, reporting the result to the server as an <exitcode> frame.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """Terminate all outstanding task children (whole process groups)
        and drain their event pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in self.build_pids.iteritems():
                try:
                    # Negative pid: signal the child's whole process group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    # Best-effort teardown: the child may already be gone.
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
345 | |||
# Top-level driver: run the worker loop; on any failure other than the
# server-requested quit, dump a traceback to stderr for the server to log.
try:
    worker = BitbakeWorker(sys.stdin)
    worker.serve()
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Drain any queued events before exiting so the server sees everything.
# NOTE(review): this busy-waits if the pipe stays full (EAGAIN forever).
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")
sys.exit(0)
358 | |||