summaryrefslogtreecommitdiffstats
path: root/bitbake/bin
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-07 18:11:09 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-14 12:52:56 +0100
commitd0f0e5d9e69cc22f0c6635c7e416de93660c6bca (patch)
tree1877f962137e31bbf93be5c13763488fb2321886 /bitbake/bin
parentcd7b7de91a98e712e796a8d6a3a8e3741950396e (diff)
downloadpoky-d0f0e5d9e69cc22f0c6635c7e416de93660c6bca.tar.gz
bitbake: runqueue: Split runqueue to use bitbake-worker
This is a pretty fundamental change to the way bitbake operates. It splits out the task execution part of runqueue into a completely separately exec'd process called bitbake-worker. This means that the separate process has to build its own datastore and that configuration needs to be passed from the cooker over to the bitbake worker process. Known issues: * Hob is broken with this patch since it writes to the configuration and that configuration isn't preserved in bitbake-worker. * We create a worker for setscene, then a new worker for the main task execution. This is wasteful but shouldn't be hard to fix. * We probably send too much data over to bitbake-worker, need to see if we can streamline it. These are issues which will be followed up in subsequent patches. This patch sets the groundwork for the removal of the double bitbake execution for pseudo which will be in a follow-on patch. (Bitbake rev: b2e26f1db28d74f2dd9df8ab4ed3b472503b9a5c) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/bin')
-rwxr-xr-xbitbake/bin/bitbake-worker358
1 files changed, 358 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
new file mode 100755
index 0000000000..8edf8dd658
--- /dev/null
+++ b/bitbake/bin/bitbake-worker
@@ -0,0 +1,358 @@
1#!/usr/bin/env python
2
import errno
import logging
import os
import select
import signal
import sys
import warnings

# The bb modules live in ../lib relative to this script; make them importable
# before anything tries to use them.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))

from bb import fetch2
import bb
12
# Users shouldn't be running this code directly
# The cooker passes the magic sentinel argument "decafbad" when exec'ing us.
if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

logger = logging.getLogger("BitBake")

# Prefer the C pickle implementation where available (Python 2 only).
try:
    import cPickle as pickle
except ImportError:
    import pickle
    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")


# All events and task results are written back to the cooker over stdout,
# switched to non-blocking so a slow reader cannot wedge the worker.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)

# Route log records through the bitbake event system (and hence the pipe).
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Buffer of serialised event data not yet flushed down worker_pipe.
worker_queue = ""
44
def worker_fire(event, d):
    # Serialise the event, frame it, and queue it for delivery to the cooker.
    worker_fire_prepickled("<event>" + pickle.dumps(event) + "</event>")
48
def worker_fire_prepickled(event):
    # Append an already-serialised event to the outgoing queue and try to
    # push it to the cooker straight away.
    global worker_queue

    worker_queue += event
    worker_flush()
54
def worker_flush():
    """Attempt to write the queued event data down the (non-blocking) pipe
    to the cooker; a partial write leaves the remainder queued for the
    next call, and EAGAIN simply means the pipe is full right now."""
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
    except (IOError, OSError) as exc:
        if exc.errno != errno.EAGAIN:
            raise
    else:
        worker_queue = worker_queue[written:]
67
def worker_child_fire(event, d):
    # Event sink installed inside a forked task child, where worker_pipe has
    # been rebound from an fd to the file object of the pipe back to the
    # worker parent.
    global worker_pipe

    worker_pipe.write("<event>" + pickle.dumps(event) + "</event>")
73
# Route all bb.event firing in this process through our pipe to the cooker.
bb.event.worker_fire = worker_fire

# Optional command-log handle used by workerlog_write(); enable by
# uncommenting the open() below.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    # Debug aid: mirror commands into the command log when the module-level
    # 'lf' handle has been opened (it is None by default, making this a no-op).
    if lf is None:
        return
    lf.write(msg)
    lf.flush()
