author     Richard Purdie <richard.purdie@linuxfoundation.org>  2013-06-07 18:11:09 +0100
committer  Richard Purdie <richard.purdie@linuxfoundation.org>  2013-06-14 12:52:56 +0100
commit     d0f0e5d9e69cc22f0c6635c7e416de93660c6bca (patch)
tree       1877f962137e31bbf93be5c13763488fb2321886 /bitbake
parent     cd7b7de91a98e712e796a8d6a3a8e3741950396e (diff)
download   poky-d0f0e5d9e69cc22f0c6635c7e416de93660c6bca.tar.gz
bitbake: runqueue: Split runqueue to use bitbake-worker
This is a pretty fundamental change to the way bitbake operates. It splits out the task execution part of runqueue into a completely separately exec'd process called bitbake-worker. This means that the separate process has to build its own datastore and that configuration needs to be passed from the cooker over to the bitbake-worker process.

Known issues:

* Hob is broken with this patch since it writes to the configuration and that configuration isn't preserved in bitbake-worker.
* We create a worker for setscene, then a new worker for the main task execution. This is wasteful but shouldn't be hard to fix.
* We probably send too much data over to bitbake-worker; we need to see if we can streamline it.

These are issues which will be followed up in subsequent patches.

This patch sets the groundwork for the removal of the double bitbake execution for pseudo, which will be in a follow-on patch.

(Bitbake rev: b2e26f1db28d74f2dd9df8ab4ed3b472503b9a5c)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
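The mechanism underpinning all of this is a small stream protocol: the cooker and the worker exchange pickled Python objects framed by XML-style tags over the worker's stdin/stdout pipes (<cookerconfig>, <workerdata> and <runtask> in one direction; <event> and <exitcode> in the other). A minimal sketch of that framing, with hypothetical helper names and the Python 2 string semantics the patch assumes:

    import pickle

    def frame(tag, payload):
        # Pickle the payload and wrap it in <tag>...</tag> markers, the same
        # shape runqueue.py writes to the worker's stdin.
        return "<" + tag + ">" + pickle.dumps(payload) + "</" + tag + ">"

    def extract(buf, tag):
        # Pull one complete message off the front of a stream buffer,
        # mirroring the startswith()/find() loop in BitbakeWorker.handle_item().
        # Returns (payload, remainder); payload is None until the closing
        # tag has fully arrived.
        start, end = "<" + tag + ">", "</" + tag + ">"
        if not buf.startswith(start):
            return None, buf
        index = buf.find(end)
        if index == -1:
            return None, buf
        return pickle.loads(buf[len(start):index]), buf[index + len(end):]

Because the pipes are non-blocking, both sides accumulate partial reads in a string buffer and only dispatch a message once its closing tag is visible.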
Diffstat (limited to 'bitbake')
-rwxr-xr-x  bitbake/bin/bitbake-worker    358
-rw-r--r--  bitbake/lib/bb/cache.py         6
-rw-r--r--  bitbake/lib/bb/cookerdata.py   18
-rw-r--r--  bitbake/lib/bb/event.py        13
-rw-r--r--  bitbake/lib/bb/runqueue.py    284
-rw-r--r--  bitbake/lib/bb/siggen.py        3
6 files changed, 492 insertions, 190 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
new file mode 100755
index 0000000000..8edf8dd658
--- /dev/null
+++ b/bitbake/bin/bitbake-worker
@@ -0,0 +1,358 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import warnings
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
+from bb import fetch2
+import logging
+import bb
+import select
+import errno
+import signal # needed for os.kill(-pid, signal.SIGTERM) in handle_finishnow()
+
+# Users shouldn't be running this code directly
+if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
+    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
+    sys.exit(1)
+
+logger = logging.getLogger("BitBake")
+
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
+    bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
+
+
+worker_pipe = sys.stdout.fileno()
+bb.utils.nonblockingfd(worker_pipe)
+
+handler = bb.event.LogHandler()
+logger.addHandler(handler)
+
+if 0:
+    # Code to write out a log file of all events passing through the worker
+    logfilename = "/tmp/workerlogfile"
+    format_str = "%(levelname)s: %(message)s"
+    conlogformat = bb.msg.BBLogFormatter(format_str)
+    consolelog = logging.FileHandler(logfilename)
+    bb.msg.addDefaultlogFilter(consolelog)
+    consolelog.setFormatter(conlogformat)
+    logger.addHandler(consolelog)
+
+worker_queue = ""
+
+def worker_fire(event, d):
+    data = "<event>" + pickle.dumps(event) + "</event>"
+    worker_fire_prepickled(data)
+
+def worker_fire_prepickled(event):
+    global worker_queue
+
+    worker_queue = worker_queue + event
+    worker_flush()
+
+def worker_flush():
+    global worker_queue, worker_pipe
+
+    if not worker_queue:
+        return
+
+    try:
+        written = os.write(worker_pipe, worker_queue)
+        worker_queue = worker_queue[written:]
+    except (IOError, OSError) as e:
+        if e.errno != errno.EAGAIN:
+            raise
+
+def worker_child_fire(event, d):
+    global worker_pipe
+
+    data = "<event>" + pickle.dumps(event) + "</event>"
+    worker_pipe.write(data)
+
+bb.event.worker_fire = worker_fire
+
+lf = None
+#lf = open("/tmp/workercommandlog", "w+")
+def workerlog_write(msg):
+    if lf:
+        lf.write(msg)
+        lf.flush()
+
+def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
+    # We need to setup the environment BEFORE the fork, since
+    # a fork() or exec*() activates PSEUDO...
+
+    envbackup = {}
+    fakeenv = {}
+    umask = None
+
+    taskdep = workerdata["taskdeps"][fn]
+    if 'umask' in taskdep and taskname in taskdep['umask']:
+        # umask might come in as a number or text string..
+        try:
+            umask = int(taskdep['umask'][taskname],8)
+        except TypeError:
+            umask = taskdep['umask'][taskname]
+
+    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
+        envvars = (workerdata["fakerootenv"][fn] or "").split()
+        for key, value in (var.split('=') for var in envvars):
+            envbackup[key] = os.environ.get(key)
+            os.environ[key] = value
+            fakeenv[key] = value
+
+        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
+        for p in fakedirs:
+            bb.utils.mkdirhier(p)
+        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
+                        (fn, taskname, ', '.join(fakedirs)))
+    else:
+        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
+        for key, value in (var.split('=') for var in envvars):
+            envbackup[key] = os.environ.get(key)
+            os.environ[key] = value
+            fakeenv[key] = value
+
+    sys.stdout.flush()
+    sys.stderr.flush()
+
+    try:
+        pipein, pipeout = os.pipe()
+        pipein = os.fdopen(pipein, 'rb', 4096)
+        pipeout = os.fdopen(pipeout, 'wb', 0)
+        pid = os.fork()
+    except OSError as e:
+        bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
+
+    if pid == 0:
+        global worker_pipe
+        pipein.close()
+
+        # Save out the PID so that the event module
+        # can include it in the events
+        bb.event.worker_pid = os.getpid()
+        bb.event.worker_fire = worker_child_fire
+        worker_pipe = pipeout
+
+        # Make the child the process group leader
+        os.setpgid(0, 0)
+        # No stdin
+        newsi = os.open(os.devnull, os.O_RDWR)
+        os.dup2(newsi, sys.stdin.fileno())
+
+        if umask:
+            os.umask(umask)
+
+        data.setVar("BB_WORKERCONTEXT", "1")
+        bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
+        ret = 0
+        try:
+            the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
+            the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
+            for h in workerdata["hashes"]:
+                the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
+            for h in workerdata["hash_deps"]:
+                the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])
+
+            # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
+            # successfully. We also need to unset anything from the environment which shouldn't be there
+            exports = bb.data.exported_vars(the_data)
+            bb.utils.empty_environment()
+            for e, v in exports:
+                os.environ[e] = v
+            for e in fakeenv:
+                os.environ[e] = fakeenv[e]
+                the_data.setVar(e, fakeenv[e])
+                the_data.setVarFlag(e, 'export', "1")
+
+            if quieterrors:
+                the_data.setVarFlag(taskname, "quieterrors", "1")
+
+        except Exception as exc:
+            if not quieterrors:
+                logger.critical(str(exc))
+            os._exit(1)
+        try:
+            if not cfg.dry_run:
+                ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+            os._exit(ret)
+        except:
+            os._exit(1)
+    else:
+        for key, value in envbackup.iteritems():
+            if value is None:
+                del os.environ[key]
+            else:
+                os.environ[key] = value
+
+    return pid, pipein, pipeout
+
+class runQueueWorkerPipe():
+    """
+    Abstraction for a pipe between a worker thread and the worker server
+    """
+    def __init__(self, pipein, pipeout):
+        self.input = pipein
+        if pipeout:
+            pipeout.close()
+        bb.utils.nonblockingfd(self.input)
+        self.queue = ""
+
+    def read(self):
+        start = len(self.queue)
+        try:
+            self.queue = self.queue + self.input.read(102400)
+        except (OSError, IOError) as e:
+            if e.errno != errno.EAGAIN:
+                raise
+
+        end = len(self.queue)
+        index = self.queue.find("</event>")
+        while index != -1:
+            worker_fire_prepickled(self.queue[:index+8])
+            self.queue = self.queue[index+8:]
+            index = self.queue.find("</event>")
+        return (end > start)
+
+    def close(self):
+        while self.read():
+            continue
+        if len(self.queue) > 0:
+            print("Warning, worker child left partial message: %s" % self.queue)
+        self.input.close()
+
+normalexit = False
+
+class BitbakeWorker(object):
+    def __init__(self, din):
+        self.input = din
+        bb.utils.nonblockingfd(self.input)
+        self.queue = ""
+        self.cookercfg = None
+        self.databuilder = None
+        self.data = None
+        self.build_pids = {}
+        self.build_pipes = {}
+
+    def serve(self):
+        while True:
+            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
+            if self.input in ready or len(self.queue):
+                start = len(self.queue)
+                try:
+                    self.queue = self.queue + self.input.read()
+                except (OSError, IOError):
+                    pass
+                end = len(self.queue)
+                self.handle_item("cookerconfig", self.handle_cookercfg)
+                self.handle_item("workerdata", self.handle_workerdata)
+                self.handle_item("runtask", self.handle_runtask)
+                self.handle_item("finishnow", self.handle_finishnow)
+                self.handle_item("ping", self.handle_ping)
+                self.handle_item("quit", self.handle_quit)
+
+            for pipe in self.build_pipes:
+                self.build_pipes[pipe].read()
+            if len(self.build_pids):
+                self.process_waitpid()
+            worker_flush()
+
+
+    def handle_item(self, item, func):
+        if self.queue.startswith("<" + item + ">"):
+            index = self.queue.find("</" + item + ">")
+            while index != -1:
+                func(self.queue[(len(item) + 2):index])
+                self.queue = self.queue[(index + len(item) + 3):]
+                index = self.queue.find("</" + item + ">")
+
+    def handle_cookercfg(self, data):
+        self.cookercfg = pickle.loads(data)
+        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
+        self.databuilder.parseBaseConfiguration()
+        self.data = self.databuilder.data
+
+    def handle_workerdata(self, data):
+        self.workerdata = pickle.loads(data)
+        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
+        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
+        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
+        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
+
+    def handle_ping(self, _):
+        workerlog_write("Handling ping\n")
+
+        logger.warn("Pong from bitbake-worker!")
+
+    def handle_quit(self, data):
+        workerlog_write("Handling quit\n")
+
+        global normalexit
+        normalexit = True
+        sys.exit(0)
+
+    def handle_runtask(self, data):
+        fn, task, taskname, quieterrors, appends = pickle.loads(data)
+        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)
+
+        self.build_pids[pid] = task
+        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
+
+    def process_waitpid(self):
+        """
+        Return None if there are no processes awaiting result collection, otherwise
+        collect the process exit codes and close the information pipe.
+        """
+        try:
+            pid, status = os.waitpid(-1, os.WNOHANG)
+            if pid == 0 or os.WIFSTOPPED(status):
+                return None
+        except OSError:
+            return None
+
+        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
+
+        if os.WIFEXITED(status):
+            status = os.WEXITSTATUS(status)
+        elif os.WIFSIGNALED(status):
+            # Per shell conventions for $?, when a process exits due to
+            # a signal, we return an exit code of 128 + SIGNUM
+            status = 128 + os.WTERMSIG(status)
+
+        task = self.build_pids[pid]
+        del self.build_pids[pid]
+
+        self.build_pipes[pid].close()
+        del self.build_pipes[pid]
+
+        worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
+
+    def handle_finishnow(self, _):
+        if self.build_pids:
+            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
+            for k, v in self.build_pids.iteritems():
+                try:
+                    os.kill(-k, signal.SIGTERM)
+                    os.waitpid(-1, 0)
+                except:
+                    pass
+        for pipe in self.build_pipes:
+            self.build_pipes[pipe].read()
+
+try:
+    worker = BitbakeWorker(sys.stdin)
+    worker.serve()
+except BaseException as e:
+    if not normalexit:
+        import traceback
+        sys.stderr.write(traceback.format_exc())
+        sys.stderr.write(str(e))
+while len(worker_queue):
+    worker_flush()
+workerlog_write("exiting")
+sys.exit(0)
+
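process_waitpid() above folds the raw os.waitpid() status into a single exit code using the shell convention for $?. The same decoding shown standalone (decode_status is a sketch for illustration, not part of the patch):

    import os

    def decode_status(status):
        # A normal exit reports its exit status; death by signal reports
        # 128 + the signal number (so SIGTERM -> 143, SIGKILL -> 137).
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        if os.WIFSIGNALED(status):
            return 128 + os.WTERMSIG(status)
        return None  # stopped, not dead; keep waiting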
diff --git a/bitbake/lib/bb/cache.py b/bitbake/lib/bb/cache.py
index fb0f40c602..b99fa99cfb 100644
--- a/bitbake/lib/bb/cache.py
+++ b/bitbake/lib/bb/cache.py
@@ -724,7 +724,6 @@ class CacheData(object):
         for info in info_array:
             info.add_cacheData(self, fn)
 
-
 class MultiProcessCache(object):
     """
     BitBake multi-process cache implementation
@@ -746,13 +745,18 @@ class MultiProcessCache(object):
         self.cachefile = os.path.join(cachedir, self.__class__.cache_file_name)
         logger.debug(1, "Using cache in '%s'", self.cachefile)
 
+        glf = bb.utils.lockfile(self.cachefile + ".lock")
+
         try:
             with open(self.cachefile, "rb") as f:
                 p = pickle.Unpickler(f)
                 data, version = p.load()
         except:
+            bb.utils.unlockfile(glf)
             return
 
+        bb.utils.unlockfile(glf)
+
         if version != self.__class__.CACHE_VERSION:
             return
 
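The cache.py change brackets the unpickling with a lock file so that a concurrent process updating the cache cannot be read mid-write. The same pattern in generic form, using fcntl directly rather than the bb.utils.lockfile()/unlockfile() helpers the patch uses (locked_read is a hypothetical name):

    import fcntl

    def locked_read(path):
        # Hold an exclusive lock on a sidecar ".lock" file for the duration
        # of the read, then release it.
        with open(path + ".lock", "a+") as lock:
            fcntl.flock(lock, fcntl.LOCK_EX)
            try:
                with open(path, "rb") as f:
                    return f.read()
            finally:
                fcntl.flock(lock, fcntl.LOCK_UN)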
diff --git a/bitbake/lib/bb/cookerdata.py b/bitbake/lib/bb/cookerdata.py
index 149878f402..1bed455d16 100644
--- a/bitbake/lib/bb/cookerdata.py
+++ b/bitbake/lib/bb/cookerdata.py
@@ -25,7 +25,9 @@
 import os, sys
 from functools import wraps
 import logging
+import bb
 from bb import data
+import bb.parse
 
 logger = logging.getLogger("BitBake")
 parselog = logging.getLogger("BitBake.Parsing")
@@ -139,6 +141,20 @@ class CookerConfiguration(object):
     def setServerRegIdleCallback(self, srcb):
         self.server_register_idlecallback = srcb
 
+    def __getstate__(self):
+        state = {}
+        for key in self.__dict__.keys():
+            if key == "server_register_idlecallback":
+                state[key] = None
+            else:
+                state[key] = getattr(self, key)
+        return state
+
+    def __setstate__(self, state):
+        for k in state:
+            setattr(self, k, state[k])
+
+
 def catch_parse_error(func):
     """Exception handling bits for our parsing"""
     @wraps(func)
@@ -146,6 +162,8 @@ def catch_parse_error(func):
         try:
             return func(fn, *args)
         except (IOError, bb.parse.ParseError, bb.data_smart.ExpansionError) as exc:
+            import traceback
+            parselog.critical(traceback.format_exc())
             parselog.critical("Unable to parse %s: %s" % (fn, exc))
             sys.exit(1)
     return wrapped
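The __getstate__/__setstate__ pair exists so a CookerConfiguration survives the pickle trip to the worker: server_register_idlecallback holds a live callable, which pickle refuses, so it is nulled on the way out. A minimal round-trip check of the same idea (Config is an illustrative stand-in, not the real class):

    import pickle

    class Config(object):
        # Stand-in for CookerConfiguration: one picklable field, one not.
        def __init__(self):
            self.jobs = 4
            self.server_register_idlecallback = lambda func, data: None

        def __getstate__(self):
            state = dict(self.__dict__)
            state["server_register_idlecallback"] = None
            return state

        def __setstate__(self, state):
            self.__dict__.update(state)

    copy = pickle.loads(pickle.dumps(Config()))
    assert copy.jobs == 4 and copy.server_register_idlecallback is None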
diff --git a/bitbake/lib/bb/event.py b/bitbake/lib/bb/event.py
index 2826e3554f..d73067fcf9 100644
--- a/bitbake/lib/bb/event.py
+++ b/bitbake/lib/bb/event.py
@@ -33,11 +33,12 @@ import atexit
 import traceback
 import bb.utils
 import bb.compat
+import bb.exceptions
 
 # This is the pid for which we should generate the event. This is set when
 # the runqueue forks off.
 worker_pid = 0
-worker_pipe = None
+worker_fire = None
 
 logger = logging.getLogger('BitBake.Event')
 
@@ -150,20 +151,12 @@ def fire(event, d):
     # don't have a datastore so the datastore context isn't a problem.
 
     fire_class_handlers(event, d)
-    if worker_pid != 0:
+    if worker_fire:
         worker_fire(event, d)
     else:
         fire_ui_handlers(event, d)
 
-def worker_fire(event, d):
-    data = "<event>" + pickle.dumps(event) + "</event>"
-    worker_pipe.write(data)
-
 def fire_from_worker(event, d):
-    if not event.startswith("<event>") or not event.endswith("</event>"):
-        print("Error, not an event %s" % event)
-        return
-    event = pickle.loads(event[7:-8])
     fire_ui_handlers(event, d)
 
 noop = lambda _: None
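Event routing now keys off whether a worker_fire hook has been installed rather than comparing worker_pid: the cooker leaves the hook unset so events go to the UI handlers, while bitbake-worker (and each forked task child) installs a pipe-writing hook. The dispatch reduced to a skeleton (the two handler functions are stubbed here for illustration):

    def fire_class_handlers(event, d): pass   # stub for illustration
    def fire_ui_handlers(event, d): pass      # stub for illustration

    worker_fire = None  # bitbake-worker assigns its pipe writer here

    def fire(event, d):
        fire_class_handlers(event, d)     # class handlers always run locally
        if worker_fire:
            worker_fire(event, d)         # in the worker: ship back to the cooker
        else:
            fire_ui_handlers(event, d)    # in the cooker: deliver to the UIs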
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 090d1b56a2..dd6e071c37 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -28,10 +28,17 @@ import sys
 import signal
 import stat
 import fcntl
+import errno
 import logging
 import bb
 from bb import msg, data, event
 from bb import monitordisk
+import subprocess
+
+try:
+    import cPickle as pickle
+except ImportError:
+    import pickle
 
 bblogger = logging.getLogger("BitBake")
 logger = logging.getLogger("BitBake.RunQueue")
@@ -938,6 +945,10 @@ class RunQueue:
             raise
         except:
             logger.error("An uncaught exception occured in runqueue, please see the failure below:")
+            try:
+                self.rqexe.teardown()
+            except:
+                pass
             self.state = runQueueComplete
             raise
 
@@ -979,38 +990,41 @@ class RunQueueExecute:
         self.runq_buildable = []
         self.runq_running = []
         self.runq_complete = []
-        self.build_pids = {}
-        self.build_pipes = {}
+
         self.build_stamps = {}
         self.failed_fnids = []
 
         self.stampcache = {}
 
-    def runqueue_process_waitpid(self):
-        """
-        Return none is there are no processes awaiting result collection, otherwise
-        collect the process exit codes and close the information pipe.
-        """
-        pid, status = os.waitpid(-1, os.WNOHANG)
-        if pid == 0 or os.WIFSTOPPED(status):
-            return None
-
-        if os.WIFEXITED(status):
-            status = os.WEXITSTATUS(status)
-        elif os.WIFSIGNALED(status):
-            # Per shell conventions for $?, when a process exits due to
-            # a signal, we return an exit code of 128 + SIGNUM
-            status = 128 + os.WTERMSIG(status)
-
-        task = self.build_pids[pid]
-        del self.build_pids[pid]
-
-        self.build_pipes[pid].close()
-        del self.build_pipes[pid]
+        logger.debug(1, "Starting bitbake-worker")
+        self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        bb.utils.nonblockingfd(self.worker.stdout)
+        self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self)
+
+        workerdata = {
+            "taskdeps" : self.rqdata.dataCache.task_deps,
+            "fakerootenv" : self.rqdata.dataCache.fakerootenv,
+            "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
+            "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
+            "hashes" : self.rqdata.hashes,
+            "hash_deps" : self.rqdata.hash_deps,
+            "sigchecksums" : bb.parse.siggen.file_checksum_values,
+            "runq_hash" : self.rqdata.runq_hash,
+            "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
+            "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
+            "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
+            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
+        }
+
+        self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
+        self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
+        self.worker.stdin.flush()
+
+    def runqueue_process_waitpid(self, task, status):
 
         # self.build_stamps[pid] may not exist when use shared work directory.
-        if pid in self.build_stamps:
-            del self.build_stamps[pid]
+        if task in self.build_stamps:
+            del self.build_stamps[task]
 
         if status != 0:
             self.task_fail(task, status)
@@ -1019,16 +1033,11 @@ class RunQueueExecute:
         return True
 
     def finish_now(self):
-        if self.stats.active:
-            logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
-            for k, v in self.build_pids.iteritems():
-                try:
-                    os.kill(-k, signal.SIGTERM)
-                    os.waitpid(-1, 0)
-                except:
-                    pass
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
+
+        self.worker.stdin.write("<finishnow></finishnow>")
+        self.worker.stdin.flush()
+
+        self.teardown()
 
         if len(self.failed_fnids) != 0:
             self.rq.state = runQueueFailed
@@ -1040,14 +1049,13 @@ class RunQueueExecute:
     def finish(self):
         self.rq.state = runQueueCleanUp
 
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
-
         if self.stats.active > 0:
             bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
-            self.runqueue_process_waitpid()
+            self.workerpipe.read()
             return
 
+        self.teardown()
+
         if len(self.failed_fnids) != 0:
             self.rq.state = runQueueFailed
             return
@@ -1055,115 +1063,6 @@ class RunQueueExecute:
         self.rq.state = runQueueComplete
         return
 
-    def fork_off_task(self, fn, task, taskname, quieterrors=False):
-        # We need to setup the environment BEFORE the fork, since
-        # a fork() or exec*() activates PSEUDO...
-
-        envbackup = {}
-        fakeenv = {}
-        umask = None
-
-        taskdep = self.rqdata.dataCache.task_deps[fn]
-        if 'umask' in taskdep and taskname in taskdep['umask']:
-            # umask might come in as a number or text string..
-            try:
-                umask = int(taskdep['umask'][taskname],8)
-            except TypeError:
-                umask = taskdep['umask'][taskname]
-
-        if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
-            envvars = (self.rqdata.dataCache.fakerootenv[fn] or "").split()
-            for key, value in (var.split('=') for var in envvars):
-                envbackup[key] = os.environ.get(key)
-                os.environ[key] = value
-                fakeenv[key] = value
-
-            fakedirs = (self.rqdata.dataCache.fakerootdirs[fn] or "").split()
-            for p in fakedirs:
-                bb.utils.mkdirhier(p)
-
-            logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
-                            (fn, taskname, ', '.join(fakedirs)))
-        else:
-            envvars = (self.rqdata.dataCache.fakerootnoenv[fn] or "").split()
-            for key, value in (var.split('=') for var in envvars):
-                envbackup[key] = os.environ.get(key)
-                os.environ[key] = value
-                fakeenv[key] = value
-
-        sys.stdout.flush()
-        sys.stderr.flush()
-        try:
-            pipein, pipeout = os.pipe()
-            pipein = os.fdopen(pipein, 'rb', 4096)
-            pipeout = os.fdopen(pipeout, 'wb', 0)
-            pid = os.fork()
-        except OSError as e:
-            bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
-
-        if pid == 0:
-            pipein.close()
-
-            # Save out the PID so that the event can include it the
-            # events
-            bb.event.worker_pid = os.getpid()
-            bb.event.worker_pipe = pipeout
-
-            self.rq.state = runQueueChildProcess
-            # Make the child the process group leader
-            os.setpgid(0, 0)
-            # No stdin
-            newsi = os.open(os.devnull, os.O_RDWR)
-            os.dup2(newsi, sys.stdin.fileno())
-
-            if umask:
-                os.umask(umask)
-
-            self.cooker.data.setVar("BB_WORKERCONTEXT", "1")
-            bb.parse.siggen.set_taskdata(self.rqdata.hashes, self.rqdata.hash_deps)
-            ret = 0
-            try:
-                the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
-                the_data.setVar('BB_TASKHASH', self.rqdata.runq_hash[task])
-                for h in self.rqdata.hashes:
-                    the_data.setVar("BBHASH_%s" % h, self.rqdata.hashes[h])
-                for h in self.rqdata.hash_deps:
-                    the_data.setVar("BBHASHDEPS_%s" % h, self.rqdata.hash_deps[h])
-
-                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
-                # successfully. We also need to unset anything from the environment which shouldn't be there
-                exports = bb.data.exported_vars(the_data)
-                bb.utils.empty_environment()
-                for e, v in exports:
-                    os.environ[e] = v
-                for e in fakeenv:
-                    os.environ[e] = fakeenv[e]
-                    the_data.setVar(e, fakeenv[e])
-                    the_data.setVarFlag(e, 'export', "1")
-
-                if quieterrors:
-                    the_data.setVarFlag(taskname, "quieterrors", "1")
-
-            except Exception as exc:
-                if not quieterrors:
-                    logger.critical(str(exc))
-                os._exit(1)
-            try:
-                if not self.cooker.configuration.dry_run:
-                    profile = self.cooker.configuration.profile
-                    ret = bb.build.exec_task(fn, taskname, the_data, profile)
-                os._exit(ret)
-            except:
-                os._exit(1)
-        else:
-            for key, value in envbackup.iteritems():
-                if value is None:
-                    del os.environ[key]
-                else:
-                    os.environ[key] = value
-
-        return pid, pipein, pipeout
-
     def check_dependencies(self, task, taskdeps, setscene = False):
         if not self.rq.depvalidate:
             return False
@@ -1184,6 +1083,16 @@ class RunQueueExecute:
         valid = bb.utils.better_eval(call, locs)
         return valid
 
+    def teardown(self):
+        logger.debug(1, "Teardown for bitbake-worker")
+        self.worker.stdin.write("<quit></quit>")
+        self.worker.stdin.flush()
+        while self.worker.returncode is None:
+            self.workerpipe.read()
+            self.worker.poll()
+        while self.workerpipe.read():
+            continue
+
 class RunQueueExecuteDummy(RunQueueExecute):
     def __init__(self, rq):
         self.rq = rq
@@ -1275,7 +1184,6 @@ class RunQueueExecuteTasks(RunQueueExecute):
             bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
                      (self.scheduler, ", ".join(obj.name for obj in schedulers)))
 
-
     def get_schedulers(self):
         schedulers = set(obj for obj in globals().values()
                          if type(obj) is type and
@@ -1349,6 +1257,9 @@ class RunQueueExecuteTasks(RunQueueExecute):
         Run the tasks in a queue prepared by rqdata.prepare()
         """
 
+        self.workerpipe.read()
+
+
         if self.stats.total == 0:
             # nothing to do
             self.rq.state = runQueueCleanUp
@@ -1384,23 +1295,20 @@ class RunQueueExecuteTasks(RunQueueExecute):
             startevent = runQueueTaskStarted(task, self.stats, self.rq)
             bb.event.fire(startevent, self.cfgData)
 
-            pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
+            self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
+            self.worker.stdin.flush()
 
-            self.build_pids[pid] = task
-            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
-            self.build_stamps[pid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
+            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
             self.runq_running[task] = 1
             self.stats.taskActive()
             if self.stats.active < self.number_tasks:
                 return True
 
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
-
         if self.stats.active > 0:
-            if self.runqueue_process_waitpid() is None:
-                return 0.5
-            return True
+            self.workerpipe.read()
+            return 0.5
+
+        self.teardown()
 
         if len(self.failed_fnids) != 0:
             self.rq.state = runQueueFailed
@@ -1415,6 +1323,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
             if self.runq_complete[task] == 0:
                 logger.error("Task %s never completed!", task)
         self.rq.state = runQueueComplete
+
         return True
 
 class RunQueueExecuteScenequeue(RunQueueExecute):
@@ -1428,6 +1337,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
         # If we don't have any setscene functions, skip this step
         if len(self.rqdata.runq_setscene) == 0:
             rq.scenequeue_covered = set()
+            self.teardown()
             rq.state = runQueueRunInit
             return
 
@@ -1676,6 +1586,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
         Run the tasks in a queue prepared by prepare_runqueue
         """
 
+        self.workerpipe.read()
+
         task = None
         if self.stats.active < self.number_tasks:
             # Find the next setscene to run
@@ -1716,22 +1628,17 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
             startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
             bb.event.fire(startevent, self.cfgData)
 
-            pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
+            self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
+            self.worker.stdin.flush()
 
-            self.build_pids[pid] = task
-            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
             self.runq_running[task] = 1
             self.stats.taskActive()
             if self.stats.active < self.number_tasks:
                 return True
 
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
-
         if self.stats.active > 0:
-            if self.runqueue_process_waitpid() is None:
-                return 0.5
-            return True
+            self.workerpipe.read()
+            return 0.5
 
         # Convert scenequeue_covered task numbers into full taskgraph ids
         oldcovered = self.scenequeue_covered
@@ -1745,10 +1652,13 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
             logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
 
         self.rq.state = runQueueRunInit
+        self.teardown()
         return True
 
-    def fork_off_task(self, fn, task, taskname):
-        return RunQueueExecute.fork_off_task(self, fn, task, taskname, quieterrors=True)
+    def runqueue_process_waitpid(self, task, status):
+        task = self.rq.rqdata.runq_setscene.index(task)
+
+        RunQueueExecute.runqueue_process_waitpid(self, task, status)
 
 class TaskFailure(Exception):
     """
@@ -1828,25 +1738,43 @@ class runQueuePipe():
     """
     Abstraction for a pipe between a worker thread and the server
    """
-    def __init__(self, pipein, pipeout, d):
+    def __init__(self, pipein, pipeout, d, rq):
         self.input = pipein
-        pipeout.close()
+        if pipeout:
+            pipeout.close()
         bb.utils.nonblockingfd(self.input)
         self.queue = ""
         self.d = d
+        self.rq = rq
+
+    def setrunqueue(self, rq):
+        self.rq = rq
 
     def read(self):
         start = len(self.queue)
         try:
             self.queue = self.queue + self.input.read(102400)
-        except (OSError, IOError):
-            pass
+        except (OSError, IOError) as e:
+            if e.errno != errno.EAGAIN:
+                raise
         end = len(self.queue)
-        index = self.queue.find("</event>")
-        while index != -1:
-            bb.event.fire_from_worker(self.queue[:index+8], self.d)
-            self.queue = self.queue[index+8:]
-            index = self.queue.find("</event>")
+        found = True
+        while found and len(self.queue):
+            found = False
+            index = self.queue.find("</event>")
+            while index != -1 and self.queue.startswith("<event>"):
+                event = pickle.loads(self.queue[7:index])
+                bb.event.fire_from_worker(event, self.d)
+                found = True
+                self.queue = self.queue[index+8:]
+                index = self.queue.find("</event>")
+            index = self.queue.find("</exitcode>")
+            while index != -1 and self.queue.startswith("<exitcode>"):
+                task, status = pickle.loads(self.queue[10:index])
+                self.rq.runqueue_process_waitpid(task, status)
+                found = True
+                self.queue = self.queue[index+11:]
+                index = self.queue.find("</exitcode>")
         return (end > start)
 
     def close(self):
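The reworked runQueuePipe.read() demultiplexes two message kinds arriving on the worker's stdout: <event> payloads are re-fired to the UI handlers and <exitcode> payloads drive runqueue_process_waitpid(). The outer found loop rescans until a pass makes no progress, since one read can deliver several interleaved messages. The same loop in generic form (dispatch and its handler table are hypothetical):

    import pickle

    def dispatch(queue, handlers):
        # handlers maps a tag name to a callable taking the unpickled payload.
        found = True
        while found and queue:
            found = False
            for tag, handle in handlers.items():
                start, end = "<%s>" % tag, "</%s>" % tag
                index = queue.find(end)
                while index != -1 and queue.startswith(start):
                    handle(pickle.loads(queue[len(start):index]))
                    queue = queue[index + len(end):]
                    found = True
                    index = queue.find(end)
        return queue  # anything left is an incomplete message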
diff --git a/bitbake/lib/bb/siggen.py b/bitbake/lib/bb/siggen.py
index 1ff2ecc482..fb8b678508 100644
--- a/bitbake/lib/bb/siggen.py
+++ b/bitbake/lib/bb/siggen.py
@@ -201,9 +201,10 @@ class SignatureGeneratorBasic(SignatureGenerator):
         #d.setVar("BB_TASKHASH_task-%s" % task, taskhash[task])
         return h
 
-    def set_taskdata(self, hashes, deps):
+    def set_taskdata(self, hashes, deps, checksums):
         self.runtaskdeps = deps
         self.taskhash = hashes
+        self.file_checksum_values = checksums
 
     def dump_sigtask(self, fn, task, stampbase, runtime):
         k = fn + "." + task
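set_taskdata() grows the checksums argument because the worker rebuilds its signature generator from scratch: the cooker ships its accumulated file_checksum_values across as the "sigchecksums" entry in workerdata, so signatures computed in the worker match the cooker's. The two ends of that handoff, as wired up elsewhere in this patch:

    # Cooker side (runqueue.py): capture the generator state for the worker.
    workerdata["sigchecksums"] = bb.parse.siggen.file_checksum_values

    # Worker side (bitbake-worker): restore it before any task runs.
    bb.parse.siggen.set_taskdata(workerdata["hashes"],
                                 workerdata["hash_deps"],
                                 workerdata["sigchecksums"])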