diff options
author | Richard Purdie <rpurdie@linux.intel.com> | 2010-01-20 18:46:02 +0000 |
---|---|---|
committer | Richard Purdie <rpurdie@linux.intel.com> | 2010-01-20 18:46:02 +0000 |
commit | 22c29d8651668195f72e2f6a8e059d625eb511c3 (patch) | |
tree | dd1dd43f0ec47a9964c8a766eb8b3ad75cf51a64 /bitbake/lib/bb/runqueue.py | |
parent | 1bfd6edef9db9c9175058ae801d1b601e4f15263 (diff) | |
download | poky-22c29d8651668195f72e2f6a8e059d625eb511c3.tar.gz |
bitbake: Switch to bitbake-dev version (bitbake master upstream)
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 341 |
1 files changed, 250 insertions, 91 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index cce5da4057..c3ad442e47 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -37,20 +37,38 @@ class RunQueueStats: | |||
37 | """ | 37 | """ |
38 | Holds statistics on the tasks handled by the associated runQueue | 38 | Holds statistics on the tasks handled by the associated runQueue |
39 | """ | 39 | """ |
40 | def __init__(self): | 40 | def __init__(self, total): |
41 | self.completed = 0 | 41 | self.completed = 0 |
42 | self.skipped = 0 | 42 | self.skipped = 0 |
43 | self.failed = 0 | 43 | self.failed = 0 |
44 | self.active = 0 | ||
45 | self.total = total | ||
44 | 46 | ||
45 | def taskFailed(self): | 47 | def taskFailed(self): |
48 | self.active = self.active - 1 | ||
46 | self.failed = self.failed + 1 | 49 | self.failed = self.failed + 1 |
47 | 50 | ||
48 | def taskCompleted(self, number = 1): | 51 | def taskCompleted(self, number = 1): |
52 | self.active = self.active - number | ||
49 | self.completed = self.completed + number | 53 | self.completed = self.completed + number |
50 | 54 | ||
51 | def taskSkipped(self, number = 1): | 55 | def taskSkipped(self, number = 1): |
56 | self.active = self.active + number | ||
52 | self.skipped = self.skipped + number | 57 | self.skipped = self.skipped + number |
53 | 58 | ||
59 | def taskActive(self): | ||
60 | self.active = self.active + 1 | ||
61 | |||
62 | # These values indicate the next step due to be run in the | ||
63 | # runQueue state machine | ||
64 | runQueuePrepare = 2 | ||
65 | runQueueRunInit = 3 | ||
66 | runQueueRunning = 4 | ||
67 | runQueueFailed = 6 | ||
68 | runQueueCleanUp = 7 | ||
69 | runQueueComplete = 8 | ||
70 | runQueueChildProcess = 9 | ||
71 | |||
54 | class RunQueueScheduler: | 72 | class RunQueueScheduler: |
55 | """ | 73 | """ |
56 | Control the order tasks are scheduled in. | 74 | Control the order tasks are scheduled in. |
@@ -142,9 +160,9 @@ class RunQueue: | |||
142 | self.cooker = cooker | 160 | self.cooker = cooker |
143 | self.dataCache = dataCache | 161 | self.dataCache = dataCache |
144 | self.taskData = taskData | 162 | self.taskData = taskData |
163 | self.cfgData = cfgData | ||
145 | self.targets = targets | 164 | self.targets = targets |
146 | 165 | ||
147 | self.cfgdata = cfgData | ||
148 | self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) | 166 | self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) |
149 | self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() | 167 | self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() |
150 | self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" | 168 | self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" |
@@ -152,12 +170,13 @@ class RunQueue: | |||
152 | self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or "" | 170 | self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or "" |
153 | 171 | ||
154 | def reset_runqueue(self): | 172 | def reset_runqueue(self): |
155 | |||
156 | self.runq_fnid = [] | 173 | self.runq_fnid = [] |
157 | self.runq_task = [] | 174 | self.runq_task = [] |
158 | self.runq_depends = [] | 175 | self.runq_depends = [] |
159 | self.runq_revdeps = [] | 176 | self.runq_revdeps = [] |
160 | 177 | ||
178 | self.state = runQueuePrepare | ||
179 | |||
161 | def get_user_idstring(self, task): | 180 | def get_user_idstring(self, task): |
162 | fn = self.taskData.fn_index[self.runq_fnid[task]] | 181 | fn = self.taskData.fn_index[self.runq_fnid[task]] |
163 | taskname = self.runq_task[task] | 182 | taskname = self.runq_task[task] |
@@ -653,6 +672,8 @@ class RunQueue: | |||
653 | 672 | ||
654 | #self.dump_data(taskData) | 673 | #self.dump_data(taskData) |
655 | 674 | ||
675 | self.state = runQueueRunInit | ||
676 | |||
656 | def check_stamps(self): | 677 | def check_stamps(self): |
657 | unchecked = {} | 678 | unchecked = {} |
658 | current = [] | 679 | current = [] |
@@ -796,39 +817,51 @@ class RunQueue: | |||
796 | (if the abort on failure configuration option isn't set) | 817 | (if the abort on failure configuration option isn't set) |
797 | """ | 818 | """ |
798 | 819 | ||
799 | failures = 0 | 820 | if self.state is runQueuePrepare: |
800 | while 1: | 821 | self.prepare_runqueue() |
801 | failed_fnids = [] | 822 | |
802 | try: | 823 | if self.state is runQueueRunInit: |
803 | self.execute_runqueue_internal() | 824 | bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") |
804 | finally: | 825 | self.execute_runqueue_initVars() |
805 | if self.master_process: | 826 | |
806 | failed_fnids = self.finish_runqueue() | 827 | if self.state is runQueueRunning: |
807 | if len(failed_fnids) == 0: | 828 | self.execute_runqueue_internal() |
808 | return failures | 829 | |
830 | if self.state is runQueueCleanUp: | ||
831 | self.finish_runqueue() | ||
832 | |||
833 | if self.state is runQueueFailed: | ||
809 | if not self.taskData.tryaltconfigs: | 834 | if not self.taskData.tryaltconfigs: |
810 | raise bb.runqueue.TaskFailure(failed_fnids) | 835 | raise bb.runqueue.TaskFailure(self.failed_fnids) |
811 | for fnid in failed_fnids: | 836 | for fnid in self.failed_fnids: |
812 | #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid], self.runq_task[fnid]) | ||
813 | self.taskData.fail_fnid(fnid) | 837 | self.taskData.fail_fnid(fnid) |
814 | failures = failures + 1 | ||
815 | self.reset_runqueue() | 838 | self.reset_runqueue() |
816 | self.prepare_runqueue() | 839 | |
840 | if self.state is runQueueComplete: | ||
841 | # All done | ||
842 | bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) | ||
843 | return False | ||
844 | |||
845 | if self.state is runQueueChildProcess: | ||
846 | print "Child process" | ||
847 | return False | ||
848 | |||
849 | # Loop | ||
850 | return True | ||
817 | 851 | ||
818 | def execute_runqueue_initVars(self): | 852 | def execute_runqueue_initVars(self): |
819 | 853 | ||
820 | self.stats = RunQueueStats() | 854 | self.stats = RunQueueStats(len(self.runq_fnid)) |
821 | 855 | ||
822 | self.active_builds = 0 | ||
823 | self.runq_buildable = [] | 856 | self.runq_buildable = [] |
824 | self.runq_running = [] | 857 | self.runq_running = [] |
825 | self.runq_complete = [] | 858 | self.runq_complete = [] |
826 | self.build_pids = {} | 859 | self.build_pids = {} |
860 | self.build_pipes = {} | ||
827 | self.failed_fnids = [] | 861 | self.failed_fnids = [] |
828 | self.master_process = True | ||
829 | 862 | ||
830 | # Mark initial buildable tasks | 863 | # Mark initial buildable tasks |
831 | for task in range(len(self.runq_fnid)): | 864 | for task in range(self.stats.total): |
832 | self.runq_running.append(0) | 865 | self.runq_running.append(0) |
833 | self.runq_complete.append(0) | 866 | self.runq_complete.append(0) |
834 | if len(self.runq_depends[task]) == 0: | 867 | if len(self.runq_depends[task]) == 0: |
@@ -836,6 +869,10 @@ class RunQueue: | |||
836 | else: | 869 | else: |
837 | self.runq_buildable.append(0) | 870 | self.runq_buildable.append(0) |
838 | 871 | ||
872 | self.state = runQueueRunning | ||
873 | |||
874 | event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData) | ||
875 | |||
839 | def task_complete(self, task): | 876 | def task_complete(self, task): |
840 | """ | 877 | """ |
841 | Mark a task as completed | 878 | Mark a task as completed |
@@ -858,26 +895,32 @@ class RunQueue: | |||
858 | taskname = self.runq_task[revdep] | 895 | taskname = self.runq_task[revdep] |
859 | bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) | 896 | bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) |
860 | 897 | ||
898 | def task_fail(self, task, exitcode): | ||
899 | """ | ||
900 | Called when a task has failed | ||
901 | Updates the state engine with the failure | ||
902 | """ | ||
903 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode)) | ||
904 | self.stats.taskFailed() | ||
905 | fnid = self.runq_fnid[task] | ||
906 | self.failed_fnids.append(fnid) | ||
907 | bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData) | ||
908 | if self.taskData.abort: | ||
909 | self.state = runQueueCleanup | ||
910 | |||
861 | def execute_runqueue_internal(self): | 911 | def execute_runqueue_internal(self): |
862 | """ | 912 | """ |
863 | Run the tasks in a queue prepared by prepare_runqueue | 913 | Run the tasks in a queue prepared by prepare_runqueue |
864 | """ | 914 | """ |
865 | 915 | ||
866 | bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") | 916 | if self.stats.total == 0: |
867 | |||
868 | self.execute_runqueue_initVars() | ||
869 | |||
870 | if len(self.runq_fnid) == 0: | ||
871 | # nothing to do | 917 | # nothing to do |
872 | return [] | 918 | self.state = runQueueCleanup |
873 | |||
874 | def sigint_handler(signum, frame): | ||
875 | raise KeyboardInterrupt | ||
876 | |||
877 | event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata)) | ||
878 | 919 | ||
879 | while True: | 920 | while True: |
880 | task = self.sched.next() | 921 | task = None |
922 | if self.stats.active < self.number_tasks: | ||
923 | task = self.sched.next() | ||
881 | if task is not None: | 924 | if task is not None: |
882 | fn = self.taskData.fn_index[self.runq_fnid[task]] | 925 | fn = self.taskData.fn_index[self.runq_fnid[task]] |
883 | 926 | ||
@@ -885,107 +928,143 @@ class RunQueue: | |||
885 | if self.check_stamp_task(task): | 928 | if self.check_stamp_task(task): |
886 | bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task))) | 929 | bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task))) |
887 | self.runq_running[task] = 1 | 930 | self.runq_running[task] = 1 |
931 | self.runq_buildable[task] = 1 | ||
888 | self.task_complete(task) | 932 | self.task_complete(task) |
889 | self.stats.taskCompleted() | 933 | self.stats.taskCompleted() |
890 | self.stats.taskSkipped() | 934 | self.stats.taskSkipped() |
891 | continue | 935 | continue |
892 | 936 | ||
893 | bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task))) | ||
894 | sys.stdout.flush() | 937 | sys.stdout.flush() |
895 | sys.stderr.flush() | 938 | sys.stderr.flush() |
896 | try: | 939 | try: |
940 | pipein, pipeout = os.pipe() | ||
897 | pid = os.fork() | 941 | pid = os.fork() |
898 | except OSError, e: | 942 | except OSError, e: |
899 | bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) | 943 | bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) |
900 | if pid == 0: | 944 | if pid == 0: |
901 | # Bypass master process' handling | 945 | os.close(pipein) |
902 | self.master_process = False | 946 | # Save out the PID so that the event can include it the |
903 | # Stop Ctrl+C being sent to children | 947 | # events |
904 | # signal.signal(signal.SIGINT, signal.SIG_IGN) | 948 | bb.event.worker_pid = os.getpid() |
949 | bb.event.worker_pipe = pipeout | ||
950 | |||
951 | self.state = runQueueChildProcess | ||
905 | # Make the child the process group leader | 952 | # Make the child the process group leader |
906 | os.setpgid(0, 0) | 953 | os.setpgid(0, 0) |
954 | # No stdin | ||
907 | newsi = os.open('/dev/null', os.O_RDWR) | 955 | newsi = os.open('/dev/null', os.O_RDWR) |
908 | os.dup2(newsi, sys.stdin.fileno()) | 956 | os.dup2(newsi, sys.stdin.fileno()) |
909 | self.cooker.configuration.cmd = taskname[3:] | 957 | |
958 | bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) | ||
959 | bb.msg.note(1, bb.msg.domain.RunQueue, | ||
960 | "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1, | ||
961 | self.stats.total, | ||
962 | task, | ||
963 | self.get_user_idstring(task))) | ||
964 | |||
910 | bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) | 965 | bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) |
911 | try: | 966 | try: |
912 | self.cooker.tryBuild(fn) | 967 | self.cooker.tryBuild(fn, taskname[3:]) |
913 | except bb.build.EventException: | 968 | except bb.build.EventException: |
914 | bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") | 969 | bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") |
915 | sys.exit(1) | 970 | os._exit(1) |
916 | except: | 971 | except: |
917 | bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") | 972 | bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") |
918 | raise | 973 | os._exit(1) |
919 | sys.exit(0) | 974 | os._exit(0) |
975 | |||
920 | self.build_pids[pid] = task | 976 | self.build_pids[pid] = task |
977 | self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) | ||
921 | self.runq_running[task] = 1 | 978 | self.runq_running[task] = 1 |
922 | self.active_builds = self.active_builds + 1 | 979 | self.stats.taskActive() |
923 | if self.active_builds < self.number_tasks: | 980 | if self.stats.active < self.number_tasks: |
924 | continue | 981 | continue |
925 | if self.active_builds > 0: | 982 | |
926 | result = os.waitpid(-1, 0) | 983 | for pipe in self.build_pipes: |
927 | self.active_builds = self.active_builds - 1 | 984 | self.build_pipes[pipe].read() |
985 | |||
986 | if self.stats.active > 0: | ||
987 | result = os.waitpid(-1, os.WNOHANG) | ||
988 | if result[0] is 0 and result[1] is 0: | ||
989 | return | ||
928 | task = self.build_pids[result[0]] | 990 | task = self.build_pids[result[0]] |
991 | del self.build_pids[result[0]] | ||
992 | self.build_pipes[result[0]].close() | ||
993 | del self.build_pipes[result[0]] | ||
929 | if result[1] != 0: | 994 | if result[1] != 0: |
930 | del self.build_pids[result[0]] | 995 | self.task_fail(task, result[1]) |
931 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) | 996 | return |
932 | self.failed_fnids.append(self.runq_fnid[task]) | ||
933 | self.stats.taskFailed() | ||
934 | if not self.taskData.abort: | ||
935 | continue | ||
936 | break | ||
937 | self.task_complete(task) | 997 | self.task_complete(task) |
938 | self.stats.taskCompleted() | 998 | self.stats.taskCompleted() |
939 | del self.build_pids[result[0]] | 999 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) |
940 | continue | 1000 | continue |
1001 | |||
1002 | if len(self.failed_fnids) != 0: | ||
1003 | self.state = runQueueFailed | ||
1004 | return | ||
1005 | |||
1006 | # Sanity Checks | ||
1007 | for task in range(self.stats.total): | ||
1008 | if self.runq_buildable[task] == 0: | ||
1009 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) | ||
1010 | if self.runq_running[task] == 0: | ||
1011 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) | ||
1012 | if self.runq_complete[task] == 0: | ||
1013 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) | ||
1014 | self.state = runQueueComplete | ||
941 | return | 1015 | return |
942 | 1016 | ||
943 | def finish_runqueue(self): | 1017 | def finish_runqueue_now(self): |
1018 | bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active) | ||
1019 | for k, v in self.build_pids.iteritems(): | ||
1020 | try: | ||
1021 | os.kill(-k, signal.SIGINT) | ||
1022 | except: | ||
1023 | pass | ||
1024 | for pipe in self.build_pipes: | ||
1025 | self.build_pipes[pipe].read() | ||
1026 | |||
1027 | def finish_runqueue(self, now = False): | ||
1028 | self.state = runQueueCleanUp | ||
1029 | if now: | ||
1030 | self.finish_runqueue_now() | ||
944 | try: | 1031 | try: |
945 | while self.active_builds > 0: | 1032 | while self.stats.active > 0: |
946 | bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds) | 1033 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) |
1034 | bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active) | ||
947 | tasknum = 1 | 1035 | tasknum = 1 |
948 | for k, v in self.build_pids.iteritems(): | 1036 | for k, v in self.build_pids.iteritems(): |
949 | bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) | 1037 | bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) |
950 | tasknum = tasknum + 1 | 1038 | tasknum = tasknum + 1 |
951 | result = os.waitpid(-1, 0) | 1039 | result = os.waitpid(-1, os.WNOHANG) |
1040 | if result[0] is 0 and result[1] is 0: | ||
1041 | return | ||
952 | task = self.build_pids[result[0]] | 1042 | task = self.build_pids[result[0]] |
953 | if result[1] != 0: | ||
954 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) | ||
955 | self.failed_fnids.append(self.runq_fnid[task]) | ||
956 | self.stats.taskFailed() | ||
957 | del self.build_pids[result[0]] | 1043 | del self.build_pids[result[0]] |
958 | self.active_builds = self.active_builds - 1 | 1044 | self.build_pipes[result[0]].close() |
959 | bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) | 1045 | del self.build_pipes[result[0]] |
960 | return self.failed_fnids | 1046 | if result[1] != 0: |
961 | except KeyboardInterrupt: | 1047 | self.task_fail(task, result[1]) |
962 | bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds) | 1048 | else: |
963 | for k, v in self.build_pids.iteritems(): | 1049 | self.stats.taskCompleted() |
964 | try: | 1050 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData) |
965 | os.kill(-k, signal.SIGINT) | 1051 | except: |
966 | except: | 1052 | self.finish_runqueue_now() |
967 | pass | ||
968 | raise | 1053 | raise |
969 | 1054 | ||
970 | # Sanity Checks | 1055 | if len(self.failed_fnids) != 0: |
971 | for task in range(len(self.runq_fnid)): | 1056 | self.state = runQueueFailed |
972 | if self.runq_buildable[task] == 0: | 1057 | return |
973 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task) | ||
974 | if self.runq_running[task] == 0: | ||
975 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task) | ||
976 | if self.runq_complete[task] == 0: | ||
977 | bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task) | ||
978 | |||
979 | bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) | ||
980 | 1058 | ||
981 | return self.failed_fnids | 1059 | self.state = runQueueComplete |
1060 | return | ||
982 | 1061 | ||
983 | def dump_data(self, taskQueue): | 1062 | def dump_data(self, taskQueue): |
984 | """ | 1063 | """ |
985 | Dump some debug information on the internal data structures | 1064 | Dump some debug information on the internal data structures |
986 | """ | 1065 | """ |
987 | bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") | 1066 | bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") |
988 | for task in range(len(self.runq_fnid)): | 1067 | for task in range(len(self.runq_task)): |
989 | bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, | 1068 | bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, |
990 | taskQueue.fn_index[self.runq_fnid[task]], | 1069 | taskQueue.fn_index[self.runq_fnid[task]], |
991 | self.runq_task[task], | 1070 | self.runq_task[task], |
@@ -994,7 +1073,7 @@ class RunQueue: | |||
994 | self.runq_revdeps[task])) | 1073 | self.runq_revdeps[task])) |
995 | 1074 | ||
996 | bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") | 1075 | bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") |
997 | for task1 in range(len(self.runq_fnid)): | 1076 | for task1 in range(len(self.runq_task)): |
998 | if task1 in self.prio_map: | 1077 | if task1 in self.prio_map: |
999 | task = self.prio_map[task1] | 1078 | task = self.prio_map[task1] |
1000 | bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, | 1079 | bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, |
@@ -1005,6 +1084,58 @@ class RunQueue: | |||
1005 | self.runq_revdeps[task])) | 1084 | self.runq_revdeps[task])) |
1006 | 1085 | ||
1007 | 1086 | ||
1087 | class TaskFailure(Exception): | ||
1088 | """ | ||
1089 | Exception raised when a task in a runqueue fails | ||
1090 | """ | ||
1091 | def __init__(self, x): | ||
1092 | self.args = x | ||
1093 | |||
1094 | |||
1095 | class runQueueExitWait(bb.event.Event): | ||
1096 | """ | ||
1097 | Event when waiting for task processes to exit | ||
1098 | """ | ||
1099 | |||
1100 | def __init__(self, remain): | ||
1101 | self.remain = remain | ||
1102 | self.message = "Waiting for %s active tasks to finish" % remain | ||
1103 | bb.event.Event.__init__(self) | ||
1104 | |||
1105 | class runQueueEvent(bb.event.Event): | ||
1106 | """ | ||
1107 | Base runQueue event class | ||
1108 | """ | ||
1109 | def __init__(self, task, stats, rq): | ||
1110 | self.taskid = task | ||
1111 | self.taskstring = rq.get_user_idstring(task) | ||
1112 | self.stats = stats | ||
1113 | bb.event.Event.__init__(self) | ||
1114 | |||
1115 | class runQueueTaskStarted(runQueueEvent): | ||
1116 | """ | ||
1117 | Event notifing a task was started | ||
1118 | """ | ||
1119 | def __init__(self, task, stats, rq): | ||
1120 | runQueueEvent.__init__(self, task, stats, rq) | ||
1121 | self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring) | ||
1122 | |||
1123 | class runQueueTaskFailed(runQueueEvent): | ||
1124 | """ | ||
1125 | Event notifing a task failed | ||
1126 | """ | ||
1127 | def __init__(self, task, stats, rq): | ||
1128 | runQueueEvent.__init__(self, task, stats, rq) | ||
1129 | self.message = "Task %s failed (%s)" % (task, self.taskstring) | ||
1130 | |||
1131 | class runQueueTaskCompleted(runQueueEvent): | ||
1132 | """ | ||
1133 | Event notifing a task completed | ||
1134 | """ | ||
1135 | def __init__(self, task, stats, rq): | ||
1136 | runQueueEvent.__init__(self, task, stats, rq) | ||
1137 | self.message = "Task %s completed (%s)" % (task, self.taskstring) | ||
1138 | |||
1008 | def check_stamp_fn(fn, taskname, d): | 1139 | def check_stamp_fn(fn, taskname, d): |
1009 | rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) | 1140 | rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) |
1010 | fnid = rq.taskData.getfn_id(fn) | 1141 | fnid = rq.taskData.getfn_id(fn) |
@@ -1013,3 +1144,31 @@ def check_stamp_fn(fn, taskname, d): | |||
1013 | return rq.check_stamp_task(taskid) | 1144 | return rq.check_stamp_task(taskid) |
1014 | return None | 1145 | return None |
1015 | 1146 | ||
1147 | class runQueuePipe(): | ||
1148 | """ | ||
1149 | Abstraction for a pipe between a worker thread and the server | ||
1150 | """ | ||
1151 | def __init__(self, pipein, pipeout, d): | ||
1152 | self.fd = pipein | ||
1153 | os.close(pipeout) | ||
1154 | self.queue = "" | ||
1155 | self.d = d | ||
1156 | |||
1157 | def read(self): | ||
1158 | start = len(self.queue) | ||
1159 | self.queue = self.queue + os.read(self.fd, 1024) | ||
1160 | end = len(self.queue) | ||
1161 | index = self.queue.find("</event>") | ||
1162 | while index != -1: | ||
1163 | bb.event.fire_from_worker(self.queue[:index+8], self.d) | ||
1164 | self.queue = self.queue[index+8:] | ||
1165 | index = self.queue.find("</event>") | ||
1166 | return (end > start) | ||
1167 | |||
1168 | def close(self): | ||
1169 | while self.read(): | ||
1170 | continue | ||
1171 | if len(self.queue) > 0: | ||
1172 | print "Warning, worker left partial message" | ||
1173 | os.close(self.fd) | ||
1174 | |||