summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorEtienne Cordonnier <ecordonnier@snap.com>2023-09-21 09:56:58 +0200
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-06 16:45:11 +0000
commitf1b0ab7e4b1cfeabaa8f981175e503d0c435ba0b (patch)
tree224e12e16188e74ca46679952ef608d4d593c419 /bitbake/lib/bb/runqueue.py
parent7de36b25d1df97b105fb52b5dc3c62ab1800fb3a (diff)
downloadpoky-f1b0ab7e4b1cfeabaa8f981175e503d0c435ba0b.tar.gz
bitbake: bitbake-worker: add header with length of message
The IPC mechanism between runqueue.py and bitbake-worker is currently not scalable: The data is sent with the format <tag>pickled-data</tag>, and bitbake-worker has no information about the size of the message. Therefore, the bitbake-worker is calling select() and read() in a loop, and then calling "self.queue.find(b"</" + item + b">")" for each chunk received. This does not scale, because queue.find has linear complexity relative to the size of the queue, and workerdata messages get very big, e.g. for builds which reference a lot of files in SRC_URI. The number of chunks varies, but on my test system a lot of chunks of 65536 bytes are sent, and each iteration takes 0.1 seconds, making the transfer of the "workerdata" data very slow (on my test setup 35 seconds before this fix, and 1.5 seconds after this fix). This commit adds a 4-byte header after <tag>, so that bitbake-worker knows how many bytes need to be received, and does not need to constantly search the whole queue for </tag>. (Bitbake rev: 595176d6be95a9c4718d3a40499d1eb576b535f5) Signed-off-by: Etienne Cordonnier <ecordonnier@snap.com> Signed-off-by: Alexandre Belloni <alexandre.belloni@bootlin.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py34
1 file changed, 22 insertions, 12 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 50475ea0cd..1029eec07a 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -1318,6 +1318,16 @@ class RunQueue:
1318 self.worker = {} 1318 self.worker = {}
1319 self.fakeworker = {} 1319 self.fakeworker = {}
1320 1320
1321 @staticmethod
1322 def send_pickled_data(worker, data, name):
1323 msg = bytearray()
1324 msg.extend(b"<" + name.encode() + b">")
1325 pickled_data = pickle.dumps(data)
1326 msg.extend(len(pickled_data).to_bytes(4, 'big'))
1327 msg.extend(pickled_data)
1328 msg.extend(b"</" + name.encode() + b">")
1329 worker.stdin.write(msg)
1330
1321 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1331 def _start_worker(self, mc, fakeroot = False, rqexec = None):
1322 logger.debug("Starting bitbake-worker") 1332 logger.debug("Starting bitbake-worker")
1323 magic = "decafbad" 1333 magic = "decafbad"
@@ -1355,9 +1365,9 @@ class RunQueue:
1355 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1365 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
1356 } 1366 }
1357 1367
1358 worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") 1368 RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
1359 worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") 1369 RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
1360 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 1370 RunQueue.send_pickled_data(worker, workerdata, "workerdata")
1361 worker.stdin.flush() 1371 worker.stdin.flush()
1362 1372
1363 return RunQueueWorker(worker, workerpipe) 1373 return RunQueueWorker(worker, workerpipe)
@@ -1367,7 +1377,7 @@ class RunQueue:
1367 return 1377 return
1368 logger.debug("Teardown for bitbake-worker") 1378 logger.debug("Teardown for bitbake-worker")
1369 try: 1379 try:
1370 worker.process.stdin.write(b"<quit></quit>") 1380 RunQueue.send_pickled_data(worker.process, b"", "quit")
1371 worker.process.stdin.flush() 1381 worker.process.stdin.flush()
1372 worker.process.stdin.close() 1382 worker.process.stdin.close()
1373 except IOError: 1383 except IOError:
@@ -1894,14 +1904,14 @@ class RunQueueExecute:
1894 def finish_now(self): 1904 def finish_now(self):
1895 for mc in self.rq.worker: 1905 for mc in self.rq.worker:
1896 try: 1906 try:
1897 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") 1907 RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
1898 self.rq.worker[mc].process.stdin.flush() 1908 self.rq.worker[mc].process.stdin.flush()
1899 except IOError: 1909 except IOError:
1900 # worker must have died? 1910 # worker must have died?
1901 pass 1911 pass
1902 for mc in self.rq.fakeworker: 1912 for mc in self.rq.fakeworker:
1903 try: 1913 try:
1904 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") 1914 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
1905 self.rq.fakeworker[mc].process.stdin.flush() 1915 self.rq.fakeworker[mc].process.stdin.flush()
1906 except IOError: 1916 except IOError:
1907 # worker must have died? 1917 # worker must have died?
@@ -2196,10 +2206,10 @@ class RunQueueExecute:
2196 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2206 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
2197 if not mc in self.rq.fakeworker: 2207 if not mc in self.rq.fakeworker:
2198 self.rq.start_fakeworker(self, mc) 2208 self.rq.start_fakeworker(self, mc)
2199 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2209 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
2200 self.rq.fakeworker[mc].process.stdin.flush() 2210 self.rq.fakeworker[mc].process.stdin.flush()
2201 else: 2211 else:
2202 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2212 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
2203 self.rq.worker[mc].process.stdin.flush() 2213 self.rq.worker[mc].process.stdin.flush()
2204 2214
2205 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2215 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2297,10 +2307,10 @@ class RunQueueExecute:
2297 self.rq.state = runQueueFailed 2307 self.rq.state = runQueueFailed
2298 self.stats.taskFailed() 2308 self.stats.taskFailed()
2299 return True 2309 return True
2300 self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2310 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
2301 self.rq.fakeworker[mc].process.stdin.flush() 2311 self.rq.fakeworker[mc].process.stdin.flush()
2302 else: 2312 else:
2303 self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") 2313 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
2304 self.rq.worker[mc].process.stdin.flush() 2314 self.rq.worker[mc].process.stdin.flush()
2305 2315
2306 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2316 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
@@ -2502,9 +2512,9 @@ class RunQueueExecute:
2502 2512
2503 if changed: 2513 if changed:
2504 for mc in self.rq.worker: 2514 for mc in self.rq.worker:
2505 self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2515 RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2506 for mc in self.rq.fakeworker: 2516 for mc in self.rq.fakeworker:
2507 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2517 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2508 2518
2509 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2519 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2510 2520