summaryrefslogtreecommitdiffstats
path: root/bitbake/bin/bitbake-worker
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-xbitbake/bin/bitbake-worker171
1 files changed, 120 insertions, 51 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index 7765b9368b..d2b146a6a9 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -1,11 +1,15 @@
1#!/usr/bin/env python3 1#!/usr/bin/env python3
2# 2#
3# Copyright BitBake Contributors
4#
3# SPDX-License-Identifier: GPL-2.0-only 5# SPDX-License-Identifier: GPL-2.0-only
4# 6#
5 7
6import os 8import os
7import sys 9import sys
8import warnings 10import warnings
11warnings.simplefilter("default")
12warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.*may.lead.to.deadlocks.in.the.child.*")
9sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) 13sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
10from bb import fetch2 14from bb import fetch2
11import logging 15import logging
@@ -16,11 +20,17 @@ import signal
16import pickle 20import pickle
17import traceback 21import traceback
18import queue 22import queue
23import shlex
24import subprocess
25import fcntl
19from multiprocessing import Lock 26from multiprocessing import Lock
20from threading import Thread 27from threading import Thread
21 28
22if sys.getfilesystemencoding() != "utf-8": 29# Remove when we have a minimum of python 3.10
23 sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") 30if not hasattr(fcntl, 'F_SETPIPE_SZ'):
31 fcntl.F_SETPIPE_SZ = 1031
32
33bb.utils.check_system_locale()
24 34
25# Users shouldn't be running this code directly 35# Users shouldn't be running this code directly
26if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): 36if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
@@ -40,7 +50,6 @@ if sys.argv[1].startswith("decafbadbad"):
40# updates to log files for use with tail 50# updates to log files for use with tail
41try: 51try:
42 if sys.stdout.name == '<stdout>': 52 if sys.stdout.name == '<stdout>':
43 import fcntl
44 fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) 53 fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
45 fl |= os.O_SYNC 54 fl |= os.O_SYNC
46 fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) 55 fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
@@ -52,6 +61,12 @@ logger = logging.getLogger("BitBake")
52 61
53worker_pipe = sys.stdout.fileno() 62worker_pipe = sys.stdout.fileno()
54bb.utils.nonblockingfd(worker_pipe) 63bb.utils.nonblockingfd(worker_pipe)
64# Try to make the pipe buffers larger as it is much more efficient. If we can't
65# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
66try:
67 fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
68except:
69 pass
55# Need to guard against multiprocessing being used in child processes 70# Need to guard against multiprocessing being used in child processes
56# and multiple processes trying to write to the parent at the same time 71# and multiple processes trying to write to the parent at the same time
57worker_pipe_lock = None 72worker_pipe_lock = None
@@ -87,21 +102,21 @@ def worker_fire_prepickled(event):
87worker_thread_exit = False 102worker_thread_exit = False
88 103
89def worker_flush(worker_queue): 104def worker_flush(worker_queue):
90 worker_queue_int = b"" 105 worker_queue_int = bytearray()
91 global worker_pipe, worker_thread_exit 106 global worker_pipe, worker_thread_exit
92 107
93 while True: 108 while True:
94 try: 109 try:
95 worker_queue_int = worker_queue_int + worker_queue.get(True, 1) 110 worker_queue_int.extend(worker_queue.get(True, 1))
96 except queue.Empty: 111 except queue.Empty:
97 pass 112 pass
98 while (worker_queue_int or not worker_queue.empty()): 113 while (worker_queue_int or not worker_queue.empty()):
99 try: 114 try:
100 (_, ready, _) = select.select([], [worker_pipe], [], 1) 115 (_, ready, _) = select.select([], [worker_pipe], [], 1)
101 if not worker_queue.empty(): 116 if not worker_queue.empty():
102 worker_queue_int = worker_queue_int + worker_queue.get() 117 worker_queue_int.extend(worker_queue.get())
103 written = os.write(worker_pipe, worker_queue_int) 118 written = os.write(worker_pipe, worker_queue_int)
104 worker_queue_int = worker_queue_int[written:] 119 del worker_queue_int[0:written]
105 except (IOError, OSError) as e: 120 except (IOError, OSError) as e:
106 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: 121 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
107 raise 122 raise
@@ -117,11 +132,10 @@ def worker_child_fire(event, d):
117 132
118 data = b"<event>" + pickle.dumps(event) + b"</event>" 133 data = b"<event>" + pickle.dumps(event) + b"</event>"
119 try: 134 try:
120 worker_pipe_lock.acquire() 135 with bb.utils.lock_timeout(worker_pipe_lock):
121 while(len(data)): 136 while(len(data)):
122 written = worker_pipe.write(data) 137 written = worker_pipe.write(data)
123 data = data[written:] 138 data = data[written:]
124 worker_pipe_lock.release()
125 except IOError: 139 except IOError:
126 sigterm_handler(None, None) 140 sigterm_handler(None, None)
127 raise 141 raise
@@ -140,44 +154,56 @@ def sigterm_handler(signum, frame):
140 os.killpg(0, signal.SIGTERM) 154 os.killpg(0, signal.SIGTERM)
141 sys.exit() 155 sys.exit()
142 156
143def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): 157def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
158
159 fn = runtask['fn']
160 task = runtask['task']
161 taskname = runtask['taskname']
162 taskhash = runtask['taskhash']
163 unihash = runtask['unihash']
164 appends = runtask['appends']
165 layername = runtask['layername']
166 taskdepdata = runtask['taskdepdata']
167 quieterrors = runtask['quieterrors']
144 # We need to setup the environment BEFORE the fork, since 168 # We need to setup the environment BEFORE the fork, since
145 # a fork() or exec*() activates PSEUDO... 169 # a fork() or exec*() activates PSEUDO...
146 170
147 envbackup = {} 171 envbackup = {}
172 fakeroot = False
148 fakeenv = {} 173 fakeenv = {}
149 umask = None 174 umask = None
150 175
151 taskdep = workerdata["taskdeps"][fn] 176 uid = os.getuid()
177 gid = os.getgid()
178
179 taskdep = runtask['taskdep']
152 if 'umask' in taskdep and taskname in taskdep['umask']: 180 if 'umask' in taskdep and taskname in taskdep['umask']:
153 umask = taskdep['umask'][taskname] 181 umask = taskdep['umask'][taskname]
154 elif workerdata["umask"]: 182 elif workerdata["umask"]:
155 umask = workerdata["umask"] 183 umask = workerdata["umask"]
156 if umask: 184 if umask:
157 # umask might come in as a number or text string.. 185 # Convert to a python numeric value as it could be a string
158 try: 186 umask = bb.utils.to_filemode(umask)
159 umask = int(umask, 8)
160 except TypeError:
161 pass
162 187
163 dry_run = cfg.dry_run or dry_run_exec 188 dry_run = cfg.dry_run or runtask['dry_run']
164 189
165 # We can't use the fakeroot environment in a dry run as it possibly hasn't been built 190 # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
166 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: 191 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
167 envvars = (workerdata["fakerootenv"][fn] or "").split() 192 fakeroot = True
168 for key, value in (var.split('=') for var in envvars): 193 envvars = (runtask['fakerootenv'] or "").split()
194 for key, value in (var.split('=',1) for var in envvars):
169 envbackup[key] = os.environ.get(key) 195 envbackup[key] = os.environ.get(key)
170 os.environ[key] = value 196 os.environ[key] = value
171 fakeenv[key] = value 197 fakeenv[key] = value
172 198
173 fakedirs = (workerdata["fakerootdirs"][fn] or "").split() 199 fakedirs = (runtask['fakerootdirs'] or "").split()
174 for p in fakedirs: 200 for p in fakedirs:
175 bb.utils.mkdirhier(p) 201 bb.utils.mkdirhier(p)
176 logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % 202 logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
177 (fn, taskname, ', '.join(fakedirs))) 203 (fn, taskname, ', '.join(fakedirs)))
178 else: 204 else:
179 envvars = (workerdata["fakerootnoenv"][fn] or "").split() 205 envvars = (runtask['fakerootnoenv'] or "").split()
180 for key, value in (var.split('=') for var in envvars): 206 for key, value in (var.split('=',1) for var in envvars):
181 envbackup[key] = os.environ.get(key) 207 envbackup[key] = os.environ.get(key)
182 os.environ[key] = value 208 os.environ[key] = value
183 fakeenv[key] = value 209 fakeenv[key] = value
@@ -219,19 +245,21 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
219 # Let SIGHUP exit as SIGTERM 245 # Let SIGHUP exit as SIGTERM
220 signal.signal(signal.SIGHUP, sigterm_handler) 246 signal.signal(signal.SIGHUP, sigterm_handler)
221 247
222 # No stdin 248 # No stdin & stdout
223 newsi = os.open(os.devnull, os.O_RDWR) 249 # stdout is used as a status report channel and must not be used by child processes.
224 os.dup2(newsi, sys.stdin.fileno()) 250 dumbio = os.open(os.devnull, os.O_RDWR)
251 os.dup2(dumbio, sys.stdin.fileno())
252 os.dup2(dumbio, sys.stdout.fileno())
225 253
226 if umask: 254 if umask is not None:
227 os.umask(umask) 255 os.umask(umask)
228 256
229 try: 257 try:
230 bb_cache = bb.cache.NoCache(databuilder)
231 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) 258 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
232 the_data = databuilder.mcdata[mc] 259 the_data = databuilder.mcdata[mc]
233 the_data.setVar("BB_WORKERCONTEXT", "1") 260 the_data.setVar("BB_WORKERCONTEXT", "1")
234 the_data.setVar("BB_TASKDEPDATA", taskdepdata) 261 the_data.setVar("BB_TASKDEPDATA", taskdepdata)
262 the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
235 if cfg.limited_deps: 263 if cfg.limited_deps:
236 the_data.setVar("BB_LIMITEDDEPS", "1") 264 the_data.setVar("BB_LIMITEDDEPS", "1")
237 the_data.setVar("BUILDNAME", workerdata["buildname"]) 265 the_data.setVar("BUILDNAME", workerdata["buildname"])
@@ -245,12 +273,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
245 bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) 273 bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
246 ret = 0 274 ret = 0
247 275
248 the_data = bb_cache.loadDataFull(fn, appends) 276 the_data = databuilder.parseRecipe(fn, appends, layername)
249 the_data.setVar('BB_TASKHASH', taskhash) 277 the_data.setVar('BB_TASKHASH', taskhash)
250 the_data.setVar('BB_UNIHASH', unihash) 278 the_data.setVar('BB_UNIHASH', unihash)
279 bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)
251 280
252 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) 281 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
253 282
283 if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
284 if bb.utils.is_local_uid(uid):
285 logger.debug("Attempting to disable network for %s" % taskname)
286 bb.utils.disable_network(uid, gid)
287 else:
288 logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))
289
254 # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 290 # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
255 # successfully. We also need to unset anything from the environment which shouldn't be there 291 # successfully. We also need to unset anything from the environment which shouldn't be there
256 exports = bb.data.exported_vars(the_data) 292 exports = bb.data.exported_vars(the_data)
@@ -279,10 +315,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
279 if not quieterrors: 315 if not quieterrors:
280 logger.critical(traceback.format_exc()) 316 logger.critical(traceback.format_exc())
281 os._exit(1) 317 os._exit(1)
318
319 sys.stdout.flush()
320 sys.stderr.flush()
321
282 try: 322 try:
283 if dry_run: 323 if dry_run:
284 return 0 324 return 0
285 return bb.build.exec_task(fn, taskname, the_data, cfg.profile) 325 try:
326 ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
327 finally:
328 if fakeroot:
329 fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
330 subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
331 return ret
286 except: 332 except:
287 os._exit(1) 333 os._exit(1)
288 if not profiling: 334 if not profiling:
@@ -314,12 +360,12 @@ class runQueueWorkerPipe():
314 if pipeout: 360 if pipeout:
315 pipeout.close() 361 pipeout.close()
316 bb.utils.nonblockingfd(self.input) 362 bb.utils.nonblockingfd(self.input)
317 self.queue = b"" 363 self.queue = bytearray()
318 364
319 def read(self): 365 def read(self):
320 start = len(self.queue) 366 start = len(self.queue)
321 try: 367 try:
322 self.queue = self.queue + (self.input.read(102400) or b"") 368 self.queue.extend(self.input.read(512*1024) or b"")
323 except (OSError, IOError) as e: 369 except (OSError, IOError) as e:
324 if e.errno != errno.EAGAIN: 370 if e.errno != errno.EAGAIN:
325 raise 371 raise
@@ -347,7 +393,7 @@ class BitbakeWorker(object):
347 def __init__(self, din): 393 def __init__(self, din):
348 self.input = din 394 self.input = din
349 bb.utils.nonblockingfd(self.input) 395 bb.utils.nonblockingfd(self.input)
350 self.queue = b"" 396 self.queue = bytearray()
351 self.cookercfg = None 397 self.cookercfg = None
352 self.databuilder = None 398 self.databuilder = None
353 self.data = None 399 self.data = None
@@ -381,7 +427,7 @@ class BitbakeWorker(object):
381 if len(r) == 0: 427 if len(r) == 0:
382 # EOF on pipe, server must have terminated 428 # EOF on pipe, server must have terminated
383 self.sigterm_exception(signal.SIGTERM, None) 429 self.sigterm_exception(signal.SIGTERM, None)
384 self.queue = self.queue + r 430 self.queue.extend(r)
385 except (OSError, IOError): 431 except (OSError, IOError):
386 pass 432 pass
387 if len(self.queue): 433 if len(self.queue):
@@ -401,19 +447,35 @@ class BitbakeWorker(object):
401 while self.process_waitpid(): 447 while self.process_waitpid():
402 continue 448 continue
403 449
404
405 def handle_item(self, item, func): 450 def handle_item(self, item, func):
406 if self.queue.startswith(b"<" + item + b">"): 451 opening_tag = b"<" + item + b">"
407 index = self.queue.find(b"</" + item + b">") 452 if not self.queue.startswith(opening_tag):
408 while index != -1: 453 return
409 func(self.queue[(len(item) + 2):index]) 454
410 self.queue = self.queue[(index + len(item) + 3):] 455 tag_len = len(opening_tag)
411 index = self.queue.find(b"</" + item + b">") 456 if len(self.queue) < tag_len + 4:
457 # we need to receive more data
458 return
459 header = self.queue[tag_len:tag_len + 4]
460 payload_len = int.from_bytes(header, 'big')
461 # closing tag has length (tag_len + 1)
462 if len(self.queue) < tag_len * 2 + 1 + payload_len:
463 # we need to receive more data
464 return
465
466 index = self.queue.find(b"</" + item + b">")
467 if index != -1:
468 try:
469 func(self.queue[(tag_len + 4):index])
470 except pickle.UnpicklingError:
471 workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
472 raise
473 self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]
412 474
413 def handle_cookercfg(self, data): 475 def handle_cookercfg(self, data):
414 self.cookercfg = pickle.loads(data) 476 self.cookercfg = pickle.loads(data)
415 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) 477 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
416 self.databuilder.parseBaseConfiguration() 478 self.databuilder.parseBaseConfiguration(worker=True)
417 self.data = self.databuilder.data 479 self.data = self.databuilder.data
418 480
419 def handle_extraconfigdata(self, data): 481 def handle_extraconfigdata(self, data):
@@ -428,6 +490,7 @@ class BitbakeWorker(object):
428 for mc in self.databuilder.mcdata: 490 for mc in self.databuilder.mcdata:
429 self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) 491 self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
430 self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) 492 self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
493 self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")
431 494
432 def handle_newtaskhashes(self, data): 495 def handle_newtaskhashes(self, data):
433 self.workerdata["newhashes"] = pickle.loads(data) 496 self.workerdata["newhashes"] = pickle.loads(data)
@@ -445,11 +508,15 @@ class BitbakeWorker(object):
445 sys.exit(0) 508 sys.exit(0)
446 509
447 def handle_runtask(self, data): 510 def handle_runtask(self, data):
448 fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) 511 runtask = pickle.loads(data)
449 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) 512
513 fn = runtask['fn']
514 task = runtask['task']
515 taskname = runtask['taskname']
450 516
451 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) 517 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
452 518
519 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
453 self.build_pids[pid] = task 520 self.build_pids[pid] = task
454 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) 521 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
455 522
@@ -513,9 +580,11 @@ except BaseException as e:
513 import traceback 580 import traceback
514 sys.stderr.write(traceback.format_exc()) 581 sys.stderr.write(traceback.format_exc())
515 sys.stderr.write(str(e)) 582 sys.stderr.write(str(e))
583finally:
584 worker_thread_exit = True
585 worker_thread.join()
516 586
517worker_thread_exit = True 587workerlog_write("exiting")
518worker_thread.join() 588if not normalexit:
519 589 sys.exit(1)
520workerlog_write("exitting")
521sys.exit(0) 590sys.exit(0)