summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2016-08-15 17:58:39 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2016-08-18 10:06:26 +0100
commit0ef16f083eddb0eccd5fd1604e6e922a38705ae5 (patch)
tree4031c374b8d6d93f3ad374fda1c306b902a0750c /bitbake/lib/bb/runqueue.py
parent249686927ba7c1149298c2efd0645993a5dd74c5 (diff)
downloadpoky-0ef16f083eddb0eccd5fd1604e6e922a38705ae5.tar.gz
bitbake: runqueue: Abstract worker functionality to an object/array
With the introduction of multi-config and the possibility of distributed builds we need arrays of workers rather than the existing two. This refactors the code to have a dict() of workers and a dict of fakeworkers, represented by objects. The code can iterate over these. This is separated out from the multi-config changes since its separable and clearer this way. (Bitbake rev: 8181d96e0a4df0aa47287669681116fa65bcae16) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py122
1 files changed, 66 insertions, 56 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 3a593b6c4b..6a953b844a 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -922,6 +922,11 @@ class RunQueueData:
922 self.runtaskentries[tid].depends, 922 self.runtaskentries[tid].depends,
923 self.runtaskentries[tid].revdeps) 923 self.runtaskentries[tid].revdeps)
924 924
925class RunQueueWorker():
926 def __init__(self, process, pipe):
927 self.process = process
928 self.pipe = pipe
929
925class RunQueue: 930class RunQueue:
926 def __init__(self, cooker, cfgData, dataCache, taskData, targets): 931 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
927 932
@@ -940,10 +945,8 @@ class RunQueue:
940 self.dm = monitordisk.diskMonitor(cfgData) 945 self.dm = monitordisk.diskMonitor(cfgData)
941 946
942 self.rqexe = None 947 self.rqexe = None
943 self.worker = None 948 self.worker = {}
944 self.workerpipe = None 949 self.fakeworker = {}
945 self.fakeworker = None
946 self.fakeworkerpipe = None
947 950
948 def _start_worker(self, fakeroot = False, rqexec = None): 951 def _start_worker(self, fakeroot = False, rqexec = None):
949 logger.debug(1, "Starting bitbake-worker") 952 logger.debug(1, "Starting bitbake-worker")
@@ -988,55 +991,56 @@ class RunQueue:
988 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") 991 worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
989 worker.stdin.flush() 992 worker.stdin.flush()
990 993
991 return worker, workerpipe 994 return RunQueueWorker(worker, workerpipe)
992 995
993 def _teardown_worker(self, worker, workerpipe): 996 def _teardown_worker(self, worker):
994 if not worker: 997 if not worker:
995 return 998 return
996 logger.debug(1, "Teardown for bitbake-worker") 999 logger.debug(1, "Teardown for bitbake-worker")
997 try: 1000 try:
998 worker.stdin.write(b"<quit></quit>") 1001 worker.process.stdin.write(b"<quit></quit>")
999 worker.stdin.flush() 1002 worker.process.stdin.flush()
1000 worker.stdin.close() 1003 worker.process.stdin.close()
1001 except IOError: 1004 except IOError:
1002 pass 1005 pass
1003 while worker.returncode is None: 1006 while worker.process.returncode is None:
1004 workerpipe.read() 1007 worker.pipe.read()
1005 worker.poll() 1008 worker.process.poll()
1006 while workerpipe.read(): 1009 while worker.pipe.read():
1007 continue 1010 continue
1008 workerpipe.close() 1011 worker.pipe.close()
1009 1012
1010 def start_worker(self): 1013 def start_worker(self):
1011 if self.worker: 1014 if self.worker:
1012 self.teardown_workers() 1015 self.teardown_workers()
1013 self.teardown = False 1016 self.teardown = False
1014 self.worker, self.workerpipe = self._start_worker() 1017 self.worker[''] = self._start_worker()
1015 1018
1016 def start_fakeworker(self, rqexec): 1019 def start_fakeworker(self, rqexec):
1017 if not self.fakeworker: 1020 if not self.fakeworker:
1018 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec) 1021 self.fakeworker[''] = self._start_worker(True, rqexec)
1019 1022
1020 def teardown_workers(self): 1023 def teardown_workers(self):
1021 self.teardown = True 1024 self.teardown = True
1022 self._teardown_worker(self.worker, self.workerpipe) 1025 for mc in self.worker:
1023 self.worker = None 1026 self._teardown_worker(self.worker[mc])
1024 self.workerpipe = None 1027 self.worker = {}
1025 self._teardown_worker(self.fakeworker, self.fakeworkerpipe) 1028 for mc in self.fakeworker:
1026 self.fakeworker = None 1029 self._teardown_worker(self.fakeworker[mc])
1027 self.fakeworkerpipe = None 1030 self.fakeworker = {}
1028 1031
1029 def read_workers(self): 1032 def read_workers(self):
1030 self.workerpipe.read() 1033 for mc in self.worker:
1031 if self.fakeworkerpipe: 1034 self.worker[mc].pipe.read()
1032 self.fakeworkerpipe.read() 1035 for mc in self.fakeworker:
1036 self.fakeworker[mc].pipe.read()
1033 1037
1034 def active_fds(self): 1038 def active_fds(self):
1035 fds = [] 1039 fds = []
1036 if self.workerpipe: 1040 for mc in self.worker:
1037 fds.append(self.workerpipe.input) 1041 fds.append(self.worker[mc].pipe.input)
1038 if self.fakeworkerpipe: 1042 for mc in self.fakeworker:
1039 fds.append(self.fakeworkerpipe.input) 1043 fds.append(self.fakeworker[mc].pipe.input)
1040 return fds 1044 return fds
1041 1045
1042 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1046 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
@@ -1393,9 +1397,10 @@ class RunQueueExecute:
1393 1397
1394 self.stampcache = {} 1398 self.stampcache = {}
1395 1399
1396 rq.workerpipe.setrunqueueexec(self) 1400 for mc in rq.worker:
1397 if rq.fakeworkerpipe: 1401 rq.worker[mc].pipe.setrunqueueexec(self)
1398 rq.fakeworkerpipe.setrunqueueexec(self) 1402 for mc in rq.fakeworker:
1403 rq.fakeworker[mc].pipe.setrunqueueexec(self)
1399 1404
1400 if self.number_tasks <= 0: 1405 if self.number_tasks <= 0:
1401 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1406 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
@@ -1414,15 +1419,21 @@ class RunQueueExecute:
1414 return True 1419 return True
1415 1420
1416 def finish_now(self): 1421 def finish_now(self):
1417 for worker in [self.rq.worker, self.rq.fakeworker]: 1422 for mc in self.rq.worker:
1418 if not worker: 1423 try:
1419 continue 1424 self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
1425 self.rq.worker[mc].process.stdin.flush()
1426 except IOError:
1427 # worker must have died?
1428 pass
1429 for mc in self.rq.fakeworker:
1420 try: 1430 try:
1421 worker.stdin.write(b"<finishnow></finishnow>") 1431 self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
1422 worker.stdin.flush() 1432 self.rq.fakeworker[mc].process.stdin.flush()
1423 except IOError: 1433 except IOError:
1424 # worker must have died? 1434 # worker must have died?
1425 pass 1435 pass
1436
1426 if len(self.failed_fns) != 0: 1437 if len(self.failed_fns) != 0:
1427 self.rq.state = runQueueFailed 1438 self.rq.state = runQueueFailed
1428 return 1439 return
@@ -1733,11 +1744,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
1733 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 1744 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
1734 self.rq.state = runQueueFailed 1745 self.rq.state = runQueueFailed
1735 return True 1746 return True
1736 self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>") 1747 self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
1737 self.rq.fakeworker.stdin.flush() 1748 self.rq.fakeworker[''].process.stdin.flush()
1738 else: 1749 else:
1739 self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>") 1750 self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + b"</runtask>")
1740 self.rq.worker.stdin.flush() 1751 self.rq.worker[''].process.stdin.flush()
1741 1752
1742 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) 1753 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1743 self.build_stamps2.append(self.build_stamps[task]) 1754 self.build_stamps2.append(self.build_stamps[task])
@@ -2143,11 +2154,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
2143 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2154 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
2144 if not self.rq.fakeworker: 2155 if not self.rq.fakeworker:
2145 self.rq.start_fakeworker(self) 2156 self.rq.start_fakeworker(self)
2146 self.rq.fakeworker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>") 2157 self.rq.fakeworker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
2147 self.rq.fakeworker.stdin.flush() 2158 self.rq.fakeworker[''].process.stdin.flush()
2148 else: 2159 else:
2149 self.rq.worker.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>") 2160 self.rq.worker[''].process.stdin.write(b"<runtask>" + pickle.dumps((fn, task, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + b"</runtask>")
2150 self.rq.worker.stdin.flush() 2161 self.rq.worker[''].process.stdin.flush()
2151 2162
2152 self.runq_running.add(task) 2163 self.runq_running.add(task)
2153 self.stats.taskActive() 2164 self.stats.taskActive()
@@ -2301,17 +2312,16 @@ class runQueuePipe():
2301 2312
2302 def read(self): 2313 def read(self):
2303 for w in [self.rq.worker, self.rq.fakeworker]: 2314 for w in [self.rq.worker, self.rq.fakeworker]:
2304 if not w: 2315 for mc in w:
2305 continue 2316 w[mc].process.poll()
2306 w.poll() 2317 if w[mc].process.returncode is not None and not self.rq.teardown:
2307 if w.returncode is not None and not self.rq.teardown: 2318 name = None
2308 name = None 2319 if w in self.rq.worker:
2309 if self.rq.worker and w.pid == self.rq.worker.pid: 2320 name = "Worker"
2310 name = "Worker" 2321 elif w in self.rq.fakeworker:
2311 elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid: 2322 name = "Fakeroot"
2312 name = "Fakeroot" 2323 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
2313 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode))) 2324 self.rq.finish_runqueue(True)
2314 self.rq.finish_runqueue(True)
2315 2325
2316 start = len(self.queue) 2326 start = len(self.queue)
2317 try: 2327 try: