summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py341
1 files changed, 250 insertions, 91 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index cce5da4057..c3ad442e47 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -37,20 +37,38 @@ class RunQueueStats:
37 """ 37 """
38 Holds statistics on the tasks handled by the associated runQueue 38 Holds statistics on the tasks handled by the associated runQueue
39 """ 39 """
40 def __init__(self): 40 def __init__(self, total):
41 self.completed = 0 41 self.completed = 0
42 self.skipped = 0 42 self.skipped = 0
43 self.failed = 0 43 self.failed = 0
44 self.active = 0
45 self.total = total
44 46
45 def taskFailed(self): 47 def taskFailed(self):
48 self.active = self.active - 1
46 self.failed = self.failed + 1 49 self.failed = self.failed + 1
47 50
48 def taskCompleted(self, number = 1): 51 def taskCompleted(self, number = 1):
52 self.active = self.active - number
49 self.completed = self.completed + number 53 self.completed = self.completed + number
50 54
51 def taskSkipped(self, number = 1): 55 def taskSkipped(self, number = 1):
56 self.active = self.active + number
52 self.skipped = self.skipped + number 57 self.skipped = self.skipped + number
53 58
59 def taskActive(self):
60 self.active = self.active + 1
61
62# These values indicate the next step due to be run in the
63# runQueue state machine
64runQueuePrepare = 2
65runQueueRunInit = 3
66runQueueRunning = 4
67runQueueFailed = 6
68runQueueCleanUp = 7
69runQueueComplete = 8
70runQueueChildProcess = 9
71
54class RunQueueScheduler: 72class RunQueueScheduler:
55 """ 73 """
56 Control the order tasks are scheduled in. 74 Control the order tasks are scheduled in.
@@ -142,9 +160,9 @@ class RunQueue:
142 self.cooker = cooker 160 self.cooker = cooker
143 self.dataCache = dataCache 161 self.dataCache = dataCache
144 self.taskData = taskData 162 self.taskData = taskData
163 self.cfgData = cfgData
145 self.targets = targets 164 self.targets = targets
146 165
147 self.cfgdata = cfgData
148 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1) 166 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
149 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split() 167 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
150 self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed" 168 self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
@@ -152,12 +170,13 @@ class RunQueue:
152 self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or "" 170 self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
153 171
154 def reset_runqueue(self): 172 def reset_runqueue(self):
155
156 self.runq_fnid = [] 173 self.runq_fnid = []
157 self.runq_task = [] 174 self.runq_task = []
158 self.runq_depends = [] 175 self.runq_depends = []
159 self.runq_revdeps = [] 176 self.runq_revdeps = []
160 177
178 self.state = runQueuePrepare
179
161 def get_user_idstring(self, task): 180 def get_user_idstring(self, task):
162 fn = self.taskData.fn_index[self.runq_fnid[task]] 181 fn = self.taskData.fn_index[self.runq_fnid[task]]
163 taskname = self.runq_task[task] 182 taskname = self.runq_task[task]
@@ -653,6 +672,8 @@ class RunQueue:
653 672
654 #self.dump_data(taskData) 673 #self.dump_data(taskData)
655 674
675 self.state = runQueueRunInit
676
656 def check_stamps(self): 677 def check_stamps(self):
657 unchecked = {} 678 unchecked = {}
658 current = [] 679 current = []
@@ -796,39 +817,51 @@ class RunQueue:
796 (if the abort on failure configuration option isn't set) 817 (if the abort on failure configuration option isn't set)
797 """ 818 """
798 819
799 failures = 0 820 if self.state is runQueuePrepare:
800 while 1: 821 self.prepare_runqueue()
801 failed_fnids = [] 822
802 try: 823 if self.state is runQueueRunInit:
803 self.execute_runqueue_internal() 824 bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
804 finally: 825 self.execute_runqueue_initVars()
805 if self.master_process: 826
806 failed_fnids = self.finish_runqueue() 827 if self.state is runQueueRunning:
807 if len(failed_fnids) == 0: 828 self.execute_runqueue_internal()
808 return failures 829
830 if self.state is runQueueCleanUp:
831 self.finish_runqueue()
832
833 if self.state is runQueueFailed:
809 if not self.taskData.tryaltconfigs: 834 if not self.taskData.tryaltconfigs:
810 raise bb.runqueue.TaskFailure(failed_fnids) 835 raise bb.runqueue.TaskFailure(self.failed_fnids)
811 for fnid in failed_fnids: 836 for fnid in self.failed_fnids:
812 #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid], self.runq_task[fnid])
813 self.taskData.fail_fnid(fnid) 837 self.taskData.fail_fnid(fnid)
814 failures = failures + 1
815 self.reset_runqueue() 838 self.reset_runqueue()
816 self.prepare_runqueue() 839
840 if self.state is runQueueComplete:
841 # All done
842 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
843 return False
844
845 if self.state is runQueueChildProcess:
846 print "Child process"
847 return False
848
849 # Loop
850 return True
817 851
818 def execute_runqueue_initVars(self): 852 def execute_runqueue_initVars(self):
819 853
820 self.stats = RunQueueStats() 854 self.stats = RunQueueStats(len(self.runq_fnid))
821 855
822 self.active_builds = 0
823 self.runq_buildable = [] 856 self.runq_buildable = []
824 self.runq_running = [] 857 self.runq_running = []
825 self.runq_complete = [] 858 self.runq_complete = []
826 self.build_pids = {} 859 self.build_pids = {}
860 self.build_pipes = {}
827 self.failed_fnids = [] 861 self.failed_fnids = []
828 self.master_process = True
829 862
830 # Mark initial buildable tasks 863 # Mark initial buildable tasks
831 for task in range(len(self.runq_fnid)): 864 for task in range(self.stats.total):
832 self.runq_running.append(0) 865 self.runq_running.append(0)
833 self.runq_complete.append(0) 866 self.runq_complete.append(0)
834 if len(self.runq_depends[task]) == 0: 867 if len(self.runq_depends[task]) == 0:
@@ -836,6 +869,10 @@ class RunQueue:
836 else: 869 else:
837 self.runq_buildable.append(0) 870 self.runq_buildable.append(0)
838 871
872 self.state = runQueueRunning
873
874 event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData)
875
839 def task_complete(self, task): 876 def task_complete(self, task):
840 """ 877 """
841 Mark a task as completed 878 Mark a task as completed
@@ -858,26 +895,32 @@ class RunQueue:
858 taskname = self.runq_task[revdep] 895 taskname = self.runq_task[revdep]
859 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname)) 896 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
860 897
898 def task_fail(self, task, exitcode):
899 """
900 Called when a task has failed
901 Updates the state engine with the failure
902 """
903 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode))
904 self.stats.taskFailed()
905 fnid = self.runq_fnid[task]
906 self.failed_fnids.append(fnid)
907 bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
908 if self.taskData.abort:
909 self.state = runQueueCleanup
910
861 def execute_runqueue_internal(self): 911 def execute_runqueue_internal(self):
862 """ 912 """
863 Run the tasks in a queue prepared by prepare_runqueue 913 Run the tasks in a queue prepared by prepare_runqueue
864 """ 914 """
865 915
866 bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue") 916 if self.stats.total == 0:
867
868 self.execute_runqueue_initVars()
869
870 if len(self.runq_fnid) == 0:
871 # nothing to do 917 # nothing to do
872 return [] 918 self.state = runQueueCleanup
873
874 def sigint_handler(signum, frame):
875 raise KeyboardInterrupt
876
877 event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgdata))
878 919
879 while True: 920 while True:
880 task = self.sched.next() 921 task = None
922 if self.stats.active < self.number_tasks:
923 task = self.sched.next()
881 if task is not None: 924 if task is not None:
882 fn = self.taskData.fn_index[self.runq_fnid[task]] 925 fn = self.taskData.fn_index[self.runq_fnid[task]]
883 926
@@ -885,107 +928,143 @@ class RunQueue:
885 if self.check_stamp_task(task): 928 if self.check_stamp_task(task):
886 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task))) 929 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
887 self.runq_running[task] = 1 930 self.runq_running[task] = 1
931 self.runq_buildable[task] = 1
888 self.task_complete(task) 932 self.task_complete(task)
889 self.stats.taskCompleted() 933 self.stats.taskCompleted()
890 self.stats.taskSkipped() 934 self.stats.taskSkipped()
891 continue 935 continue
892 936
893 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
894 sys.stdout.flush() 937 sys.stdout.flush()
895 sys.stderr.flush() 938 sys.stderr.flush()
896 try: 939 try:
940 pipein, pipeout = os.pipe()
897 pid = os.fork() 941 pid = os.fork()
898 except OSError, e: 942 except OSError, e:
899 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) 943 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
900 if pid == 0: 944 if pid == 0:
901 # Bypass master process' handling 945 os.close(pipein)
902 self.master_process = False 946 # Save out the PID so that the event can include it the
903 # Stop Ctrl+C being sent to children 947 # events
904 # signal.signal(signal.SIGINT, signal.SIG_IGN) 948 bb.event.worker_pid = os.getpid()
949 bb.event.worker_pipe = pipeout
950
951 self.state = runQueueChildProcess
905 # Make the child the process group leader 952 # Make the child the process group leader
906 os.setpgid(0, 0) 953 os.setpgid(0, 0)
954 # No stdin
907 newsi = os.open('/dev/null', os.O_RDWR) 955 newsi = os.open('/dev/null', os.O_RDWR)
908 os.dup2(newsi, sys.stdin.fileno()) 956 os.dup2(newsi, sys.stdin.fileno())
909 self.cooker.configuration.cmd = taskname[3:] 957
958 bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
959 bb.msg.note(1, bb.msg.domain.RunQueue,
960 "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1,
961 self.stats.total,
962 task,
963 self.get_user_idstring(task)))
964
910 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) 965 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
911 try: 966 try:
912 self.cooker.tryBuild(fn) 967 self.cooker.tryBuild(fn, taskname[3:])
913 except bb.build.EventException: 968 except bb.build.EventException:
914 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") 969 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
915 sys.exit(1) 970 os._exit(1)
916 except: 971 except:
917 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") 972 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
918 raise 973 os._exit(1)
919 sys.exit(0) 974 os._exit(0)
975
920 self.build_pids[pid] = task 976 self.build_pids[pid] = task
977 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
921 self.runq_running[task] = 1 978 self.runq_running[task] = 1
922 self.active_builds = self.active_builds + 1 979 self.stats.taskActive()
923 if self.active_builds < self.number_tasks: 980 if self.stats.active < self.number_tasks:
924 continue 981 continue
925 if self.active_builds > 0: 982
926 result = os.waitpid(-1, 0) 983 for pipe in self.build_pipes:
927 self.active_builds = self.active_builds - 1 984 self.build_pipes[pipe].read()
985
986 if self.stats.active > 0:
987 result = os.waitpid(-1, os.WNOHANG)
988 if result[0] is 0 and result[1] is 0:
989 return
928 task = self.build_pids[result[0]] 990 task = self.build_pids[result[0]]
991 del self.build_pids[result[0]]
992 self.build_pipes[result[0]].close()
993 del self.build_pipes[result[0]]
929 if result[1] != 0: 994 if result[1] != 0:
930 del self.build_pids[result[0]] 995 self.task_fail(task, result[1])
931 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task))) 996 return
932 self.failed_fnids.append(self.runq_fnid[task])
933 self.stats.taskFailed()
934 if not self.taskData.abort:
935 continue
936 break
937 self.task_complete(task) 997 self.task_complete(task)
938 self.stats.taskCompleted() 998 self.stats.taskCompleted()
939 del self.build_pids[result[0]] 999 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
940 continue 1000 continue
1001
1002 if len(self.failed_fnids) != 0:
1003 self.state = runQueueFailed
1004 return
1005
1006 # Sanity Checks
1007 for task in range(self.stats.total):
1008 if self.runq_buildable[task] == 0:
1009 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
1010 if self.runq_running[task] == 0:
1011 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
1012 if self.runq_complete[task] == 0:
1013 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
1014 self.state = runQueueComplete
941 return 1015 return
942 1016
943 def finish_runqueue(self): 1017 def finish_runqueue_now(self):
1018 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active)
1019 for k, v in self.build_pids.iteritems():
1020 try:
1021 os.kill(-k, signal.SIGINT)
1022 except:
1023 pass
1024 for pipe in self.build_pipes:
1025 self.build_pipes[pipe].read()
1026
1027 def finish_runqueue(self, now = False):
1028 self.state = runQueueCleanUp
1029 if now:
1030 self.finish_runqueue_now()
944 try: 1031 try:
945 while self.active_builds > 0: 1032 while self.stats.active > 0:
946 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds) 1033 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1034 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)
947 tasknum = 1 1035 tasknum = 1
948 for k, v in self.build_pids.iteritems(): 1036 for k, v in self.build_pids.iteritems():
949 bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k)) 1037 bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
950 tasknum = tasknum + 1 1038 tasknum = tasknum + 1
951 result = os.waitpid(-1, 0) 1039 result = os.waitpid(-1, os.WNOHANG)
1040 if result[0] is 0 and result[1] is 0:
1041 return
952 task = self.build_pids[result[0]] 1042 task = self.build_pids[result[0]]
953 if result[1] != 0:
954 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
955 self.failed_fnids.append(self.runq_fnid[task])
956 self.stats.taskFailed()
957 del self.build_pids[result[0]] 1043 del self.build_pids[result[0]]
958 self.active_builds = self.active_builds - 1 1044 self.build_pipes[result[0]].close()
959 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed)) 1045 del self.build_pipes[result[0]]
960 return self.failed_fnids 1046 if result[1] != 0:
961 except KeyboardInterrupt: 1047 self.task_fail(task, result[1])
962 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds) 1048 else:
963 for k, v in self.build_pids.iteritems(): 1049 self.stats.taskCompleted()
964 try: 1050 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
965 os.kill(-k, signal.SIGINT) 1051 except:
966 except: 1052 self.finish_runqueue_now()
967 pass
968 raise 1053 raise
969 1054
970 # Sanity Checks 1055 if len(self.failed_fnids) != 0:
971 for task in range(len(self.runq_fnid)): 1056 self.state = runQueueFailed
972 if self.runq_buildable[task] == 0: 1057 return
973 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
974 if self.runq_running[task] == 0:
975 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
976 if self.runq_complete[task] == 0:
977 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
978
979 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
980 1058
981 return self.failed_fnids 1059 self.state = runQueueComplete
1060 return
982 1061
983 def dump_data(self, taskQueue): 1062 def dump_data(self, taskQueue):
984 """ 1063 """
985 Dump some debug information on the internal data structures 1064 Dump some debug information on the internal data structures
986 """ 1065 """
987 bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:") 1066 bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
988 for task in range(len(self.runq_fnid)): 1067 for task in range(len(self.runq_task)):
989 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, 1068 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
990 taskQueue.fn_index[self.runq_fnid[task]], 1069 taskQueue.fn_index[self.runq_fnid[task]],
991 self.runq_task[task], 1070 self.runq_task[task],
@@ -994,7 +1073,7 @@ class RunQueue:
994 self.runq_revdeps[task])) 1073 self.runq_revdeps[task]))
995 1074
996 bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:") 1075 bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
997 for task1 in range(len(self.runq_fnid)): 1076 for task1 in range(len(self.runq_task)):
998 if task1 in self.prio_map: 1077 if task1 in self.prio_map:
999 task = self.prio_map[task1] 1078 task = self.prio_map[task1]
1000 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task, 1079 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
@@ -1005,6 +1084,58 @@ class RunQueue:
1005 self.runq_revdeps[task])) 1084 self.runq_revdeps[task]))
1006 1085
1007 1086
1087class TaskFailure(Exception):
1088 """
1089 Exception raised when a task in a runqueue fails
1090 """
1091 def __init__(self, x):
1092 self.args = x
1093
1094
1095class runQueueExitWait(bb.event.Event):
1096 """
1097 Event when waiting for task processes to exit
1098 """
1099
1100 def __init__(self, remain):
1101 self.remain = remain
1102 self.message = "Waiting for %s active tasks to finish" % remain
1103 bb.event.Event.__init__(self)
1104
1105class runQueueEvent(bb.event.Event):
1106 """
1107 Base runQueue event class
1108 """
1109 def __init__(self, task, stats, rq):
1110 self.taskid = task
1111 self.taskstring = rq.get_user_idstring(task)
1112 self.stats = stats
1113 bb.event.Event.__init__(self)
1114
1115class runQueueTaskStarted(runQueueEvent):
1116 """
1117 Event notifing a task was started
1118 """
1119 def __init__(self, task, stats, rq):
1120 runQueueEvent.__init__(self, task, stats, rq)
1121 self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)
1122
1123class runQueueTaskFailed(runQueueEvent):
1124 """
1125 Event notifing a task failed
1126 """
1127 def __init__(self, task, stats, rq):
1128 runQueueEvent.__init__(self, task, stats, rq)
1129 self.message = "Task %s failed (%s)" % (task, self.taskstring)
1130
1131class runQueueTaskCompleted(runQueueEvent):
1132 """
1133 Event notifing a task completed
1134 """
1135 def __init__(self, task, stats, rq):
1136 runQueueEvent.__init__(self, task, stats, rq)
1137 self.message = "Task %s completed (%s)" % (task, self.taskstring)
1138
1008def check_stamp_fn(fn, taskname, d): 1139def check_stamp_fn(fn, taskname, d):
1009 rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) 1140 rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1010 fnid = rq.taskData.getfn_id(fn) 1141 fnid = rq.taskData.getfn_id(fn)
@@ -1013,3 +1144,31 @@ def check_stamp_fn(fn, taskname, d):
1013 return rq.check_stamp_task(taskid) 1144 return rq.check_stamp_task(taskid)
1014 return None 1145 return None
1015 1146
1147class runQueuePipe():
1148 """
1149 Abstraction for a pipe between a worker thread and the server
1150 """
1151 def __init__(self, pipein, pipeout, d):
1152 self.fd = pipein
1153 os.close(pipeout)
1154 self.queue = ""
1155 self.d = d
1156
1157 def read(self):
1158 start = len(self.queue)
1159 self.queue = self.queue + os.read(self.fd, 1024)
1160 end = len(self.queue)
1161 index = self.queue.find("</event>")
1162 while index != -1:
1163 bb.event.fire_from_worker(self.queue[:index+8], self.d)
1164 self.queue = self.queue[index+8:]
1165 index = self.queue.find("</event>")
1166 return (end > start)
1167
1168 def close(self):
1169 while self.read():
1170 continue
1171 if len(self.queue) > 0:
1172 print "Warning, worker left partial message"
1173 os.close(self.fd)
1174