summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-07 18:11:49 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2013-06-14 12:52:56 +0100
commit026c94be2ea2889a5efc1259cd4f8a33a4a677ef (patch)
tree9419114c187a692b8a383692e64dab11184ccfd8
parentd0f0e5d9e69cc22f0c6635c7e416de93660c6bca (diff)
downloadpoky-026c94be2ea2889a5efc1259cd4f8a33a4a677ef.tar.gz
bitbake: runqueue: Move the bitbake-worker execution to a higher level
The worker was being executed by each execution queue so would get constructed twice for each build. This is wasteful so move execution to the main runqueue so we only have to start the worker once. (Bitbake rev: 8117f8480125b121b2b5ac0afc31b108d9e670ae) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--bitbake/lib/bb/runqueue.py114
1 files changed, 55 insertions, 59 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index dd6e071c37..34a123b484 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -84,7 +84,6 @@ runQueueRunning = 6
84runQueueFailed = 7 84runQueueFailed = 7
85runQueueCleanUp = 8 85runQueueCleanUp = 8
86runQueueComplete = 9 86runQueueComplete = 9
87runQueueChildProcess = 10
88 87
89class RunQueueScheduler(object): 88class RunQueueScheduler(object):
90 """ 89 """
@@ -800,6 +799,45 @@ class RunQueue:
800 self.dm = monitordisk.diskMonitor(cfgData) 799 self.dm = monitordisk.diskMonitor(cfgData)
801 800
802 self.rqexe = None 801 self.rqexe = None
802 self.worker = None
803
804 def start_worker(self):
805 if self.worker:
806 self.teardown_worker()
807
808 logger.debug(1, "Starting bitbake-worker")
809 self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
810 bb.utils.nonblockingfd(self.worker.stdout)
811 self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self)
812
813 workerdata = {
814 "taskdeps" : self.rqdata.dataCache.task_deps,
815 "fakerootenv" : self.rqdata.dataCache.fakerootenv,
816 "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
817 "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
818 "hashes" : self.rqdata.hashes,
819 "hash_deps" : self.rqdata.hash_deps,
820 "sigchecksums" : bb.parse.siggen.file_checksum_values,
821 "runq_hash" : self.rqdata.runq_hash,
822 "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
823 "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
824 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
825 "logdefaultdomain" : bb.msg.loggerDefaultDomains,
826 }
827
828 self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
829 self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
830 self.worker.stdin.flush()
831
832 def teardown_worker(self):
833 logger.debug(1, "Teardown for bitbake-worker")
834 self.worker.stdin.write("<quit></quit>")
835 self.worker.stdin.flush()
836 while self.worker.returncode is None:
837 self.workerpipe.read()
838 self.worker.poll()
839 while self.workerpipe.read():
840 continue
803 841
804 def check_stamp_task(self, task, taskname = None, recurse = False, cache = None): 842 def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
805 def get_timestamp(f): 843 def get_timestamp(f):
@@ -891,6 +929,7 @@ class RunQueue:
891 if self.cooker.configuration.dump_signatures: 929 if self.cooker.configuration.dump_signatures:
892 self.dump_signatures() 930 self.dump_signatures()
893 else: 931 else:
932 self.start_worker()
894 self.rqexe = RunQueueExecuteScenequeue(self) 933 self.rqexe = RunQueueExecuteScenequeue(self)
895 934
896 if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]: 935 if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
@@ -911,6 +950,7 @@ class RunQueue:
911 self.rqexe.finish() 950 self.rqexe.finish()
912 951
913 if self.state is runQueueComplete or self.state is runQueueFailed: 952 if self.state is runQueueComplete or self.state is runQueueFailed:
953 self.teardown_worker()
914 if self.rqexe.stats.failed: 954 if self.rqexe.stats.failed:
915 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 955 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
916 else: 956 else:
@@ -928,10 +968,6 @@ class RunQueue:
928 # All done 968 # All done
929 return False 969 return False
930 970
931 if self.state is runQueueChildProcess:
932 print("Child process, eeek, shouldn't happen!")
933 return False
934
935 # Loop 971 # Loop
936 return retval 972 return retval
937 973
@@ -946,7 +982,7 @@ class RunQueue:
946 except: 982 except:
947 logger.error("An uncaught exception occured in runqueue, please see the failure below:") 983 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
948 try: 984 try:
949 self.rqexe.teardown() 985 self.teardown_worker()
950 except: 986 except:
951 pass 987 pass
952 self.state = runQueueComplete 988 self.state = runQueueComplete
@@ -996,29 +1032,7 @@ class RunQueueExecute:
996 1032
997 self.stampcache = {} 1033 self.stampcache = {}
998 1034
999 logger.debug(1, "Starting bitbake-worker") 1035 rq.workerpipe.setrunqueueexec(self)
1000 self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1001 bb.utils.nonblockingfd(self.worker.stdout)
1002 self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self)
1003
1004 workerdata = {
1005 "taskdeps" : self.rqdata.dataCache.task_deps,
1006 "fakerootenv" : self.rqdata.dataCache.fakerootenv,
1007 "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
1008 "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
1009 "hashes" : self.rqdata.hashes,
1010 "hash_deps" : self.rqdata.hash_deps,
1011 "sigchecksums" : bb.parse.siggen.file_checksum_values,
1012 "runq_hash" : self.rqdata.runq_hash,
1013 "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
1014 "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
1015 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
1016 "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1017 }
1018
1019 self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
1020 self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
1021 self.worker.stdin.flush()
1022 1036
1023 def runqueue_process_waitpid(self, task, status): 1037 def runqueue_process_waitpid(self, task, status):
1024 1038
@@ -1034,10 +1048,8 @@ class RunQueueExecute:
1034 1048
1035 def finish_now(self): 1049 def finish_now(self):
1036 1050
1037 self.worker.stdin.write("<finishnow></finishnow>") 1051 self.rq.worker.stdin.write("<finishnow></finishnow>")
1038 self.worker.stdin.flush() 1052 self.rq.worker.stdin.flush()
1039
1040 self.teardown()
1041 1053
1042 if len(self.failed_fnids) != 0: 1054 if len(self.failed_fnids) != 0:
1043 self.rq.state = runQueueFailed 1055 self.rq.state = runQueueFailed
@@ -1051,11 +1063,9 @@ class RunQueueExecute:
1051 1063
1052 if self.stats.active > 0: 1064 if self.stats.active > 0:
1053 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) 1065 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1054 self.workerpipe.read() 1066 self.rq.workerpipe.read()
1055 return 1067 return
1056 1068
1057 self.teardown()
1058
1059 if len(self.failed_fnids) != 0: 1069 if len(self.failed_fnids) != 0:
1060 self.rq.state = runQueueFailed 1070 self.rq.state = runQueueFailed
1061 return 1071 return
@@ -1083,16 +1093,6 @@ class RunQueueExecute:
1083 valid = bb.utils.better_eval(call, locs) 1093 valid = bb.utils.better_eval(call, locs)
1084 return valid 1094 return valid
1085 1095
1086 def teardown(self):
1087 logger.debug(1, "Teardown for bitbake-worker")
1088 self.worker.stdin.write("<quit></quit>")
1089 self.worker.stdin.flush()
1090 while self.worker.returncode is None:
1091 self.workerpipe.read()
1092 self.worker.poll()
1093 while self.workerpipe.read():
1094 continue
1095
1096class RunQueueExecuteDummy(RunQueueExecute): 1096class RunQueueExecuteDummy(RunQueueExecute):
1097 def __init__(self, rq): 1097 def __init__(self, rq):
1098 self.rq = rq 1098 self.rq = rq
@@ -1257,7 +1257,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
1257 Run the tasks in a queue prepared by rqdata.prepare() 1257 Run the tasks in a queue prepared by rqdata.prepare()
1258 """ 1258 """
1259 1259
1260 self.workerpipe.read() 1260 self.rq.workerpipe.read()
1261 1261
1262 1262
1263 if self.stats.total == 0: 1263 if self.stats.total == 0:
@@ -1295,8 +1295,8 @@ class RunQueueExecuteTasks(RunQueueExecute):
1295 startevent = runQueueTaskStarted(task, self.stats, self.rq) 1295 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1296 bb.event.fire(startevent, self.cfgData) 1296 bb.event.fire(startevent, self.cfgData)
1297 1297
1298 self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>") 1298 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1299 self.worker.stdin.flush() 1299 self.rq.worker.stdin.flush()
1300 1300
1301 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) 1301 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1302 self.runq_running[task] = 1 1302 self.runq_running[task] = 1
@@ -1305,11 +1305,9 @@ class RunQueueExecuteTasks(RunQueueExecute):
1305 return True 1305 return True
1306 1306
1307 if self.stats.active > 0: 1307 if self.stats.active > 0:
1308 self.workerpipe.read() 1308 self.rq.workerpipe.read()
1309 return 0.5 1309 return 0.5
1310 1310
1311 self.teardown()
1312
1313 if len(self.failed_fnids) != 0: 1311 if len(self.failed_fnids) != 0:
1314 self.rq.state = runQueueFailed 1312 self.rq.state = runQueueFailed
1315 return True 1313 return True
@@ -1337,7 +1335,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1337 # If we don't have any setscene functions, skip this step 1335 # If we don't have any setscene functions, skip this step
1338 if len(self.rqdata.runq_setscene) == 0: 1336 if len(self.rqdata.runq_setscene) == 0:
1339 rq.scenequeue_covered = set() 1337 rq.scenequeue_covered = set()
1340 self.teardown()
1341 rq.state = runQueueRunInit 1338 rq.state = runQueueRunInit
1342 return 1339 return
1343 1340
@@ -1586,7 +1583,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1586 Run the tasks in a queue prepared by prepare_runqueue 1583 Run the tasks in a queue prepared by prepare_runqueue
1587 """ 1584 """
1588 1585
1589 self.workerpipe.read() 1586 self.rq.workerpipe.read()
1590 1587
1591 task = None 1588 task = None
1592 if self.stats.active < self.number_tasks: 1589 if self.stats.active < self.number_tasks:
@@ -1628,8 +1625,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1628 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 1625 startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
1629 bb.event.fire(startevent, self.cfgData) 1626 bb.event.fire(startevent, self.cfgData)
1630 1627
1631 self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>") 1628 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1632 self.worker.stdin.flush() 1629 self.rq.worker.stdin.flush()
1633 1630
1634 self.runq_running[task] = 1 1631 self.runq_running[task] = 1
1635 self.stats.taskActive() 1632 self.stats.taskActive()
@@ -1637,7 +1634,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1637 return True 1634 return True
1638 1635
1639 if self.stats.active > 0: 1636 if self.stats.active > 0:
1640 self.workerpipe.read() 1637 self.rq.workerpipe.read()
1641 return 0.5 1638 return 0.5
1642 1639
1643 # Convert scenequeue_covered task numbers into full taskgraph ids 1640 # Convert scenequeue_covered task numbers into full taskgraph ids
@@ -1652,7 +1649,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
1652 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) 1649 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1653 1650
1654 self.rq.state = runQueueRunInit 1651 self.rq.state = runQueueRunInit
1655 self.teardown()
1656 return True 1652 return True
1657 1653
1658 def runqueue_process_waitpid(self, task, status): 1654 def runqueue_process_waitpid(self, task, status):
@@ -1747,7 +1743,7 @@ class runQueuePipe():
1747 self.d = d 1743 self.d = d
1748 self.rq = rq 1744 self.rq = rq
1749 1745
1750 def setrunqueue(self, rq): 1746 def setrunqueueexec(self, rq):
1751 self.rq = rq 1747 self.rq = rq
1752 1748
1753 def read(self): 1749 def read(self):