diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-06-07 18:11:49 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2013-06-14 12:52:56 +0100 |
commit | 026c94be2ea2889a5efc1259cd4f8a33a4a677ef (patch) | |
tree | 9419114c187a692b8a383692e64dab11184ccfd8 | |
parent | d0f0e5d9e69cc22f0c6635c7e416de93660c6bca (diff) | |
download | poky-026c94be2ea2889a5efc1259cd4f8a33a4a677ef.tar.gz |
bitbake: runqueue: Move the bitbake-worker execution to a higher level
The worker was being executed by each execution queue so would get
constructed twice for each build. This is wasteful so move execution
to the main runqueue so we only have to start the worker once.
(Bitbake rev: 8117f8480125b121b2b5ac0afc31b108d9e670ae)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 114 |
1 files changed, 55 insertions, 59 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index dd6e071c37..34a123b484 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -84,7 +84,6 @@ runQueueRunning = 6 | |||
84 | runQueueFailed = 7 | 84 | runQueueFailed = 7 |
85 | runQueueCleanUp = 8 | 85 | runQueueCleanUp = 8 |
86 | runQueueComplete = 9 | 86 | runQueueComplete = 9 |
87 | runQueueChildProcess = 10 | ||
88 | 87 | ||
89 | class RunQueueScheduler(object): | 88 | class RunQueueScheduler(object): |
90 | """ | 89 | """ |
@@ -800,6 +799,45 @@ class RunQueue: | |||
800 | self.dm = monitordisk.diskMonitor(cfgData) | 799 | self.dm = monitordisk.diskMonitor(cfgData) |
801 | 800 | ||
802 | self.rqexe = None | 801 | self.rqexe = None |
802 | self.worker = None | ||
803 | |||
804 | def start_worker(self): | ||
805 | if self.worker: | ||
806 | self.teardown_worker() | ||
807 | |||
808 | logger.debug(1, "Starting bitbake-worker") | ||
809 | self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) | ||
810 | bb.utils.nonblockingfd(self.worker.stdout) | ||
811 | self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self) | ||
812 | |||
813 | workerdata = { | ||
814 | "taskdeps" : self.rqdata.dataCache.task_deps, | ||
815 | "fakerootenv" : self.rqdata.dataCache.fakerootenv, | ||
816 | "fakerootdirs" : self.rqdata.dataCache.fakerootdirs, | ||
817 | "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv, | ||
818 | "hashes" : self.rqdata.hashes, | ||
819 | "hash_deps" : self.rqdata.hash_deps, | ||
820 | "sigchecksums" : bb.parse.siggen.file_checksum_values, | ||
821 | "runq_hash" : self.rqdata.runq_hash, | ||
822 | "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel, | ||
823 | "logdefaultverbose" : bb.msg.loggerDefaultVerbose, | ||
824 | "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, | ||
825 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, | ||
826 | } | ||
827 | |||
828 | self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>") | ||
829 | self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>") | ||
830 | self.worker.stdin.flush() | ||
831 | |||
832 | def teardown_worker(self): | ||
833 | logger.debug(1, "Teardown for bitbake-worker") | ||
834 | self.worker.stdin.write("<quit></quit>") | ||
835 | self.worker.stdin.flush() | ||
836 | while self.worker.returncode is None: | ||
837 | self.workerpipe.read() | ||
838 | self.worker.poll() | ||
839 | while self.workerpipe.read(): | ||
840 | continue | ||
803 | 841 | ||
804 | def check_stamp_task(self, task, taskname = None, recurse = False, cache = None): | 842 | def check_stamp_task(self, task, taskname = None, recurse = False, cache = None): |
805 | def get_timestamp(f): | 843 | def get_timestamp(f): |
@@ -891,6 +929,7 @@ class RunQueue: | |||
891 | if self.cooker.configuration.dump_signatures: | 929 | if self.cooker.configuration.dump_signatures: |
892 | self.dump_signatures() | 930 | self.dump_signatures() |
893 | else: | 931 | else: |
932 | self.start_worker() | ||
894 | self.rqexe = RunQueueExecuteScenequeue(self) | 933 | self.rqexe = RunQueueExecuteScenequeue(self) |
895 | 934 | ||
896 | if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]: | 935 | if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]: |
@@ -911,6 +950,7 @@ class RunQueue: | |||
911 | self.rqexe.finish() | 950 | self.rqexe.finish() |
912 | 951 | ||
913 | if self.state is runQueueComplete or self.state is runQueueFailed: | 952 | if self.state is runQueueComplete or self.state is runQueueFailed: |
953 | self.teardown_worker() | ||
914 | if self.rqexe.stats.failed: | 954 | if self.rqexe.stats.failed: |
915 | logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) | 955 | logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) |
916 | else: | 956 | else: |
@@ -928,10 +968,6 @@ class RunQueue: | |||
928 | # All done | 968 | # All done |
929 | return False | 969 | return False |
930 | 970 | ||
931 | if self.state is runQueueChildProcess: | ||
932 | print("Child process, eeek, shouldn't happen!") | ||
933 | return False | ||
934 | |||
935 | # Loop | 971 | # Loop |
936 | return retval | 972 | return retval |
937 | 973 | ||
@@ -946,7 +982,7 @@ class RunQueue: | |||
946 | except: | 982 | except: |
947 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") | 983 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") |
948 | try: | 984 | try: |
949 | self.rqexe.teardown() | 985 | self.teardown_worker() |
950 | except: | 986 | except: |
951 | pass | 987 | pass |
952 | self.state = runQueueComplete | 988 | self.state = runQueueComplete |
@@ -996,29 +1032,7 @@ class RunQueueExecute: | |||
996 | 1032 | ||
997 | self.stampcache = {} | 1033 | self.stampcache = {} |
998 | 1034 | ||
999 | logger.debug(1, "Starting bitbake-worker") | 1035 | rq.workerpipe.setrunqueueexec(self) |
1000 | self.worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) | ||
1001 | bb.utils.nonblockingfd(self.worker.stdout) | ||
1002 | self.workerpipe = runQueuePipe(self.worker.stdout, None, self.cfgData, self) | ||
1003 | |||
1004 | workerdata = { | ||
1005 | "taskdeps" : self.rqdata.dataCache.task_deps, | ||
1006 | "fakerootenv" : self.rqdata.dataCache.fakerootenv, | ||
1007 | "fakerootdirs" : self.rqdata.dataCache.fakerootdirs, | ||
1008 | "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv, | ||
1009 | "hashes" : self.rqdata.hashes, | ||
1010 | "hash_deps" : self.rqdata.hash_deps, | ||
1011 | "sigchecksums" : bb.parse.siggen.file_checksum_values, | ||
1012 | "runq_hash" : self.rqdata.runq_hash, | ||
1013 | "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel, | ||
1014 | "logdefaultverbose" : bb.msg.loggerDefaultVerbose, | ||
1015 | "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, | ||
1016 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, | ||
1017 | } | ||
1018 | |||
1019 | self.worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>") | ||
1020 | self.worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>") | ||
1021 | self.worker.stdin.flush() | ||
1022 | 1036 | ||
1023 | def runqueue_process_waitpid(self, task, status): | 1037 | def runqueue_process_waitpid(self, task, status): |
1024 | 1038 | ||
@@ -1034,10 +1048,8 @@ class RunQueueExecute: | |||
1034 | 1048 | ||
1035 | def finish_now(self): | 1049 | def finish_now(self): |
1036 | 1050 | ||
1037 | self.worker.stdin.write("<finishnow></finishnow>") | 1051 | self.rq.worker.stdin.write("<finishnow></finishnow>") |
1038 | self.worker.stdin.flush() | 1052 | self.rq.worker.stdin.flush() |
1039 | |||
1040 | self.teardown() | ||
1041 | 1053 | ||
1042 | if len(self.failed_fnids) != 0: | 1054 | if len(self.failed_fnids) != 0: |
1043 | self.rq.state = runQueueFailed | 1055 | self.rq.state = runQueueFailed |
@@ -1051,11 +1063,9 @@ class RunQueueExecute: | |||
1051 | 1063 | ||
1052 | if self.stats.active > 0: | 1064 | if self.stats.active > 0: |
1053 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) | 1065 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) |
1054 | self.workerpipe.read() | 1066 | self.rq.workerpipe.read() |
1055 | return | 1067 | return |
1056 | 1068 | ||
1057 | self.teardown() | ||
1058 | |||
1059 | if len(self.failed_fnids) != 0: | 1069 | if len(self.failed_fnids) != 0: |
1060 | self.rq.state = runQueueFailed | 1070 | self.rq.state = runQueueFailed |
1061 | return | 1071 | return |
@@ -1083,16 +1093,6 @@ class RunQueueExecute: | |||
1083 | valid = bb.utils.better_eval(call, locs) | 1093 | valid = bb.utils.better_eval(call, locs) |
1084 | return valid | 1094 | return valid |
1085 | 1095 | ||
1086 | def teardown(self): | ||
1087 | logger.debug(1, "Teardown for bitbake-worker") | ||
1088 | self.worker.stdin.write("<quit></quit>") | ||
1089 | self.worker.stdin.flush() | ||
1090 | while self.worker.returncode is None: | ||
1091 | self.workerpipe.read() | ||
1092 | self.worker.poll() | ||
1093 | while self.workerpipe.read(): | ||
1094 | continue | ||
1095 | |||
1096 | class RunQueueExecuteDummy(RunQueueExecute): | 1096 | class RunQueueExecuteDummy(RunQueueExecute): |
1097 | def __init__(self, rq): | 1097 | def __init__(self, rq): |
1098 | self.rq = rq | 1098 | self.rq = rq |
@@ -1257,7 +1257,7 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1257 | Run the tasks in a queue prepared by rqdata.prepare() | 1257 | Run the tasks in a queue prepared by rqdata.prepare() |
1258 | """ | 1258 | """ |
1259 | 1259 | ||
1260 | self.workerpipe.read() | 1260 | self.rq.workerpipe.read() |
1261 | 1261 | ||
1262 | 1262 | ||
1263 | if self.stats.total == 0: | 1263 | if self.stats.total == 0: |
@@ -1295,8 +1295,8 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1295 | startevent = runQueueTaskStarted(task, self.stats, self.rq) | 1295 | startevent = runQueueTaskStarted(task, self.stats, self.rq) |
1296 | bb.event.fire(startevent, self.cfgData) | 1296 | bb.event.fire(startevent, self.cfgData) |
1297 | 1297 | ||
1298 | self.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>") | 1298 | self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>") |
1299 | self.worker.stdin.flush() | 1299 | self.rq.worker.stdin.flush() |
1300 | 1300 | ||
1301 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | 1301 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) |
1302 | self.runq_running[task] = 1 | 1302 | self.runq_running[task] = 1 |
@@ -1305,11 +1305,9 @@ class RunQueueExecuteTasks(RunQueueExecute): | |||
1305 | return True | 1305 | return True |
1306 | 1306 | ||
1307 | if self.stats.active > 0: | 1307 | if self.stats.active > 0: |
1308 | self.workerpipe.read() | 1308 | self.rq.workerpipe.read() |
1309 | return 0.5 | 1309 | return 0.5 |
1310 | 1310 | ||
1311 | self.teardown() | ||
1312 | |||
1313 | if len(self.failed_fnids) != 0: | 1311 | if len(self.failed_fnids) != 0: |
1314 | self.rq.state = runQueueFailed | 1312 | self.rq.state = runQueueFailed |
1315 | return True | 1313 | return True |
@@ -1337,7 +1335,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1337 | # If we don't have any setscene functions, skip this step | 1335 | # If we don't have any setscene functions, skip this step |
1338 | if len(self.rqdata.runq_setscene) == 0: | 1336 | if len(self.rqdata.runq_setscene) == 0: |
1339 | rq.scenequeue_covered = set() | 1337 | rq.scenequeue_covered = set() |
1340 | self.teardown() | ||
1341 | rq.state = runQueueRunInit | 1338 | rq.state = runQueueRunInit |
1342 | return | 1339 | return |
1343 | 1340 | ||
@@ -1586,7 +1583,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1586 | Run the tasks in a queue prepared by prepare_runqueue | 1583 | Run the tasks in a queue prepared by prepare_runqueue |
1587 | """ | 1584 | """ |
1588 | 1585 | ||
1589 | self.workerpipe.read() | 1586 | self.rq.workerpipe.read() |
1590 | 1587 | ||
1591 | task = None | 1588 | task = None |
1592 | if self.stats.active < self.number_tasks: | 1589 | if self.stats.active < self.number_tasks: |
@@ -1628,8 +1625,8 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1628 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) | 1625 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) |
1629 | bb.event.fire(startevent, self.cfgData) | 1626 | bb.event.fire(startevent, self.cfgData) |
1630 | 1627 | ||
1631 | self.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>") | 1628 | self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>") |
1632 | self.worker.stdin.flush() | 1629 | self.rq.worker.stdin.flush() |
1633 | 1630 | ||
1634 | self.runq_running[task] = 1 | 1631 | self.runq_running[task] = 1 |
1635 | self.stats.taskActive() | 1632 | self.stats.taskActive() |
@@ -1637,7 +1634,7 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1637 | return True | 1634 | return True |
1638 | 1635 | ||
1639 | if self.stats.active > 0: | 1636 | if self.stats.active > 0: |
1640 | self.workerpipe.read() | 1637 | self.rq.workerpipe.read() |
1641 | return 0.5 | 1638 | return 0.5 |
1642 | 1639 | ||
1643 | # Convert scenequeue_covered task numbers into full taskgraph ids | 1640 | # Convert scenequeue_covered task numbers into full taskgraph ids |
@@ -1652,7 +1649,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute): | |||
1652 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) | 1649 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) |
1653 | 1650 | ||
1654 | self.rq.state = runQueueRunInit | 1651 | self.rq.state = runQueueRunInit |
1655 | self.teardown() | ||
1656 | return True | 1652 | return True |
1657 | 1653 | ||
1658 | def runqueue_process_waitpid(self, task, status): | 1654 | def runqueue_process_waitpid(self, task, status): |
@@ -1747,7 +1743,7 @@ class runQueuePipe(): | |||
1747 | self.d = d | 1743 | self.d = d |
1748 | self.rq = rq | 1744 | self.rq = rq |
1749 | 1745 | ||
1750 | def setrunqueue(self, rq): | 1746 | def setrunqueueexec(self, rq): |
1751 | self.rq = rq | 1747 | self.rq = rq |
1752 | 1748 | ||
1753 | def read(self): | 1749 | def read(self): |