summaryrefslogtreecommitdiffstats
path: root/bitbake/bin/bitbake-worker
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-xbitbake/bin/bitbake-worker415
1 files changed, 415 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
new file mode 100755
index 0000000000..8a24161250
--- /dev/null
+++ b/bitbake/bin/bitbake-worker
@@ -0,0 +1,415 @@
1#!/usr/bin/env python
2
3import os
4import sys
5import warnings
6sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
7from bb import fetch2
8import logging
9import bb
10import select
11import errno
12import signal
13
# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The magic argument "decafbadbad" (rather than plain "decafbad") asks the
# worker to profile itself.
profiling = False
if sys.argv[1] == "decafbadbad":
    profiling = True
    try:
        import cProfile as profile
    # BUGFIX: was a bare "except:", which would also swallow SystemExit and
    # KeyboardInterrupt; only a failed import should trigger the fallback.
    except ImportError:
        import profile

logger = logging.getLogger("BitBake")

try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
35
# stdout doubles as the event pipe back to the bitbake server, so it is
# switched to non-blocking mode; flushing is handled by worker_flush().
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

# Route python logging records through the event system so the server
# sees them.
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Debugging aid (set to 1 to enable): write out a log file of all
    # events passing through the worker.
    eventlogname = "/tmp/workerlogfile"
    eventlogformat = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    eventlog = logging.FileHandler(eventlogname)
    bb.msg.addDefaultlogFilter(eventlog)
    eventlog.setFormatter(eventlogformat)
    logger.addHandler(eventlog)

# Data queued for the server but not yet written to the non-blocking pipe.
worker_queue = ""
53
def worker_fire(event, d):
    """Pickle an event and queue it for delivery to the bitbake server."""
    payload = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(payload)
57
def worker_fire_prepickled(event):
    """Queue an already-pickled (and tag-wrapped) event, then try to flush."""
    global worker_queue

    worker_queue += event
    worker_flush()
63
def worker_flush():
    """Attempt to drain the queued event data down the server pipe.

    The pipe is non-blocking, so a partial write simply leaves the
    remainder queued for a later call; EAGAIN is expected and ignored.
    """
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        count = os.write(worker_pipe, worker_queue)
    except (IOError, OSError) as exc:
        if exc.errno != errno.EAGAIN:
            raise
    else:
        # Keep whatever the kernel didn't accept for the next flush.
        worker_queue = worker_queue[count:]
76
def worker_child_fire(event, d):
    """Event sink used inside a forked task: write straight down the
    child's (unbuffered) report pipe instead of queueing."""
    global worker_pipe

    worker_pipe.write("<event>" + pickle.dumps(event) + "</event>")

bb.event.worker_fire = worker_fire
# Optional worker debug log; assign an open file object to enable.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append msg to the debug command log when enabled; no-op otherwise."""
    if lf is None:
        return
    lf.write(msg)
    lf.flush()
91
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler installed in forked task children.

    Forwards SIGTERM to the whole process group (the child is a group
    leader via os.setsid), then exits.
    """
    # Restore the default action first so the killpg below can terminate
    # this process too without re-entering this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
96
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """Fork and execute a single task.

    In the parent, returns (pid, pipein, pipeout) where pipein carries the
    child's events back to us. In the child, never returns: the task is
    executed and the process exits via os._exit() with its status.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        def child():
            global worker_pipe
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # BUGFIX: this was "if umask:", which silently skipped a
            # configured umask of 0 (0 is falsy); only skip when unset.
            if umask is not None:
                os.umask(umask)

            data.setVar("BB_WORKERCONTEXT", "1")
            data.setVar("BB_TASKDEPDATA", taskdepdata)
            data.setVar("BUILDNAME", workerdata["buildname"])
            data.setVar("DATE", workerdata["date"])
            data.setVar("TIME", workerdata["time"])
            bb.parse.siggen.set_taskdata(workerdata["sigdata"])
            ret = 0
            try:
                the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)
                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)
            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                # exec_task reports failures via events; just ensure a
                # non-zero exit here.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
            os._exit(ret)
    else:
        # Parent: restore the environment we perturbed before the fork.
        # (items() rather than the Python-2-only iteritems().)
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
225
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            # The write end belongs to the child; we only read.
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Pull pending data from the child and forward any complete
        <event>...</event> messages to the server. Returns True if any
        new data arrived."""
        before = len(self.queue)
        try:
            self.queue += self.input.read(102400)
        except (OSError, IOError) as exc:
            # Non-blocking read: no data ready is not an error.
            if exc.errno != errno.EAGAIN:
                raise

        after = len(self.queue)
        endtag = "</event>"
        idx = self.queue.find(endtag)
        while idx != -1:
            cut = idx + len(endtag)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
            idx = self.queue.find(endtag)
        return after > before

    def close(self):
        """Drain remaining data, warn about any trailing partial message,
        and close the read end."""
        while self.read():
            continue
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
259
# Set to True by a clean <quit> command; the top-level exception handler
# uses it to suppress the traceback on normal shutdown.
normalexit = False
261
class BitbakeWorker(object):
    """Top level worker object.

    Reads pickled commands (<cookerconfig>, <workerdata>, <runtask>,
    <finishnow>, <ping>, <quit>) from the server on ``din`` and reports
    events and task exit codes back over the global worker pipe.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.build_pids = {}   # pid -> task id of running forked tasks
        self.build_pipes = {}  # pid -> runQueueWorkerPipe for that child

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        # BUGFIX: messages previously said "recieved".
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        # Stop all running tasks, then re-deliver SIGTERM with the default
        # handler so the worker's exit status reflects the signal.
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the server command stream and the child
        task pipes, dispatch complete commands, reap children and flush
        outbound events."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready or len(self.queue):
                start = len(self.queue)
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    pass
                end = len(self.queue)
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """Consume complete <item>...</item> messages from the head of the
        receive queue, passing each payload to func."""
        if self.queue.startswith("<" + item + ">"):
            index = self.queue.find("</" + item + ">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find("</" + item + ">")

    def handle_cookercfg(self, data):
        # Rebuild the cooker configuration/datastore from the server's pickle.
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        # Per-build data: logging levels, signature data, PR service host.
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        # Fork a child for the task and start tracking its pid/pipe.
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """SIGTERM the process group of every remaining task and drain the
        child pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            # items() rather than the Python-2-only iteritems(); nothing is
            # mutated while iterating here.
            for k, v in self.build_pids.items():
                try:
                    # Children are process group leaders (os.setsid), so
                    # signal the whole group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
393
# Run the worker (optionally under the profiler), then drain any events
# still queued for the server before exiting.
try:
    worker = BitbakeWorker(sys.stdin)
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # Only report a traceback if this wasn't a clean <quit> shutdown.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
while len(worker_queue):
    worker_flush()
# BUGFIX: log message typo, was "exitting".
workerlog_write("exiting")
sys.exit(0)
415