diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2016-08-15 17:58:39 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2016-08-18 10:06:26 +0100 |
commit | 0ef16f083eddb0eccd5fd1604e6e922a38705ae5 (patch) | |
tree | 4031c374b8d6d93f3ad374fda1c306b902a0750c /bitbake/lib/bb | |
parent | 249686927ba7c1149298c2efd0645993a5dd74c5 (diff) | |
download | poky-0ef16f083eddb0eccd5fd1604e6e922a38705ae5.tar.gz |
bitbake: runqueue: Abstract worker functionality to an object/array
With the introduction of multi-config and the possibility of distributed
builds we need arrays of workers rather than the existing two.
This refactors the code to have a dict() of workers and a dict of
fakeworkers, represented by objects. The code can iterate over these.
This is separated out from the multi-config changes since its separable
and clearer this way.
(Bitbake rev: 8181d96e0a4df0aa47287669681116fa65bcae16)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 122 |
1 files changed, 66 insertions, 56 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 3a593b6c4b..6a953b844a 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -922,6 +922,11 @@ class RunQueueData: | |||
922 | self.runtaskentries[tid].depends, | 922 | self.runtaskentries[tid].depends, |
923 | self.runtaskentries[tid].revdeps) | 923 | self.runtaskentries[tid].revdeps) |
924 | 924 | ||
925 | class RunQueueWorker(): | ||
926 | def __init__(self, process, pipe): | ||
927 | self.process = process | ||
928 | self.pipe = pipe | ||
929 | |||
925 | class RunQueue: | 930 | class RunQueue: |
926 | def __init__(self, cooker, cfgData, dataCache, taskData, targets): | 931 | def __init__(self, cooker, cfgData, dataCache, taskData, targets): |
927 | 932 | ||
@@ -940,10 +945,8 @@ class RunQueue: | |||
940 | self.dm = monitordisk.diskMonitor(cfgData) | 945 | self.dm = monitordisk.diskMonitor(cfgData) |
941 | 946 | ||
942 | self.rqexe = None | 947 | self.rqexe = None |
943 | self.worker = None | 948 | self.worker = {} |
944 | self.workerpipe = None | 949 | self.fakeworker = {} |
945 | self.fakeworker = None | ||
946 | self.fakeworkerpipe = None | ||
947 | 950 | ||
948 | def _start_worker(self, fakeroot = False, rqexec = None): | 951 | def _start_worker(self, fakeroot = False, rqexec = None): |
949 | logger.debug(1, "Starting bitbake-worker") | 952 | logger.debug(1, "Starting bitbake-worker") |
@@ -988,55 +991,56 @@ class RunQueue: | |||
988 | worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") | 991 | worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") |
989 | worker.stdin.flush() | 992 | worker.stdin.flush() |
990 | 993 | ||
991 | return worker, workerpipe | 994 | return RunQueueWorker(worker, workerpipe) |
992 | 995 | ||
993 | def _teardown_worker(self, worker, workerpipe): | 996 | def _teardown_worker(self, worker): |
994 | if not worker: | 997 | if not worker: |
995 | return | 998 | return |
996 | logger.debug(1, "Teardown for bitbake-worker") | 999 | logger.debug(1, "Teardown for bitbake-worker") |
997 | try: | 1000 | try: |
998 | worker.stdin.write(b"<quit></quit>") | 1001 | worker.process.stdin.write(b"<quit></quit>") |
999 | worker.stdin.flush() | 1002 | worker.process.stdin.flush() |
1000 | worker.stdin.close() | 1003 | worker.process.stdin.close() |
1001 | except IOError: | 1004 | except IOError: |
1002 | pass | 1005 | pass |
1003 | while worker.returncode is None: | 1006 | while worker.process.returncode is None: |
1004 | workerpipe.read() | 1007 | worker.pipe.read() |
1005 | worker.poll() | 1008 | worker.process.poll() |
1006 | while workerpipe.read(): | 1009 | while worker.pipe.read(): |
1007 | continue | 1010 | continue |
1008 | workerpipe.close() | 1011 | worker.pipe.close() |
1009 | 1012 | ||
1010 | def start_worker(self): | 1013 | def start_worker(self): |
1011 | if self.worker: | 1014 | if self.worker: |
1012 | self.teardown_workers() | 1015 | self.teardown_workers() |
1013 | self.teardown = False | 1016 | self.teardown = False |
1014 | self.worker, self.workerpipe = self._start_worker() | 1017 | self.worker[''] = self._start_worker() |
1015 | 1018 | ||
1016 | def start_fakeworker(self, rqexec): | 1019 | def start_fakeworker(self, rqexec): |
1017 | if not self.fakeworker: | 1020 | if not self.fakeworker: |
1018 | self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec) | 1021 | self.fakeworker[''] = self._start_worker(True, rqexec) |
1019 | 1022 | ||
1020 | def teardown_workers(self): | 1023 | def teardown_workers(self): |
1021 | self.teardown = True | 1024 | self.teardown = True |
1022 | self._teardown_worker(self.worker, self.workerpipe) | 1025 | for mc in self.worker: |
1023 | self.worker = None | 1026 | self._teardown_worker(self.worker[mc]) |
1024 | self.workerpipe = None | 1027 | self.worker = {} |
1025 | self._teardown_worker(self.fakeworker, self.fakeworkerpipe) | 1028 | for mc in self.fakeworker: |
1026 | self.fakeworker = None | 1029 | self._teardown_worker(self.fakeworker[mc]) |
1027 | self.fakeworkerpipe = None | 1030 | self.fakeworker = {} |
1028 | 1031 | ||
1029 | def read_workers(self): | 1032 | def read_workers(self): |
1030 | self.workerpipe.read() | 1033 | for mc in self.worker: |
1031 | if self.fakeworkerpipe: | 1034 | self.worker[mc].pipe.read() |
1032 | self.fakeworkerpipe.read() | 1035 | for mc in self.fakeworker: |
1036 | self.fakeworker[mc].pipe.read() | ||
1033 | 1037 | ||
1034 | def active_fds(self): | 1038 | def active_fds(self): |
1035 | fds = [] | 1039 | fds = [] |
1036 | if self.workerpipe: | 1040 | for mc in self.worker: |
1037 | fds.append(self.workerpipe.input) | 1041 | fds.append(self.worker[mc].pipe.input) |
1038 | if self.fakeworkerpipe: | 1042 | for mc in self.fakeworker: |
1039 | fds.append(self.fakeworkerpipe.input) | 1043 | fds.append(self.fakeworker[mc].pipe.input) |
1040 | return fds | 1044 | return fds |
1041 | 1045 | ||
1042 | def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): | 1046 | def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): |
@@ -1393,9 +1397,10 @@ class RunQueueExecute: | |||
1393 | 1397 | ||
1394 | self.stampcache = {} | 1398 | self.stampcache = {} |
1395 | 1399 | ||
1396 | rq.workerpipe.setrunqueueexec(self) | 1400 | for mc in rq.worker: |
1397 | if rq.fakeworkerpipe: | 1401 | rq.worker[mc].pipe.setrunqueueexec(self) |
1398 | rq.fakeworkerpipe.setrunqueueexec(self) | 1402 | for mc in rq.fakeworker: |
1403 | rq.fakeworker[mc].pipe.setrunqueueexec(self) | ||
1399 | 1404 | ||
1400 | if self.number_tasks <= 0: | 1405 | if self.number_tasks <= 0: |
1401 | bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) | 1406 | bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) |
@@ -1414,15 +1419,21 @@ class RunQueueExecute: | |||
1414 | return True | 1419 | return True |
1415 | 1420 | ||
1416 | def finish_now(self): | 1421 | def finish_now(self): |
1417 | for worker in [self.rq.worker, self.rq.fakeworker]: | 1422 | for mc in self.rq.worker: |
1418 | if not worker: | 1423 | try: |
1419 | continue | 1424 | self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") |
1425 | self.rq.worker[mc].process.stdin.flush() | ||
1426 | except IOError: | ||
1427 | # worker must have died? | ||
1428 | pass | ||
1429 | for mc in self.rq.fakeworker: | ||
1420 | try: | 1430 | try: |
1421 | worker.stdin.write(b"<finishnow></finishnow>") | 1431 | self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") |
1422 | worker.stdin.flush() | 1432 | self.rq.fakeworker[mc].process.stdin.flush() |
1423 | except IOError: | 1433 | except IOError: |
1424 | # worker must have died? | 1434 | # worker must have died? |
1425 | pass | 1435 | pass |
1436 | |||
1426 | if len(self.failed_fns) != 0: | 1437 | if len(self.failed_fns) != 0: |
1427 | self.rq.state = runQueueFailed | 1438 | self.rq.state = runQueueFailed |
1428 | return | 1439 | return |
@@ -1733,11 +1744,11 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1733 | logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) | 1744 | logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) |
1734 | self.rq.state = runQueueFailed | 1745 | self.rq.state = runQueueFailed |
1735 | return True | 1746 | return True |
1736 | self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>") | 1747 | self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>") |
1737 | self.rq.fakeworker.stdin.flush() | 1748 | self.rq.fakeworker[''].process.stdin.flush() |
1738 | else: | 1749 | else: |
1739 | self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>") | 1750 | self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>") |
1740 | self.rq.worker.stdin.flush() | 1751 | self.rq.worker[''].process.stdin.flush() |
1741 | 1752 | ||
1742 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | 1753 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) |
1743 | self.build_stamps2.append(self.build_stamps[task]) | 1754 | self.build_stamps2.append(self.build_stamps[task]) |
@@ -2143,11 +2154,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
2143 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: | 2154 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: |
2144 | if not self.rq.fakeworker: | 2155 | if not self.rq.fakeworker: |
2145 | self.rq.start_fakeworker(self) | 2156 | self.rq.start_fakeworker(self) |
2146 | self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>") | 2157 | self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>") |
2147 | self.rq.fakeworker.stdin.flush() | 2158 | self.rq.fakeworker[''].process.stdin.flush() |
2148 | else: | 2159 | else: |
2149 | self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>") | 2160 | self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>") |
2150 | self.rq.worker.stdin.flush() | 2161 | self.rq.worker[''].process.stdin.flush() |
2151 | 2162 | ||
2152 | self.runq_running.add(task) | 2163 | self.runq_running.add(task) |
2153 | self.stats.taskActive() | 2164 | self.stats.taskActive() |
@@ -2301,17 +2312,16 @@ class runQueuePipe(): | |||
2301 | 2312 | ||
2302 | def read(self): | 2313 | def read(self): |
2303 | for w in [self.rq.worker, self.rq.fakeworker]: | 2314 | for w in [self.rq.worker, self.rq.fakeworker]: |
2304 | if not w: | 2315 | for mc in w: |
2305 | continue | 2316 | w[mc].process.poll() |
2306 | w.poll() | 2317 | if w[mc].process.returncode is not None and not self.rq.teardown: |
2307 | if w.returncode is not None and not self.rq.teardown: | 2318 | name = None |
2308 | name = None | 2319 | if w in self.rq.worker: |
2309 | if self.rq.worker and w.pid == self.rq.worker.pid: | 2320 | name = "Worker" |
2310 | name = "Worker" | 2321 | elif w in self.rq.fakeworker: |
2311 | elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid: | 2322 | name = "Fakeroot" |
2312 | name = "Fakeroot" | 2323 | bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode))) |
2313 | bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode))) | 2324 | self.rq.finish_runqueue(True) |
2314 | self.rq.finish_runqueue(True) | ||
2315 | 2325 | ||
2316 | start = len(self.queue) | 2326 | start = len(self.queue) |
2317 | try: | 2327 | try: |