summaryrefslogtreecommitdiffstats
path: root/bitbake/bin/bitbake-worker
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-xbitbake/bin/bitbake-worker358
1 files changed, 358 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
new file mode 100755
index 0000000000..8edf8dd658
--- /dev/null
+++ b/bitbake/bin/bitbake-worker
@@ -0,0 +1,358 @@
1#!/usr/bin/env python
2
3import os
4import sys
5import warnings
6sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
7from bb import fetch2
8import logging
9import bb
10import select
11import errno
12
13# Users shouldn't be running this code directly
14if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
15 print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
16 sys.exit(1)
17
18logger = logging.getLogger("BitBake")
19
20try:
21 import cPickle as pickle
22except ImportError:
23 import pickle
24 bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
25
26
27worker_pipe = sys.stdout.fileno()
28bb.utils.nonblockingfd(worker_pipe)
29
30handler = bb.event.LogHandler()
31logger.addHandler(handler)
32
33if 0:
34 # Code to write out a log file of all events passing through the worker
35 logfilename = "/tmp/workerlogfile"
36 format_str = "%(levelname)s: %(message)s"
37 conlogformat = bb.msg.BBLogFormatter(format_str)
38 consolelog = logging.FileHandler(logfilename)
39 bb.msg.addDefaultlogFilter(consolelog)
40 consolelog.setFormatter(conlogformat)
41 logger.addHandler(consolelog)
42
43worker_queue = ""
44
45def worker_fire(event, d):
46 data = "<event>" + pickle.dumps(event) + "</event>"
47 worker_fire_prepickled(data)
48
49def worker_fire_prepickled(event):
50 global worker_queue
51
52 worker_queue = worker_queue + event
53 worker_flush()
54
55def worker_flush():
56 global worker_queue, worker_pipe
57
58 if not worker_queue:
59 return
60
61 try:
62 written = os.write(worker_pipe, worker_queue)
63 worker_queue = worker_queue[written:]
64 except (IOError, OSError) as e:
65 if e.errno != errno.EAGAIN:
66 raise
67
68def worker_child_fire(event, d):
69 global worker_pipe
70
71 data = "<event>" + pickle.dumps(event) + "</event>"
72 worker_pipe.write(data)
73
74bb.event.worker_fire = worker_fire
75
76lf = None
77#lf = open("/tmp/workercommandlog", "w+")
78def workerlog_write(msg):
79 if lf:
80 lf.write(msg)
81 lf.flush()
82
83def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
84 # We need to setup the environment BEFORE the fork, since
85 # a fork() or exec*() activates PSEUDO...
86
87 envbackup = {}
88 fakeenv = {}
89 umask = None
90
91 taskdep = workerdata["taskdeps"][fn]
92 if 'umask' in taskdep and taskname in taskdep['umask']:
93 # umask might come in as a number or text string..
94 try:
95 umask = int(taskdep['umask'][taskname],8)
96 except TypeError:
97 umask = taskdep['umask'][taskname]
98
99 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
100 envvars = (workerdata["fakerootenv"][fn] or "").split()
101 for key, value in (var.split('=') for var in envvars):
102 envbackup[key] = os.environ.get(key)
103 os.environ[key] = value
104 fakeenv[key] = value
105
106 fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
107 for p in fakedirs:
108 bb.utils.mkdirhier(p)
109 logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
110 (fn, taskname, ', '.join(fakedirs)))
111 else:
112 envvars = (workerdata["fakerootnoenv"][fn] or "").split()
113 for key, value in (var.split('=') for var in envvars):
114 envbackup[key] = os.environ.get(key)
115 os.environ[key] = value
116 fakeenv[key] = value
117
118 sys.stdout.flush()
119 sys.stderr.flush()
120
121 try:
122 pipein, pipeout = os.pipe()
123 pipein = os.fdopen(pipein, 'rb', 4096)
124 pipeout = os.fdopen(pipeout, 'wb', 0)
125 pid = os.fork()
126 except OSError as e:
127 bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
128
129 if pid == 0:
130 global worker_pipe
131 pipein.close()
132
133 # Save out the PID so that the event can include it the
134 # events
135 bb.event.worker_pid = os.getpid()
136 bb.event.worker_fire = worker_child_fire
137 worker_pipe = pipeout
138
139 # Make the child the process group leader
140 os.setpgid(0, 0)
141 # No stdin
142 newsi = os.open(os.devnull, os.O_RDWR)
143 os.dup2(newsi, sys.stdin.fileno())
144
145 if umask:
146 os.umask(umask)
147
148 data.setVar("BB_WORKERCONTEXT", "1")
149 bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
150 ret = 0
151 try:
152 the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
153 the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
154 for h in workerdata["hashes"]:
155 the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
156 for h in workerdata["hash_deps"]:
157 the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])
158
159 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
160 # successfully. We also need to unset anything from the environment which shouldn't be there
161 exports = bb.data.exported_vars(the_data)
162 bb.utils.empty_environment()
163 for e, v in exports:
164 os.environ[e] = v
165 for e in fakeenv:
166 os.environ[e] = fakeenv[e]
167 the_data.setVar(e, fakeenv[e])
168 the_data.setVarFlag(e, 'export', "1")
169
170 if quieterrors:
171 the_data.setVarFlag(taskname, "quieterrors", "1")
172
173 except Exception as exc:
174 if not quieterrors:
175 logger.critical(str(exc))
176 os._exit(1)
177 try:
178 if not cfg.dry_run:
179 ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
180 os._exit(ret)
181 except:
182 os._exit(1)
183 else:
184 for key, value in envbackup.iteritems():
185 if value is None:
186 del os.environ[key]
187 else:
188 os.environ[key] = value
189
190 return pid, pipein, pipeout
191
192class runQueueWorkerPipe():
193 """
194 Abstraction for a pipe between a worker thread and the worker server
195 """
196 def __init__(self, pipein, pipeout):
197 self.input = pipein
198 if pipeout:
199 pipeout.close()
200 bb.utils.nonblockingfd(self.input)
201 self.queue = ""
202
203 def read(self):
204 start = len(self.queue)
205 try:
206 self.queue = self.queue + self.input.read(102400)
207 except (OSError, IOError) as e:
208 if e.errno != errno.EAGAIN:
209 raise
210
211 end = len(self.queue)
212 index = self.queue.find("</event>")
213 while index != -1:
214 worker_fire_prepickled(self.queue[:index+8])
215 self.queue = self.queue[index+8:]
216 index = self.queue.find("</event>")
217 return (end > start)
218
219 def close(self):
220 while self.read():
221 continue
222 if len(self.queue) > 0:
223 print("Warning, worker child left partial message: %s" % self.queue)
224 self.input.close()
225
226normalexit = False
227
228class BitbakeWorker(object):
229 def __init__(self, din):
230 self.input = din
231 bb.utils.nonblockingfd(self.input)
232 self.queue = ""
233 self.cookercfg = None
234 self.databuilder = None
235 self.data = None
236 self.build_pids = {}
237 self.build_pipes = {}
238
239 def serve(self):
240 while True:
241 (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
242 if self.input in ready or len(self.queue):
243 start = len(self.queue)
244 try:
245 self.queue = self.queue + self.input.read()
246 except (OSError, IOError):
247 pass
248 end = len(self.queue)
249 self.handle_item("cookerconfig", self.handle_cookercfg)
250 self.handle_item("workerdata", self.handle_workerdata)
251 self.handle_item("runtask", self.handle_runtask)
252 self.handle_item("finishnow", self.handle_finishnow)
253 self.handle_item("ping", self.handle_ping)
254 self.handle_item("quit", self.handle_quit)
255
256 for pipe in self.build_pipes:
257 self.build_pipes[pipe].read()
258 if len(self.build_pids):
259 self.process_waitpid()
260 worker_flush()
261
262
263 def handle_item(self, item, func):
264 if self.queue.startswith("<" + item + ">"):
265 index = self.queue.find("</" + item + ">")
266 while index != -1:
267 func(self.queue[(len(item) + 2):index])
268 self.queue = self.queue[(index + len(item) + 3):]
269 index = self.queue.find("</" + item + ">")
270
271 def handle_cookercfg(self, data):
272 self.cookercfg = pickle.loads(data)
273 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
274 self.databuilder.parseBaseConfiguration()
275 self.data = self.databuilder.data
276
277 def handle_workerdata(self, data):
278 self.workerdata = pickle.loads(data)
279 bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
280 bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
281 bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
282 bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
283
284 def handle_ping(self, _):
285 workerlog_write("Handling ping\n")
286
287 logger.warn("Pong from bitbake-worker!")
288
289 def handle_quit(self, data):
290 workerlog_write("Handling quit\n")
291
292 global normalexit
293 normalexit = True
294 sys.exit(0)
295
296 def handle_runtask(self, data):
297 fn, task, taskname, quieterrors, appends = pickle.loads(data)
298 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
299
300 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)
301
302 self.build_pids[pid] = task
303 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
304
305 def process_waitpid(self):
306 """
307 Return none is there are no processes awaiting result collection, otherwise
308 collect the process exit codes and close the information pipe.
309 """
310 try:
311 pid, status = os.waitpid(-1, os.WNOHANG)
312 if pid == 0 or os.WIFSTOPPED(status):
313 return None
314 except OSError:
315 return None
316
317 workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
318
319 if os.WIFEXITED(status):
320 status = os.WEXITSTATUS(status)
321 elif os.WIFSIGNALED(status):
322 # Per shell conventions for $?, when a process exits due to
323 # a signal, we return an exit code of 128 + SIGNUM
324 status = 128 + os.WTERMSIG(status)
325
326 task = self.build_pids[pid]
327 del self.build_pids[pid]
328
329 self.build_pipes[pid].close()
330 del self.build_pipes[pid]
331
332 worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
333
334 def handle_finishnow(self, _):
335 if self.build_pids:
336 logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
337 for k, v in self.build_pids.iteritems():
338 try:
339 os.kill(-k, signal.SIGTERM)
340 os.waitpid(-1, 0)
341 except:
342 pass
343 for pipe in self.build_pipes:
344 self.build_pipes[pipe].read()
345
346try:
347 worker = BitbakeWorker(sys.stdin)
348 worker.serve()
349except BaseException as e:
350 if not normalexit:
351 import traceback
352 sys.stderr.write(traceback.format_exc())
353 sys.stderr.write(str(e))
354while len(worker_queue):
355 worker_flush()
356workerlog_write("exitting")
357sys.exit(0)
358