summaryrefslogtreecommitdiffstats
path: root/bitbake/bin/bitbake-worker
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-xbitbake/bin/bitbake-worker375
1 files changed, 375 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
new file mode 100755
index 0000000000..68e2bf4571
--- /dev/null
+++ b/bitbake/bin/bitbake-worker
@@ -0,0 +1,375 @@
1#!/usr/bin/env python
2
3import os
4import sys
5import warnings
6sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
7from bb import fetch2
8import logging
9import bb
10import select
11import errno
12import signal
13
# Users shouldn't be running this code directly
# The magic argv[1] token is supplied by bitbake when it execs this helper.
if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# Shared logger used by bitbake; events are relayed to the server process.
logger = logging.getLogger("BitBake")

try:
    # Prefer the C pickle implementation (Python 2); fall back to pure Python.
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
27
# All communication back to the bitbake server goes over stdout, written
# non-blocking so a slow server cannot wedge the worker.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

# Capture log records as bitbake events so they reach the server too.
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Buffer of serialised events not yet written to the server pipe.
worker_queue = ""
45
def worker_fire(event, d):
    """Serialise *event* and queue it for delivery to the bitbake server.

    The datastore argument *d* is part of the bb.event firing interface
    but is not used here.
    """
    payload = "<event>" + pickle.dumps(event) + "</event>"
    worker_fire_prepickled(payload)
49
def worker_fire_prepickled(event):
    """Append an already-serialised event to the outgoing queue and try
    to flush it to the server straight away."""
    global worker_queue

    worker_queue += event
    worker_flush()
55
def worker_flush():
    """Write as much of the pending event queue as the non-blocking pipe
    will accept, keeping any remainder for a later call."""
    global worker_queue, worker_pipe

    if worker_queue:
        try:
            sent = os.write(worker_pipe, worker_queue)
        except (IOError, OSError) as err:
            # EAGAIN just means the pipe is full right now; retry later.
            if err.errno != errno.EAGAIN:
                raise
        else:
            worker_queue = worker_queue[sent:]
68
def worker_child_fire(event, d):
    """Event handler installed in forked task children: serialise the
    event straight down the pipe back to the worker parent process."""
    global worker_pipe

    worker_pipe.write("<event>" + pickle.dumps(event) + "</event>")
74
# Route every event fired in this process through the parent-side queue.
bb.event.worker_fire = worker_fire
76
# Optional debug trace of commands handled by the worker; disabled by default.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug command log, if one is open."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
83
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    """Fork a child process to execute a single task.

    cfg         -- cooker configuration (dry_run / profile flags are read)
    data        -- base datastore; task variables are set on it in the child
    workerdata  -- dict sent by the server (taskdeps, fakeroot env, hashes...)
    fn, task, taskname, appends, taskdepdata -- identify the task to run
    quieterrors -- suppress error logging in the child when True

    Returns (pid, pipein, pipeout) in the parent. The child never
    returns: it exits through os._exit() with the task's status.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        # Install the fakeroot environment, remembering the old values so
        # the parent can restore them after the fork.
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Non-fakeroot tasks still get the "noenv" variable set applied.
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush buffered output so the child doesn't duplicate it after fork().
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)  # unbuffered write end for the child
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        # --- child process ---
        global worker_pipe
        pipein.close()

        # Restore default SIGTERM behaviour; the parent handles shutdown.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

        # Save out the PID so that the event can include it the
        # events
        bb.event.worker_pid = os.getpid()
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout

        # Make the child the process group leader
        os.setpgid(0, 0)
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        # NOTE(review): a umask of 0 is falsy and is therefore never
        # applied here -- confirm whether an explicit 0 umask is intended.
        if umask:
            os.umask(umask)

        data.setVar("BB_WORKERCONTEXT", "1")
        data.setVar("BB_TASKDEPDATA", taskdepdata)
        data.setVar("BUILDNAME", workerdata["buildname"])
        data.setVar("DATE", workerdata["date"])
        data.setVar("TIME", workerdata["time"])
        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
        ret = 0
        try:
            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
            for h in workerdata["hashes"]:
                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
            for h in workerdata["hash_deps"]:
                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
            # successfully. We also need to unset anything from the environment which shouldn't be there
            exports = bb.data.exported_vars(the_data)
            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v
            # Re-apply the fakeroot variables on top of the exported set.
            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception as exc:
            if not quieterrors:
                logger.critical(str(exc))
            os._exit(1)
        try:
            if not cfg.dry_run:
                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            # Exit with the task's status; never return to the caller's frame.
            os._exit(ret)
        except:
            # NOTE(review): bare except deliberately catches everything
            # (including SystemExit) so the child always _exit()s.
            os._exit(1)
    else:
        # --- parent process: undo the environment changes made pre-fork ---
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
199
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """

    def __init__(self, pipein, pipeout):
        self.input = pipein
        # The write end belongs to the forked child; drop our copy if given.
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Pull pending data from the child and forward every complete
        <event>...</event> frame to the server. Returns True when new
        data arrived."""
        before = len(self.queue)
        try:
            self.queue += self.input.read(102400)
        except (OSError, IOError) as err:
            # EAGAIN simply means nothing is pending on the non-blocking fd.
            if err.errno != errno.EAGAIN:
                raise

        after = len(self.queue)

        marker = "</event>"
        pos = self.queue.find(marker)
        while pos != -1:
            cut = pos + len(marker)
            worker_fire_prepickled(self.queue[:cut])
            self.queue = self.queue[cut:]
            pos = self.queue.find(marker)
        return after > before

    def close(self):
        """Drain any remaining output from the child, then close the pipe."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
233
# Set to True by a server-requested quit so the final exception handler
# knows the SystemExit was deliberate.
normalexit = False
235
class BitbakeWorker(object):
    """Main server loop of the bitbake-worker process.

    Commands arrive from the bitbake server on *din* (normally stdin),
    framed as <item>...</item> blocks whose payload is a pickle.  They
    are dispatched to the handle_* methods; events and exit codes from
    forked task children are relayed back through the module-level
    worker pipe.
    """

    def __init__(self, din):
        # Non-blocking command stream from the server.
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # pid -> task id and pid -> event pipe for running children.
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)

    def sigterm_exception(self, signum, stackframe):
        """SIGTERM handler: tear down running children, then re-deliver
        SIGTERM with the default action so the worker dies normally."""
        # Fix: message previously misspelled "recieved".
        bb.warn("Worker received SIGTERM, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the server command stream and all child
        event pipes, dispatch complete commands, reap exited children
        and flush outgoing events.  Only exits via handle_quit/SIGTERM."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready or len(self.queue):
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    # Nothing pending on the non-blocking stream.
                    pass
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """Strip every complete <item>...</item> frame from the front of
        the queue, passing each payload to *func*."""
        if self.queue.startswith("<" + item + ">"):
            index = self.queue.find("</" + item + ">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find("</" + item + ">")

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle per-build metadata: logging configuration, PR service
        host and task hash data used when running tasks."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a liveness check from the server."""
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Exit cleanly at the server's request."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child to execute one task and start tracking its pid/pipe."""
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection;
        otherwise collect one child's exit code, close its information pipe
        and report the result to the server.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """SIGTERM every remaining child's process group and drain their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in self.build_pids.iteritems():
                try:
                    # Negative pid: signal the child's whole process group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
362
# Run the worker loop; anything other than a server-requested quit is an
# abnormal termination and gets its traceback written to stderr.
try:
    worker = BitbakeWorker(sys.stdin)
    worker.serve()
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Drain any events still queued for the server before exiting.
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")
sys.exit(0)