diff options
author | Richard Purdie <rpurdie@linux.intel.com> | 2010-08-18 17:13:06 +0100 |
---|---|---|
committer | Richard Purdie <rpurdie@linux.intel.com> | 2010-08-19 11:41:44 +0100 |
commit | 65b068a5f851815eead6b64f366b31fa7dbfe553 (patch) | |
tree | ebe0c13061bcdfe316e55703d0827ee9d767467c /bitbake | |
parent | 96ec9f8a60b2c8d480e052039e1b14a57131fd4f (diff) | |
download | poky-65b068a5f851815eead6b64f366b31fa7dbfe553.tar.gz |
bitbake/runqueue.py: Create RunQueueExecute and RunQueueExecuteTasks classes, further splitting up runqueue
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'bitbake')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 272 |
1 files changed, 144 insertions, 128 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 9f714e46ad..86d60fa05b 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -696,7 +696,6 @@ class RunQueueData: | |||
696 | self.rqdata.runq_depends[task], | 696 | self.rqdata.runq_depends[task], |
697 | self.rqdata.runq_revdeps[task])) | 697 | self.rqdata.runq_revdeps[task])) |
698 | 698 | ||
699 | |||
700 | class RunQueue: | 699 | class RunQueue: |
701 | def __init__(self, cooker, cfgData, dataCache, taskData, targets): | 700 | def __init__(self, cooker, cfgData, dataCache, taskData, targets): |
702 | 701 | ||
@@ -704,8 +703,6 @@ class RunQueue: | |||
704 | self.cfgData = cfgData | 703 | self.cfgData = cfgData |
705 | self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets) | 704 | self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets) |
706 | 705 | ||
707 | self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) | ||
708 | self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" | ||
709 | self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile" | 706 | self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile" |
710 | 707 | ||
711 | self.state = runQueuePrepare | 708 | self.state = runQueuePrepare |
@@ -862,13 +859,14 @@ class RunQueue: | |||
862 | 859 | ||
863 | if self.state is runQueueRunInit: | 860 | if self.state is runQueueRunInit: |
864 | bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") | 861 | bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") |
865 | self.execute_runqueue_initVars() | 862 | self.rqexe = RunQueueExecuteTasks(self) |
863 | self.state = runQueueRunning | ||
866 | 864 | ||
867 | if self.state is runQueueRunning: | 865 | if self.state is runQueueRunning: |
868 | self.execute_runqueue_internal() | 866 | self.rqexe.execute() |
869 | 867 | ||
870 | if self.state is runQueueCleanUp: | 868 | if self.state is runQueueCleanUp: |
871 | self.finish_runqueue() | 869 | self.rqexe.finish() |
872 | 870 | ||
873 | if self.state is runQueueFailed: | 871 | if self.state is runQueueFailed: |
874 | if not self.rqdata.taskData.tryaltconfigs: | 872 | if not self.rqdata.taskData.tryaltconfigs: |
@@ -879,7 +877,7 @@ class RunQueue: | |||
879 | 877 | ||
880 | if self.state is runQueueComplete: | 878 | if self.state is runQueueComplete: |
881 | # All done | 879 | # All done |
882 | bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) | 880 | bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)) |
883 | return False | 881 | return False |
884 | 882 | ||
885 | if self.state is runQueueChildProcess: | 883 | if self.state is runQueueChildProcess: |
@@ -889,9 +887,23 @@ class RunQueue: | |||
889 | # Loop | 887 | # Loop |
890 | return retval | 888 | return retval |
891 | 889 | ||
892 | def execute_runqueue_initVars(self): | 890 | def finish_runqueue(self, now = False): |
891 | if now: | ||
892 | self.rqexe.finish_now() | ||
893 | else: | ||
894 | self.rqexe.finish() | ||
893 | 895 | ||
894 | self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) | 896 | |
897 | class RunQueueExecute: | ||
898 | |||
899 | def __init__(self, rq): | ||
900 | self.rq = rq | ||
901 | self.cooker = rq.cooker | ||
902 | self.cfgData = rq.cfgData | ||
903 | self.rqdata = rq.rqdata | ||
904 | |||
905 | self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1) | ||
906 | self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed" | ||
895 | 907 | ||
896 | self.runq_buildable = [] | 908 | self.runq_buildable = [] |
897 | self.runq_running = [] | 909 | self.runq_running = [] |
@@ -900,6 +912,120 @@ class RunQueue: | |||
900 | self.build_pipes = {} | 912 | self.build_pipes = {} |
901 | self.failed_fnids = [] | 913 | self.failed_fnids = [] |
902 | 914 | ||
915 | def runqueue_process_waitpid(self): | ||
916 | """ | ||
917 | Return none is there are no processes awaiting result collection, otherwise | ||
918 | collect the process exit codes and close the information pipe. | ||
919 | """ | ||
920 | result = os.waitpid(-1, os.WNOHANG) | ||
921 | if result[0] is 0 and result[1] is 0: | ||
922 | return None | ||
923 | task = self.build_pids[result[0]] | ||
924 | del self.build_pids[result[0]] | ||
925 | self.build_pipes[result[0]].close() | ||
926 | del self.build_pipes[result[0]] | ||
927 | if result[1] != 0: | ||
928 | self.task_fail(task, result[1]) | ||
929 | else: | ||
930 | self.task_complete(task) | ||
931 | self.stats.taskCompleted() | ||
932 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) | ||
933 | |||
934 | def finish_now(self): | ||
935 | if self.stats.active: | ||
936 | bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) | ||
937 | for k, v in self.build_pids.iteritems(): | ||
938 | try: | ||
939 | os.kill(-k, signal.SIGINT) | ||
940 | except: | ||
941 | pass | ||
942 | for pipe in self.build_pipes: | ||
943 | self.build_pipes[pipe].read() | ||
944 | |||
945 | def finish(self): | ||
946 | self.rq.state = runQueueCleanUp | ||
947 | |||
948 | for pipe in self.build_pipes: | ||
949 | self.build_pipes[pipe].read() | ||
950 | |||
951 | try: | ||
952 | while self.stats.active > 0: | ||
953 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) | ||
954 | if self.runqueue_process_waitpid() is None: | ||
955 | return | ||
956 | except: | ||
957 | self.finish_now() | ||
958 | raise | ||
959 | |||
960 | if len(self.failed_fnids) != 0: | ||
961 | self.rq.state = runQueueFailed | ||
962 | return | ||
963 | |||
964 | self.rq.state = runQueueComplete | ||
965 | return | ||
966 | |||
967 | def fork_off_task(self, fn, task, taskname): | ||
968 | sys.stdout.flush() | ||
969 | sys.stderr.flush() | ||
970 | try: | ||
971 | pipein, pipeout = os.pipe() | ||
972 | pid = os.fork() | ||
973 | except OSError as e: | ||
974 | bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) | ||
975 | if pid == 0: | ||
976 | os.close(pipein) | ||
977 | # Save out the PID so that the event can include it the | ||
978 | # events | ||
979 | bb.event.worker_pid = os.getpid() | ||
980 | bb.event.worker_pipe = pipeout | ||
981 | |||
982 | self.rq.state = runQueueChildProcess | ||
983 | # Make the child the process group leader | ||
984 | os.setpgid(0, 0) | ||
985 | # No stdin | ||
986 | newsi = os.open('/dev/null', os.O_RDWR) | ||
987 | os.dup2(newsi, sys.stdin.fileno()) | ||
988 | # Stdout to a logfile | ||
989 | #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True) | ||
990 | #mkdirhier(os.path.dirname(logout)) | ||
991 | #newso = open(logout, 'w') | ||
992 | #os.dup2(newso.fileno(), sys.stdout.fileno()) | ||
993 | #os.dup2(newso.fileno(), sys.stderr.fileno()) | ||
994 | |||
995 | bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) | ||
996 | bb.msg.note(1, bb.msg.domain.RunQueue, | ||
997 | "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, | ||
998 | self.stats.total, | ||
999 | task, | ||
1000 | self.rqdata.get_user_idstring(task))) | ||
1001 | |||
1002 | bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) | ||
1003 | bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) | ||
1004 | try: | ||
1005 | the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) | ||
1006 | |||
1007 | if not self.cooker.configuration.dry_run: | ||
1008 | bb.build.exec_task(taskname, the_data) | ||
1009 | os._exit(0) | ||
1010 | |||
1011 | except bb.build.EventException as e: | ||
1012 | event = e.args[1] | ||
1013 | bb.msg.error(bb.msg.domain.Build, "%s event exception, aborting" % bb.event.getName(event)) | ||
1014 | os._exit(1) | ||
1015 | except Exception: | ||
1016 | from traceback import format_exc | ||
1017 | bb.msg.error(bb.msg.domain.Build, "Build of %s %s failed" % (fn, taskname)) | ||
1018 | bb.msg.error(bb.msg.domain.Build, format_exc()) | ||
1019 | os._exit(1) | ||
1020 | os._exit(0) | ||
1021 | return pid, pipein, pipeout | ||
1022 | |||
1023 | class RunQueueExecuteTasks(RunQueueExecute): | ||
1024 | def __init__(self, rq): | ||
1025 | RunQueueExecute.__init__(self, rq) | ||
1026 | |||
1027 | self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) | ||
1028 | |||
903 | # Mark initial buildable tasks | 1029 | # Mark initial buildable tasks |
904 | for task in range(self.stats.total): | 1030 | for task in range(self.stats.total): |
905 | self.runq_running.append(0) | 1031 | self.runq_running.append(0) |
@@ -909,8 +1035,6 @@ class RunQueue: | |||
909 | else: | 1035 | else: |
910 | self.runq_buildable.append(0) | 1036 | self.runq_buildable.append(0) |
911 | 1037 | ||
912 | self.state = runQueueRunning | ||
913 | |||
914 | event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) | 1038 | event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) |
915 | 1039 | ||
916 | schedulers = [obj for obj in globals().itervalues() | 1040 | schedulers = [obj for obj in globals().itervalues() |
@@ -924,6 +1048,7 @@ class RunQueue: | |||
924 | bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers)) | 1048 | bb.error("Available schedulers: %s" % ", ".join(obj.name for obj in schedulers)) |
925 | self.sched = RunQueueSchedulerSpeed(self, self.rqdata) | 1049 | self.sched = RunQueueSchedulerSpeed(self, self.rqdata) |
926 | 1050 | ||
1051 | |||
927 | def task_complete(self, task): | 1052 | def task_complete(self, task): |
928 | """ | 1053 | """ |
929 | Mark a task as completed | 1054 | Mark a task as completed |
@@ -955,18 +1080,18 @@ class RunQueue: | |||
955 | self.stats.taskFailed() | 1080 | self.stats.taskFailed() |
956 | fnid = self.rqdata.runq_fnid[task] | 1081 | fnid = self.rqdata.runq_fnid[task] |
957 | self.failed_fnids.append(fnid) | 1082 | self.failed_fnids.append(fnid) |
958 | bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) | 1083 | bb.event.fire(runQueueTaskFailed(task, self.stats, self.rq), self.cfgData) |
959 | if self.rqdata.taskData.abort: | 1084 | if self.rqdata.taskData.abort: |
960 | self.state = runQueueCleanUp | 1085 | self.rq.state = runQueueCleanUp |
961 | 1086 | ||
962 | def execute_runqueue_internal(self): | 1087 | def execute(self): |
963 | """ | 1088 | """ |
964 | Run the tasks in a queue prepared by rqdata.prepare() | 1089 | Run the tasks in a queue prepared by rqdata.prepare() |
965 | """ | 1090 | """ |
966 | 1091 | ||
967 | if self.stats.total == 0: | 1092 | if self.stats.total == 0: |
968 | # nothing to do | 1093 | # nothing to do |
969 | self.state = runQueueCleanUp | 1094 | self.rq.state = runQueueCleanUp |
970 | 1095 | ||
971 | while True: | 1096 | while True: |
972 | task = None | 1097 | task = None |
@@ -976,7 +1101,7 @@ class RunQueue: | |||
976 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | 1101 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] |
977 | 1102 | ||
978 | taskname = self.rqdata.runq_task[task] | 1103 | taskname = self.rqdata.runq_task[task] |
979 | if self.check_stamp_task(task, taskname): | 1104 | if self.rq.check_stamp_task(task, taskname): |
980 | bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task))) | 1105 | bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.rqdata.get_user_idstring(task))) |
981 | self.runq_running[task] = 1 | 1106 | self.runq_running[task] = 1 |
982 | self.runq_buildable[task] = 1 | 1107 | self.runq_buildable[task] = 1 |
@@ -998,12 +1123,12 @@ class RunQueue: | |||
998 | self.build_pipes[pipe].read() | 1123 | self.build_pipes[pipe].read() |
999 | 1124 | ||
1000 | if self.stats.active > 0: | 1125 | if self.stats.active > 0: |
1001 | if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None: | 1126 | if self.runqueue_process_waitpid() is None: |
1002 | return | 1127 | return |
1003 | continue | 1128 | continue |
1004 | 1129 | ||
1005 | if len(self.failed_fnids) != 0: | 1130 | if len(self.failed_fnids) != 0: |
1006 | self.state = runQueueFailed | 1131 | self.rq.state = runQueueFailed |
1007 | return | 1132 | return |
1008 | 1133 | ||
1009 | # Sanity Checks | 1134 | # Sanity Checks |
@@ -1014,118 +1139,9 @@ class RunQueue: | |||
1014 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) | 1139 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) |
1015 | if self.runq_complete[task] == 0: | 1140 | if self.runq_complete[task] == 0: |
1016 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) | 1141 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) |
1017 | self.state = runQueueComplete | 1142 | self.rq.state = runQueueComplete |
1018 | return | 1143 | return |
1019 | 1144 | ||
1020 | def runqueue_process_waitpid(self, success, failure): | ||
1021 | """ | ||
1022 | Return none is there are no processes awaiting result collection, otherwise | ||
1023 | collect the process exit codes and close the information pipe. | ||
1024 | """ | ||
1025 | result = os.waitpid(-1, os.WNOHANG) | ||
1026 | if result[0] is 0 and result[1] is 0: | ||
1027 | return None | ||
1028 | task = self.build_pids[result[0]] | ||
1029 | del self.build_pids[result[0]] | ||
1030 | self.build_pipes[result[0]].close() | ||
1031 | del self.build_pipes[result[0]] | ||
1032 | if result[1] != 0: | ||
1033 | failure(task, result[1]) | ||
1034 | else: | ||
1035 | success(task) | ||
1036 | self.stats.taskCompleted() | ||
1037 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) | ||
1038 | |||
1039 | def finish_runqueue_now(self): | ||
1040 | if self.stats.active: | ||
1041 | bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) | ||
1042 | for k, v in self.build_pids.iteritems(): | ||
1043 | try: | ||
1044 | os.kill(-k, signal.SIGINT) | ||
1045 | except: | ||
1046 | pass | ||
1047 | for pipe in self.build_pipes: | ||
1048 | self.build_pipes[pipe].read() | ||
1049 | |||
1050 | def finish_runqueue(self, now = False): | ||
1051 | self.state = runQueueCleanUp | ||
1052 | |||
1053 | for pipe in self.build_pipes: | ||
1054 | self.build_pipes[pipe].read() | ||
1055 | |||
1056 | if now: | ||
1057 | self.finish_runqueue_now() | ||
1058 | try: | ||
1059 | while self.stats.active > 0: | ||
1060 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) | ||
1061 | if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None: | ||
1062 | return | ||
1063 | except: | ||
1064 | self.finish_runqueue_now() | ||
1065 | raise | ||
1066 | |||
1067 | if len(self.failed_fnids) != 0: | ||
1068 | self.state = runQueueFailed | ||
1069 | return | ||
1070 | |||
1071 | self.state = runQueueComplete | ||
1072 | return | ||
1073 | |||
1074 | def fork_off_task(self, fn, task, taskname): | ||
1075 | sys.stdout.flush() | ||
1076 | sys.stderr.flush() | ||
1077 | try: | ||
1078 | pipein, pipeout = os.pipe() | ||
1079 | pid = os.fork() | ||
1080 | except OSError as e: | ||
1081 | bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) | ||
1082 | if pid == 0: | ||
1083 | os.close(pipein) | ||
1084 | # Save out the PID so that the event can include it the | ||
1085 | # events | ||
1086 | bb.event.worker_pid = os.getpid() | ||
1087 | bb.event.worker_pipe = pipeout | ||
1088 | |||
1089 | self.state = runQueueChildProcess | ||
1090 | # Make the child the process group leader | ||
1091 | os.setpgid(0, 0) | ||
1092 | # No stdin | ||
1093 | newsi = os.open('/dev/null', os.O_RDWR) | ||
1094 | os.dup2(newsi, sys.stdin.fileno()) | ||
1095 | # Stdout to a logfile | ||
1096 | #logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True) | ||
1097 | #mkdirhier(os.path.dirname(logout)) | ||
1098 | #newso = open(logout, 'w') | ||
1099 | #os.dup2(newso.fileno(), sys.stdout.fileno()) | ||
1100 | #os.dup2(newso.fileno(), sys.stderr.fileno()) | ||
1101 | |||
1102 | bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) | ||
1103 | bb.msg.note(1, bb.msg.domain.RunQueue, | ||
1104 | "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, | ||
1105 | self.stats.total, | ||
1106 | task, | ||
1107 | self.rqdata.get_user_idstring(task))) | ||
1108 | |||
1109 | bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) | ||
1110 | bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data) | ||
1111 | try: | ||
1112 | the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data) | ||
1113 | |||
1114 | if not self.cooker.configuration.dry_run: | ||
1115 | bb.build.exec_task(taskname, the_data) | ||
1116 | os._exit(0) | ||
1117 | |||
1118 | except bb.build.EventException as e: | ||
1119 | event = e.args[1] | ||
1120 | bb.msg.error(bb.msg.domain.Build, "%s event exception, aborting" % bb.event.getName(event)) | ||
1121 | os._exit(1) | ||
1122 | except Exception: | ||
1123 | from traceback import format_exc | ||
1124 | bb.msg.error(bb.msg.domain.Build, "Build of %s %s failed" % (fn, taskname)) | ||
1125 | bb.msg.error(bb.msg.domain.Build, format_exc()) | ||
1126 | os._exit(1) | ||
1127 | os._exit(0) | ||
1128 | return pid, pipein, pipeout | ||
1129 | 1145 | ||
1130 | 1146 | ||
1131 | class TaskFailure(Exception): | 1147 | class TaskFailure(Exception): |