diff options
Diffstat (limited to 'bitbake/bin/bitbake-worker')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 149 |
1 files changed, 105 insertions, 44 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index 7765b9368b..e8073f2ac3 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
@@ -1,11 +1,14 @@ | |||
1 | #!/usr/bin/env python3 | 1 | #!/usr/bin/env python3 |
2 | # | 2 | # |
3 | # Copyright BitBake Contributors | ||
4 | # | ||
3 | # SPDX-License-Identifier: GPL-2.0-only | 5 | # SPDX-License-Identifier: GPL-2.0-only |
4 | # | 6 | # |
5 | 7 | ||
6 | import os | 8 | import os |
7 | import sys | 9 | import sys |
8 | import warnings | 10 | import warnings |
11 | warnings.simplefilter("default") | ||
9 | sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) | 12 | sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) |
10 | from bb import fetch2 | 13 | from bb import fetch2 |
11 | import logging | 14 | import logging |
@@ -16,11 +19,12 @@ import signal | |||
16 | import pickle | 19 | import pickle |
17 | import traceback | 20 | import traceback |
18 | import queue | 21 | import queue |
22 | import shlex | ||
23 | import subprocess | ||
19 | from multiprocessing import Lock | 24 | from multiprocessing import Lock |
20 | from threading import Thread | 25 | from threading import Thread |
21 | 26 | ||
22 | if sys.getfilesystemencoding() != "utf-8": | 27 | bb.utils.check_system_locale() |
23 | sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") | ||
24 | 28 | ||
25 | # Users shouldn't be running this code directly | 29 | # Users shouldn't be running this code directly |
26 | if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): | 30 | if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): |
@@ -87,19 +91,19 @@ def worker_fire_prepickled(event): | |||
87 | worker_thread_exit = False | 91 | worker_thread_exit = False |
88 | 92 | ||
89 | def worker_flush(worker_queue): | 93 | def worker_flush(worker_queue): |
90 | worker_queue_int = b"" | 94 | worker_queue_int = bytearray() |
91 | global worker_pipe, worker_thread_exit | 95 | global worker_pipe, worker_thread_exit |
92 | 96 | ||
93 | while True: | 97 | while True: |
94 | try: | 98 | try: |
95 | worker_queue_int = worker_queue_int + worker_queue.get(True, 1) | 99 | worker_queue_int.extend(worker_queue.get(True, 1)) |
96 | except queue.Empty: | 100 | except queue.Empty: |
97 | pass | 101 | pass |
98 | while (worker_queue_int or not worker_queue.empty()): | 102 | while (worker_queue_int or not worker_queue.empty()): |
99 | try: | 103 | try: |
100 | (_, ready, _) = select.select([], [worker_pipe], [], 1) | 104 | (_, ready, _) = select.select([], [worker_pipe], [], 1) |
101 | if not worker_queue.empty(): | 105 | if not worker_queue.empty(): |
102 | worker_queue_int = worker_queue_int + worker_queue.get() | 106 | worker_queue_int.extend(worker_queue.get()) |
103 | written = os.write(worker_pipe, worker_queue_int) | 107 | written = os.write(worker_pipe, worker_queue_int) |
104 | worker_queue_int = worker_queue_int[written:] | 108 | worker_queue_int = worker_queue_int[written:] |
105 | except (IOError, OSError) as e: | 109 | except (IOError, OSError) as e: |
@@ -117,11 +121,10 @@ def worker_child_fire(event, d): | |||
117 | 121 | ||
118 | data = b"<event>" + pickle.dumps(event) + b"</event>" | 122 | data = b"<event>" + pickle.dumps(event) + b"</event>" |
119 | try: | 123 | try: |
120 | worker_pipe_lock.acquire() | 124 | with bb.utils.lock_timeout(worker_pipe_lock): |
121 | while(len(data)): | 125 | while(len(data)): |
122 | written = worker_pipe.write(data) | 126 | written = worker_pipe.write(data) |
123 | data = data[written:] | 127 | data = data[written:] |
124 | worker_pipe_lock.release() | ||
125 | except IOError: | 128 | except IOError: |
126 | sigterm_handler(None, None) | 129 | sigterm_handler(None, None) |
127 | raise | 130 | raise |
@@ -140,15 +143,29 @@ def sigterm_handler(signum, frame): | |||
140 | os.killpg(0, signal.SIGTERM) | 143 | os.killpg(0, signal.SIGTERM) |
141 | sys.exit() | 144 | sys.exit() |
142 | 145 | ||
143 | def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): | 146 | def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask): |
147 | |||
148 | fn = runtask['fn'] | ||
149 | task = runtask['task'] | ||
150 | taskname = runtask['taskname'] | ||
151 | taskhash = runtask['taskhash'] | ||
152 | unihash = runtask['unihash'] | ||
153 | appends = runtask['appends'] | ||
154 | layername = runtask['layername'] | ||
155 | taskdepdata = runtask['taskdepdata'] | ||
156 | quieterrors = runtask['quieterrors'] | ||
144 | # We need to setup the environment BEFORE the fork, since | 157 | # We need to setup the environment BEFORE the fork, since |
145 | # a fork() or exec*() activates PSEUDO... | 158 | # a fork() or exec*() activates PSEUDO... |
146 | 159 | ||
147 | envbackup = {} | 160 | envbackup = {} |
161 | fakeroot = False | ||
148 | fakeenv = {} | 162 | fakeenv = {} |
149 | umask = None | 163 | umask = None |
150 | 164 | ||
151 | taskdep = workerdata["taskdeps"][fn] | 165 | uid = os.getuid() |
166 | gid = os.getgid() | ||
167 | |||
168 | taskdep = runtask['taskdep'] | ||
152 | if 'umask' in taskdep and taskname in taskdep['umask']: | 169 | if 'umask' in taskdep and taskname in taskdep['umask']: |
153 | umask = taskdep['umask'][taskname] | 170 | umask = taskdep['umask'][taskname] |
154 | elif workerdata["umask"]: | 171 | elif workerdata["umask"]: |
@@ -160,24 +177,25 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
160 | except TypeError: | 177 | except TypeError: |
161 | pass | 178 | pass |
162 | 179 | ||
163 | dry_run = cfg.dry_run or dry_run_exec | 180 | dry_run = cfg.dry_run or runtask['dry_run'] |
164 | 181 | ||
165 | # We can't use the fakeroot environment in a dry run as it possibly hasn't been built | 182 | # We can't use the fakeroot environment in a dry run as it possibly hasn't been built |
166 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: | 183 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: |
167 | envvars = (workerdata["fakerootenv"][fn] or "").split() | 184 | fakeroot = True |
168 | for key, value in (var.split('=') for var in envvars): | 185 | envvars = (runtask['fakerootenv'] or "").split() |
186 | for key, value in (var.split('=',1) for var in envvars): | ||
169 | envbackup[key] = os.environ.get(key) | 187 | envbackup[key] = os.environ.get(key) |
170 | os.environ[key] = value | 188 | os.environ[key] = value |
171 | fakeenv[key] = value | 189 | fakeenv[key] = value |
172 | 190 | ||
173 | fakedirs = (workerdata["fakerootdirs"][fn] or "").split() | 191 | fakedirs = (runtask['fakerootdirs'] or "").split() |
174 | for p in fakedirs: | 192 | for p in fakedirs: |
175 | bb.utils.mkdirhier(p) | 193 | bb.utils.mkdirhier(p) |
176 | logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % | 194 | logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % |
177 | (fn, taskname, ', '.join(fakedirs))) | 195 | (fn, taskname, ', '.join(fakedirs))) |
178 | else: | 196 | else: |
179 | envvars = (workerdata["fakerootnoenv"][fn] or "").split() | 197 | envvars = (runtask['fakerootnoenv'] or "").split() |
180 | for key, value in (var.split('=') for var in envvars): | 198 | for key, value in (var.split('=',1) for var in envvars): |
181 | envbackup[key] = os.environ.get(key) | 199 | envbackup[key] = os.environ.get(key) |
182 | os.environ[key] = value | 200 | os.environ[key] = value |
183 | fakeenv[key] = value | 201 | fakeenv[key] = value |
@@ -219,19 +237,21 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
219 | # Let SIGHUP exit as SIGTERM | 237 | # Let SIGHUP exit as SIGTERM |
220 | signal.signal(signal.SIGHUP, sigterm_handler) | 238 | signal.signal(signal.SIGHUP, sigterm_handler) |
221 | 239 | ||
222 | # No stdin | 240 | # No stdin & stdout |
223 | newsi = os.open(os.devnull, os.O_RDWR) | 241 | # stdout is used as a status report channel and must not be used by child processes. |
224 | os.dup2(newsi, sys.stdin.fileno()) | 242 | dumbio = os.open(os.devnull, os.O_RDWR) |
243 | os.dup2(dumbio, sys.stdin.fileno()) | ||
244 | os.dup2(dumbio, sys.stdout.fileno()) | ||
225 | 245 | ||
226 | if umask: | 246 | if umask is not None: |
227 | os.umask(umask) | 247 | os.umask(umask) |
228 | 248 | ||
229 | try: | 249 | try: |
230 | bb_cache = bb.cache.NoCache(databuilder) | ||
231 | (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) | 250 | (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) |
232 | the_data = databuilder.mcdata[mc] | 251 | the_data = databuilder.mcdata[mc] |
233 | the_data.setVar("BB_WORKERCONTEXT", "1") | 252 | the_data.setVar("BB_WORKERCONTEXT", "1") |
234 | the_data.setVar("BB_TASKDEPDATA", taskdepdata) | 253 | the_data.setVar("BB_TASKDEPDATA", taskdepdata) |
254 | the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", "")) | ||
235 | if cfg.limited_deps: | 255 | if cfg.limited_deps: |
236 | the_data.setVar("BB_LIMITEDDEPS", "1") | 256 | the_data.setVar("BB_LIMITEDDEPS", "1") |
237 | the_data.setVar("BUILDNAME", workerdata["buildname"]) | 257 | the_data.setVar("BUILDNAME", workerdata["buildname"]) |
@@ -245,12 +265,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
245 | bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) | 265 | bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) |
246 | ret = 0 | 266 | ret = 0 |
247 | 267 | ||
248 | the_data = bb_cache.loadDataFull(fn, appends) | 268 | the_data = databuilder.parseRecipe(fn, appends, layername) |
249 | the_data.setVar('BB_TASKHASH', taskhash) | 269 | the_data.setVar('BB_TASKHASH', taskhash) |
250 | the_data.setVar('BB_UNIHASH', unihash) | 270 | the_data.setVar('BB_UNIHASH', unihash) |
271 | bb.parse.siggen.setup_datacache_from_datastore(fn, the_data) | ||
251 | 272 | ||
252 | bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) | 273 | bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) |
253 | 274 | ||
275 | if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')): | ||
276 | if bb.utils.is_local_uid(uid): | ||
277 | logger.debug("Attempting to disable network for %s" % taskname) | ||
278 | bb.utils.disable_network(uid, gid) | ||
279 | else: | ||
280 | logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid)) | ||
281 | |||
254 | # exported_vars() returns a generator which *cannot* be passed to os.environ.update() | 282 | # exported_vars() returns a generator which *cannot* be passed to os.environ.update() |
255 | # successfully. We also need to unset anything from the environment which shouldn't be there | 283 | # successfully. We also need to unset anything from the environment which shouldn't be there |
256 | exports = bb.data.exported_vars(the_data) | 284 | exports = bb.data.exported_vars(the_data) |
@@ -279,10 +307,20 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
279 | if not quieterrors: | 307 | if not quieterrors: |
280 | logger.critical(traceback.format_exc()) | 308 | logger.critical(traceback.format_exc()) |
281 | os._exit(1) | 309 | os._exit(1) |
310 | |||
311 | sys.stdout.flush() | ||
312 | sys.stderr.flush() | ||
313 | |||
282 | try: | 314 | try: |
283 | if dry_run: | 315 | if dry_run: |
284 | return 0 | 316 | return 0 |
285 | return bb.build.exec_task(fn, taskname, the_data, cfg.profile) | 317 | try: |
318 | ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) | ||
319 | finally: | ||
320 | if fakeroot: | ||
321 | fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD")) | ||
322 | subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE) | ||
323 | return ret | ||
286 | except: | 324 | except: |
287 | os._exit(1) | 325 | os._exit(1) |
288 | if not profiling: | 326 | if not profiling: |
@@ -314,12 +352,12 @@ class runQueueWorkerPipe(): | |||
314 | if pipeout: | 352 | if pipeout: |
315 | pipeout.close() | 353 | pipeout.close() |
316 | bb.utils.nonblockingfd(self.input) | 354 | bb.utils.nonblockingfd(self.input) |
317 | self.queue = b"" | 355 | self.queue = bytearray() |
318 | 356 | ||
319 | def read(self): | 357 | def read(self): |
320 | start = len(self.queue) | 358 | start = len(self.queue) |
321 | try: | 359 | try: |
322 | self.queue = self.queue + (self.input.read(102400) or b"") | 360 | self.queue.extend(self.input.read(102400) or b"") |
323 | except (OSError, IOError) as e: | 361 | except (OSError, IOError) as e: |
324 | if e.errno != errno.EAGAIN: | 362 | if e.errno != errno.EAGAIN: |
325 | raise | 363 | raise |
@@ -347,7 +385,7 @@ class BitbakeWorker(object): | |||
347 | def __init__(self, din): | 385 | def __init__(self, din): |
348 | self.input = din | 386 | self.input = din |
349 | bb.utils.nonblockingfd(self.input) | 387 | bb.utils.nonblockingfd(self.input) |
350 | self.queue = b"" | 388 | self.queue = bytearray() |
351 | self.cookercfg = None | 389 | self.cookercfg = None |
352 | self.databuilder = None | 390 | self.databuilder = None |
353 | self.data = None | 391 | self.data = None |
@@ -381,7 +419,7 @@ class BitbakeWorker(object): | |||
381 | if len(r) == 0: | 419 | if len(r) == 0: |
382 | # EOF on pipe, server must have terminated | 420 | # EOF on pipe, server must have terminated |
383 | self.sigterm_exception(signal.SIGTERM, None) | 421 | self.sigterm_exception(signal.SIGTERM, None) |
384 | self.queue = self.queue + r | 422 | self.queue.extend(r) |
385 | except (OSError, IOError): | 423 | except (OSError, IOError): |
386 | pass | 424 | pass |
387 | if len(self.queue): | 425 | if len(self.queue): |
@@ -401,19 +439,35 @@ class BitbakeWorker(object): | |||
401 | while self.process_waitpid(): | 439 | while self.process_waitpid(): |
402 | continue | 440 | continue |
403 | 441 | ||
404 | |||
405 | def handle_item(self, item, func): | 442 | def handle_item(self, item, func): |
406 | if self.queue.startswith(b"<" + item + b">"): | 443 | opening_tag = b"<" + item + b">" |
407 | index = self.queue.find(b"</" + item + b">") | 444 | if not self.queue.startswith(opening_tag): |
408 | while index != -1: | 445 | return |
409 | func(self.queue[(len(item) + 2):index]) | 446 | |
410 | self.queue = self.queue[(index + len(item) + 3):] | 447 | tag_len = len(opening_tag) |
411 | index = self.queue.find(b"</" + item + b">") | 448 | if len(self.queue) < tag_len + 4: |
449 | # we need to receive more data | ||
450 | return | ||
451 | header = self.queue[tag_len:tag_len + 4] | ||
452 | payload_len = int.from_bytes(header, 'big') | ||
453 | # closing tag has length (tag_len + 1) | ||
454 | if len(self.queue) < tag_len * 2 + 1 + payload_len: | ||
455 | # we need to receive more data | ||
456 | return | ||
457 | |||
458 | index = self.queue.find(b"</" + item + b">") | ||
459 | if index != -1: | ||
460 | try: | ||
461 | func(self.queue[(tag_len + 4):index]) | ||
462 | except pickle.UnpicklingError: | ||
463 | workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) | ||
464 | raise | ||
465 | self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):] | ||
412 | 466 | ||
413 | def handle_cookercfg(self, data): | 467 | def handle_cookercfg(self, data): |
414 | self.cookercfg = pickle.loads(data) | 468 | self.cookercfg = pickle.loads(data) |
415 | self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) | 469 | self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) |
416 | self.databuilder.parseBaseConfiguration() | 470 | self.databuilder.parseBaseConfiguration(worker=True) |
417 | self.data = self.databuilder.data | 471 | self.data = self.databuilder.data |
418 | 472 | ||
419 | def handle_extraconfigdata(self, data): | 473 | def handle_extraconfigdata(self, data): |
@@ -428,6 +482,7 @@ class BitbakeWorker(object): | |||
428 | for mc in self.databuilder.mcdata: | 482 | for mc in self.databuilder.mcdata: |
429 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) | 483 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) |
430 | self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) | 484 | self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) |
485 | self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe") | ||
431 | 486 | ||
432 | def handle_newtaskhashes(self, data): | 487 | def handle_newtaskhashes(self, data): |
433 | self.workerdata["newhashes"] = pickle.loads(data) | 488 | self.workerdata["newhashes"] = pickle.loads(data) |
@@ -445,11 +500,15 @@ class BitbakeWorker(object): | |||
445 | sys.exit(0) | 500 | sys.exit(0) |
446 | 501 | ||
447 | def handle_runtask(self, data): | 502 | def handle_runtask(self, data): |
448 | fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) | 503 | runtask = pickle.loads(data) |
449 | workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) | ||
450 | 504 | ||
451 | pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) | 505 | fn = runtask['fn'] |
506 | task = runtask['task'] | ||
507 | taskname = runtask['taskname'] | ||
452 | 508 | ||
509 | workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) | ||
510 | |||
511 | pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask) | ||
453 | self.build_pids[pid] = task | 512 | self.build_pids[pid] = task |
454 | self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) | 513 | self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) |
455 | 514 | ||
@@ -513,9 +572,11 @@ except BaseException as e: | |||
513 | import traceback | 572 | import traceback |
514 | sys.stderr.write(traceback.format_exc()) | 573 | sys.stderr.write(traceback.format_exc()) |
515 | sys.stderr.write(str(e)) | 574 | sys.stderr.write(str(e)) |
575 | finally: | ||
576 | worker_thread_exit = True | ||
577 | worker_thread.join() | ||
516 | 578 | ||
517 | worker_thread_exit = True | 579 | workerlog_write("exiting") |
518 | worker_thread.join() | 580 | if not normalexit: |
519 | 581 | sys.exit(1) | |
520 | workerlog_write("exitting") | ||
521 | sys.exit(0) | 582 | sys.exit(0) |