Diffstat (limited to 'bitbake/bin/bitbake-worker')
 bitbake/bin/bitbake-worker | 149 ++++++++++++++++++++++++++++++++++----------
 1 file changed, 105 insertions(+), 44 deletions(-)
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index 7765b9368b..e8073f2ac3 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -1,11 +1,14 @@
 #!/usr/bin/env python3
 #
+# Copyright BitBake Contributors
+#
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
 import os
 import sys
 import warnings
+warnings.simplefilter("default")
 sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
 from bb import fetch2
 import logging
@@ -16,11 +19,12 @@ import signal
 import pickle
 import traceback
 import queue
+import shlex
+import subprocess
 from multiprocessing import Lock
 from threading import Thread
 
-if sys.getfilesystemencoding() != "utf-8":
-    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")
+bb.utils.check_system_locale()
 
 # Users shouldn't be running this code directly
 if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
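
Note: the two removed lines above are folded into bb.utils.check_system_locale(). A minimal sketch of the check that helper performs, assuming only the behaviour visible in the removed lines (not the verbatim bb/utils.py code):

    import sys

    def check_system_locale():
        # BitBake needs a UTF-8 filesystem encoding; Python cannot change it
        # after startup, so fail fast with a hint about LANG.
        if sys.getfilesystemencoding() != "utf-8":
            sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).")
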
@@ -87,19 +91,19 @@ def worker_fire_prepickled(event):
 worker_thread_exit = False
 
 def worker_flush(worker_queue):
-    worker_queue_int = b""
+    worker_queue_int = bytearray()
     global worker_pipe, worker_thread_exit
 
     while True:
         try:
-            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
+            worker_queue_int.extend(worker_queue.get(True, 1))
         except queue.Empty:
             pass
         while (worker_queue_int or not worker_queue.empty()):
             try:
                 (_, ready, _) = select.select([], [worker_pipe], [], 1)
                 if not worker_queue.empty():
-                    worker_queue_int = worker_queue_int + worker_queue.get()
+                    worker_queue_int.extend(worker_queue.get())
                 written = os.write(worker_pipe, worker_queue_int)
                 worker_queue_int = worker_queue_int[written:]
             except (IOError, OSError) as e:
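
Note: replacing bytes concatenation with bytearray.extend() avoids copying the whole accumulated buffer on every chunk, turning quadratic buffer growth into amortized linear. A hypothetical micro-benchmark illustrating the difference:

    import timeit

    def concat_bytes(chunks):
        buf = b""
        for c in chunks:
            buf = buf + c          # copies the entire buffer each time
        return buf

    def extend_bytearray(chunks):
        buf = bytearray()
        for c in chunks:
            buf.extend(c)          # grows in place, amortized O(1)
        return bytes(buf)

    chunks = [b"x" * 1024] * 2000
    print(timeit.timeit(lambda: concat_bytes(chunks), number=3))
    print(timeit.timeit(lambda: extend_bytearray(chunks), number=3))
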
@@ -117,11 +121,10 @@ def worker_child_fire(event, d):
 
     data = b"<event>" + pickle.dumps(event) + b"</event>"
     try:
-        worker_pipe_lock.acquire()
-        while(len(data)):
-            written = worker_pipe.write(data)
-            data = data[written:]
-        worker_pipe_lock.release()
+        with bb.utils.lock_timeout(worker_pipe_lock):
+            while(len(data)):
+                written = worker_pipe.write(data)
+                data = data[written:]
     except IOError:
         sigterm_handler(None, None)
         raise
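
Note: bb.utils.lock_timeout() replaces the manual acquire/release pair, which could leak the lock if write() raised, and avoids blocking forever on a wedged pipe. A sketch of such a context manager, assuming the semantics implied here rather than the exact bb/utils.py implementation:

    import os
    from contextlib import contextmanager

    @contextmanager
    def lock_timeout(lock, timeout=300):
        # Acquire with a timeout rather than blocking indefinitely; a worker
        # that cannot take the pipe lock is stuck, so exit hard.
        held = lock.acquire(timeout=timeout)
        try:
            if not held:
                os._exit(1)
            yield
        finally:
            if held:
                lock.release()
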
@@ -140,15 +143,29 @@ def sigterm_handler(signum, frame):
     os.killpg(0, signal.SIGTERM)
     sys.exit()
 
-def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
+def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
+
+    fn = runtask['fn']
+    task = runtask['task']
+    taskname = runtask['taskname']
+    taskhash = runtask['taskhash']
+    unihash = runtask['unihash']
+    appends = runtask['appends']
+    layername = runtask['layername']
+    taskdepdata = runtask['taskdepdata']
+    quieterrors = runtask['quieterrors']
     # We need to setup the environment BEFORE the fork, since
     # a fork() or exec*() activates PSEUDO...
 
     envbackup = {}
+    fakeroot = False
     fakeenv = {}
     umask = None
 
-    taskdep = workerdata["taskdeps"][fn]
+    uid = os.getuid()
+    gid = os.getgid()
+
+    taskdep = runtask['taskdep']
     if 'umask' in taskdep and taskname in taskdep['umask']:
         umask = taskdep['umask'][taskname]
     elif workerdata["umask"]:
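
Note: fork_off_task() now takes a single 'runtask' dictionary instead of a long positional list. Judging only from the keys read in this function, the server-side payload is shaped roughly like this (all values invented for illustration):

    runtask = {
        'fn': '/path/to/recipe.bb',
        'task': 42,
        'taskname': 'do_compile',
        'taskhash': 'deadbeef...',
        'unihash': 'deadbeef...',
        'appends': [],
        'layername': 'meta',
        'taskdepdata': {},
        'taskdep': {'umask': {}, 'fakeroot': {}},
        'fakerootenv': '', 'fakerootdirs': '', 'fakerootnoenv': '',
        'quieterrors': False,
        'dry_run': False,
    }
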
@@ -160,24 +177,25 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
     except TypeError:
         pass
 
-    dry_run = cfg.dry_run or dry_run_exec
+    dry_run = cfg.dry_run or runtask['dry_run']
 
     # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
     if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
-        envvars = (workerdata["fakerootenv"][fn] or "").split()
-        for key, value in (var.split('=') for var in envvars):
+        fakeroot = True
+        envvars = (runtask['fakerootenv'] or "").split()
+        for key, value in (var.split('=',1) for var in envvars):
             envbackup[key] = os.environ.get(key)
             os.environ[key] = value
             fakeenv[key] = value
 
-        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
+        fakedirs = (runtask['fakerootdirs'] or "").split()
         for p in fakedirs:
             bb.utils.mkdirhier(p)
         logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
     else:
-        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
-        for key, value in (var.split('=') for var in envvars):
+        envvars = (runtask['fakerootnoenv'] or "").split()
+        for key, value in (var.split('=',1) for var in envvars):
             envbackup[key] = os.environ.get(key)
             os.environ[key] = value
             fakeenv[key] = value
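
Note: the switch to split('=', 1) keeps values that themselves contain '=' intact; the old unbounded split produced three fields and broke the two-value unpacking. For example (hypothetical variable):

    >>> "OPTS=a=b".split('=')
    ['OPTS', 'a', 'b']
    >>> "OPTS=a=b".split('=', 1)
    ['OPTS', 'a=b']
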
@@ -219,19 +237,21 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
             # Let SIGHUP exit as SIGTERM
             signal.signal(signal.SIGHUP, sigterm_handler)
 
-            # No stdin
-            newsi = os.open(os.devnull, os.O_RDWR)
-            os.dup2(newsi, sys.stdin.fileno())
+            # No stdin & stdout
+            # stdout is used as a status report channel and must not be used by child processes.
+            dumbio = os.open(os.devnull, os.O_RDWR)
+            os.dup2(dumbio, sys.stdin.fileno())
+            os.dup2(dumbio, sys.stdout.fileno())
 
-            if umask:
+            if umask is not None:
                 os.umask(umask)
 
             try:
-                bb_cache = bb.cache.NoCache(databuilder)
                 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                 the_data = databuilder.mcdata[mc]
                 the_data.setVar("BB_WORKERCONTEXT", "1")
                 the_data.setVar("BB_TASKDEPDATA", taskdepdata)
+                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                 if cfg.limited_deps:
                     the_data.setVar("BB_LIMITEDDEPS", "1")
                 the_data.setVar("BUILDNAME", workerdata["buildname"])
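
Note: the umask guard change is a real bug fix, not a style tweak. 'if umask:' treated a configured umask of 0 as unset because 0 is falsy, while 'is not None' honors it:

    umask = 0                  # an explicit, legitimate setting
    if umask:                  # old guard: never taken for 0
        print("old: applied")
    if umask is not None:      # new guard: correctly applies os.umask(0)
        print("new: applied")
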
@@ -245,12 +265,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
                 bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                 ret = 0
 
-                the_data = bb_cache.loadDataFull(fn, appends)
+                the_data = databuilder.parseRecipe(fn, appends, layername)
                 the_data.setVar('BB_TASKHASH', taskhash)
                 the_data.setVar('BB_UNIHASH', unihash)
+                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)
 
                 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
 
+                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
+                    if bb.utils.is_local_uid(uid):
+                        logger.debug("Attempting to disable network for %s" % taskname)
+                        bb.utils.disable_network(uid, gid)
+                    else:
+                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))
+
                 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                 # successfully. We also need to unset anything from the environment which shouldn't be there
                 exports = bb.data.exported_vars(the_data)
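
Note: bb.utils.disable_network() cuts networking for tasks that don't set the 'network' varflag, which is also why the uid/gid are captured before the fork. The usual unprivileged mechanism, sketched under the assumption of Linux user/network namespaces (details simplified relative to bb/utils.py):

    import ctypes
    import os

    CLONE_NEWUSER = 0x10000000   # from <sched.h>
    CLONE_NEWNET = 0x40000000

    def disable_network_sketch(uid, gid):
        libc = ctypes.CDLL("libc.so.6", use_errno=True)
        # New user + network namespaces: the fresh netns has no interfaces.
        if libc.unshare(CLONE_NEWNET | CLONE_NEWUSER) != 0:
            return   # kernel forbids unprivileged namespaces; stay networked
        # Map the original uid/gid back so file ownership is unaffected.
        with open("/proc/self/uid_map", "w") as f:
            f.write("%s %s 1" % (uid, uid))
        with open("/proc/self/setgroups", "w") as f:
            f.write("deny")
        with open("/proc/self/gid_map", "w") as f:
            f.write("%s %s 1" % (gid, gid))
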
@@ -279,10 +307,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
                 if not quieterrors:
                     logger.critical(traceback.format_exc())
                 os._exit(1)
+
+            sys.stdout.flush()
+            sys.stderr.flush()
+
             try:
                 if dry_run:
                     return 0
-                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+                try:
+                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
+                finally:
+                    if fakeroot:
+                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
+                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
+                return ret
             except:
                 os._exit(1)
         if not profiling:
@@ -314,12 +352,12 @@ class runQueueWorkerPipe():
         if pipeout:
             pipeout.close()
         bb.utils.nonblockingfd(self.input)
-        self.queue = b""
+        self.queue = bytearray()
 
     def read(self):
         start = len(self.queue)
         try:
-            self.queue = self.queue + (self.input.read(102400) or b"")
+            self.queue.extend(self.input.read(102400) or b"")
         except (OSError, IOError) as e:
             if e.errno != errno.EAGAIN:
                 raise
@@ -347,7 +385,7 @@ class BitbakeWorker(object):
     def __init__(self, din):
         self.input = din
         bb.utils.nonblockingfd(self.input)
-        self.queue = b""
+        self.queue = bytearray()
        self.cookercfg = None
         self.databuilder = None
         self.data = None
@@ -381,7 +419,7 @@ class BitbakeWorker(object):
                 if len(r) == 0:
                     # EOF on pipe, server must have terminated
                     self.sigterm_exception(signal.SIGTERM, None)
-                self.queue = self.queue + r
+                self.queue.extend(r)
             except (OSError, IOError):
                 pass
             if len(self.queue):
@@ -401,19 +439,35 @@ class BitbakeWorker(object):
             while self.process_waitpid():
                 continue
 
-
     def handle_item(self, item, func):
-        if self.queue.startswith(b"<" + item + b">"):
-            index = self.queue.find(b"</" + item + b">")
-            while index != -1:
-                func(self.queue[(len(item) + 2):index])
-                self.queue = self.queue[(index + len(item) + 3):]
-                index = self.queue.find(b"</" + item + b">")
+        opening_tag = b"<" + item + b">"
+        if not self.queue.startswith(opening_tag):
+            return
+
+        tag_len = len(opening_tag)
+        if len(self.queue) < tag_len + 4:
+            # we need to receive more data
+            return
+        header = self.queue[tag_len:tag_len + 4]
+        payload_len = int.from_bytes(header, 'big')
+        # closing tag has length (tag_len + 1)
+        if len(self.queue) < tag_len * 2 + 1 + payload_len:
+            # we need to receive more data
+            return
+
+        index = self.queue.find(b"</" + item + b">")
+        if index != -1:
+            try:
+                func(self.queue[(tag_len + 4):index])
+            except pickle.UnpicklingError:
+                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
+                raise
+            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
 
     def handle_cookercfg(self, data):
         self.cookercfg = pickle.loads(data)
         self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
-        self.databuilder.parseBaseConfiguration()
+        self.databuilder.parseBaseConfiguration(worker=True)
         self.data = self.databuilder.data
 
     def handle_extraconfigdata(self, data):
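
Note: handle_item() now expects a 4-byte big-endian payload length straight after the opening tag, so a partially received frame is left in the buffer until more data arrives instead of being unpickled half-read. The sending side must frame messages to match; a minimal sketch of such a sender (the real code lives in runqueue.py, names here are illustrative):

    import pickle

    def frame_message(name: bytes, obj) -> bytes:
        # <name> + 4-byte big-endian payload length + pickled payload + </name>
        payload = pickle.dumps(obj)
        return (b"<" + name + b">"
                + len(payload).to_bytes(4, "big")
                + payload
                + b"</" + name + b">")

    # e.g. worker_stdin.write(frame_message(b"runtask", runtask_dict))
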
@@ -428,6 +482,7 @@ class BitbakeWorker(object):
         for mc in self.databuilder.mcdata:
             self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
             self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
+            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")
 
     def handle_newtaskhashes(self, data):
         self.workerdata["newhashes"] = pickle.loads(data)
@@ -445,11 +500,15 @@ class BitbakeWorker(object):
         sys.exit(0)
 
     def handle_runtask(self, data):
-        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
-        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+        runtask = pickle.loads(data)
 
-        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
+        fn = runtask['fn']
+        task = runtask['task']
+        taskname = runtask['taskname']
 
+        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
+
+        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
         self.build_pids[pid] = task
         self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
 
@@ -513,9 +572,11 @@ except BaseException as e:
     import traceback
     sys.stderr.write(traceback.format_exc())
     sys.stderr.write(str(e))
+finally:
+    worker_thread_exit = True
+    worker_thread.join()
 
-worker_thread_exit = True
-worker_thread.join()
-
-workerlog_write("exitting")
+workerlog_write("exiting")
+if not normalexit:
+    sys.exit(1)
 sys.exit(0)