82
def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
    """Fork a child process to execute a single task.

    cfg         -- cooker configuration (only dry_run and profile are read)
    data        -- base datastore the child loads the recipe data from
    workerdata  -- dict of metadata sent over by the cooker (taskdeps,
                   fakeroot env/dirs, hashes, hash_deps, runq_hash, ...)
    fn          -- recipe filename the task belongs to
    task        -- task id, used to look up the task hash
    taskname    -- task name, e.g. "do_compile"
    appends     -- bbappend files to apply when loading the recipe data
    quieterrors -- when True, suppress error logging in the child

    Returns (pid, pipein, pipeout): the parent keeps pipein to read the
    child's event stream; the child never returns (it os._exit()s).
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}   # parent's original values, restored after the fork
    fakeenv = {}     # fakeroot-related vars forced into the child datastore
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname],8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
        # Task runs under fakeroot: export its environment and pre-create
        # the directories it needs, remembering what we clobbered.
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Non-fakeroot tasks still get the "noenv" variable set exported.
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Avoid duplicated buffered output appearing in both parent and child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)  # unbuffered child->parent pipe
        pid = os.fork()
    except OSError as e:
        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

    if pid == 0:
        # --- child ---
        global worker_pipe
        pipein.close()

        # Save out the PID so that the event can include it the
        # events
        bb.event.worker_pid = os.getpid()
        bb.event.worker_fire = worker_child_fire
        worker_pipe = pipeout

        # Make the child the process group leader
        # (lets the parent SIGTERM the whole group on finishnow)
        os.setpgid(0, 0)
        # No stdin
        newsi = os.open(os.devnull, os.O_RDWR)
        os.dup2(newsi, sys.stdin.fileno())

        # NOTE(review): a umask of 0 is falsy and would be skipped here —
        # confirm whether an explicit umask of 0 should be honoured.
        if umask:
            os.umask(umask)

        data.setVar("BB_WORKERCONTEXT", "1")
        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
        ret = 0
        try:
            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
            for h in workerdata["hashes"]:
                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
            for h in workerdata["hash_deps"]:
                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])

            # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
            # successfully. We also need to unset anything from the environment which shouldn't be there
            exports = bb.data.exported_vars(the_data)
            bb.utils.empty_environment()
            for e, v in exports:
                os.environ[e] = v
            # Fakeroot vars win over exported ones and are re-exported.
            for e in fakeenv:
                os.environ[e] = fakeenv[e]
                the_data.setVar(e, fakeenv[e])
                the_data.setVarFlag(e, 'export', "1")

            if quieterrors:
                the_data.setVarFlag(taskname, "quieterrors", "1")

        except Exception as exc:
            if not quieterrors:
                logger.critical(str(exc))
            os._exit(1)
        try:
            if not cfg.dry_run:
                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            os._exit(ret)
        except:
            os._exit(1)
    else:
        # --- parent: undo the environment changes made before the fork ---
        # (iteritems is Python 2 only)
        for key, value in envbackup.iteritems():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
191
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        # Keep the reading end; the writing end belongs to the child.
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""

    def read(self):
        """Pull any available data from the child and forward every complete
        <event>...</event> frame on to the cooker. Returns True if new
        bytes arrived."""
        start = len(self.queue)
        try:
            self.queue += self.input.read(102400)
        except (OSError, IOError) as exc:
            # Non-blocking fd: "no data yet" is expected, anything else isn't.
            if exc.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        while True:
            index = self.queue.find("</event>")
            if index == -1:
                break
            worker_fire_prepickled(self.queue[:index + 8])
            self.queue = self.queue[index + 8:]
        return end > start

    def close(self):
        """Drain anything still pending, then close the reading end."""
        while self.read():
            continue
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
225
# Set by BitbakeWorker.handle_quit() so the top-level exception handler
# knows the SystemExit from a clean "quit" command is expected.
normalexit = False
class BitbakeWorker(object):
    """Main loop of the worker process.

    Reads framed commands (<cookerconfig>, <workerdata>, <runtask>,
    <finishnow>, <ping>, <quit>) from the cooker on *din* (normally stdin),
    dispatches them to handle_* methods, forks task children via
    fork_off_task() and forwards their events and exit codes back to the
    cooker over the worker pipe.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        # Filled in by handle_workerdata(); declared so the attribute
        # always exists even before the cooker has sent it.
        self.workerdata = None
        # pid -> task id / pid -> event pipe for currently forked children
        self.build_pids = {}
        self.build_pipes = {}

    def serve(self):
        """Poll the command pipe and all child event pipes until quit."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready or len(self.queue):
                try:
                    self.queue = self.queue + self.input.read()
                except (OSError, IOError):
                    # Non-blocking read may have nothing for us; try again.
                    pass
                self.handle_item("cookerconfig", self.handle_cookercfg)
                self.handle_item("workerdata", self.handle_workerdata)
                self.handle_item("runtask", self.handle_runtask)
                self.handle_item("finishnow", self.handle_finishnow)
                self.handle_item("ping", self.handle_ping)
                self.handle_item("quit", self.handle_quit)

            # Forward any pending child events, reap finished children and
            # push everything queued out to the cooker.
            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()


    def handle_item(self, item, func):
        """Consume every complete <item>...</item> frame at the head of the
        queue, passing each payload to func."""
        opening = "<" + item + ">"
        closing = "</" + item + ">"
        # Re-check the opening tag on every iteration so that a following
        # frame of a *different* type is left for its own handler instead of
        # being swallowed by a find() that skips across frame boundaries.
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[index + len(closing):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build our own datastore
        from it (the worker is a separate exec'd process)."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        """Unpickle per-task metadata and apply the cooker's log settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]

    def handle_ping(self, _):
        """Liveness check: reply via the normal event/log channel."""
        workerlog_write("Handling ping\n")

        logger.warn("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Clean shutdown requested by the cooker."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child to run one task and start tracking it."""
        fn, task, taskname, quieterrors, appends = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection,
        otherwise collect the process exit code and close its event pipe,
        reporting the result to the cooker as an <exitcode> frame.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

    def handle_finishnow(self, _):
        """Terminate all running task children and drain their event pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k in self.build_pids:
                try:
                    # Each child is a process group leader; signal the group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Child may already have exited/been reaped.
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
345
# Top-level entry: serve commands until quit, then drain the event queue.
try:
    worker = BitbakeWorker(sys.stdin)
    worker.serve()
except BaseException as e:
    # handle_quit() raises SystemExit after setting normalexit; anything
    # else reaching here is an unexpected failure, reported to stderr.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
# Make sure every queued event reaches the cooker before we exit.
while len(worker_queue):
    worker_flush()
workerlog_write("exitting")
sys.exit(0)
358