summaryrefslogtreecommitdiffstats
path: root/bitbake-dev/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake-dev/lib/bb/runqueue.py')
-rw-r--r--bitbake-dev/lib/bb/runqueue.py98
1 files changed, 73 insertions, 25 deletions
diff --git a/bitbake-dev/lib/bb/runqueue.py b/bitbake-dev/lib/bb/runqueue.py
index 8b6e12d185..c3ad442e47 100644
--- a/bitbake-dev/lib/bb/runqueue.py
+++ b/bitbake-dev/lib/bb/runqueue.py
@@ -857,6 +857,7 @@ class RunQueue:
857 self.runq_running = [] 857 self.runq_running = []
858 self.runq_complete = [] 858 self.runq_complete = []
859 self.build_pids = {} 859 self.build_pids = {}
860 self.build_pipes = {}
860 self.failed_fnids = [] 861 self.failed_fnids = []
861 862
862 # Mark initial buildable tasks 863 # Mark initial buildable tasks
@@ -870,7 +871,7 @@ class RunQueue:
870 871
871 self.state = runQueueRunning 872 self.state = runQueueRunning
872 873
873 event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgData)) 874 event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData)
874 875
875 def task_complete(self, task): 876 def task_complete(self, task):
876 """ 877 """
@@ -903,7 +904,7 @@ class RunQueue:
903 self.stats.taskFailed() 904 self.stats.taskFailed()
904 fnid = self.runq_fnid[task] 905 fnid = self.runq_fnid[task]
905 self.failed_fnids.append(fnid) 906 self.failed_fnids.append(fnid)
906 bb.event.fire(runQueueTaskFailed(task, self.stats, self, self.cfgData)) 907 bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
907 if self.taskData.abort: 908 if self.taskData.abort:
908 self.state = runQueueCleanup 909 self.state = runQueueCleanup
909 910
@@ -935,53 +936,67 @@ class RunQueue:
935 936
936 sys.stdout.flush() 937 sys.stdout.flush()
937 sys.stderr.flush() 938 sys.stderr.flush()
938 try: 939 try:
940 pipein, pipeout = os.pipe()
939 pid = os.fork() 941 pid = os.fork()
940 except OSError, e: 942 except OSError, e:
941 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) 943 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
942 if pid == 0: 944 if pid == 0:
945 os.close(pipein)
943 # Save out the PID so that the event can include it the 946 # Save out the PID so that the event can include it the
944 # events 947 # events
945 bb.event.worker_pid = os.getpid() 948 bb.event.worker_pid = os.getpid()
949 bb.event.worker_pipe = pipeout
946 950
947 bb.event.fire(runQueueTaskStarted(task, self.stats, self, self.cfgData))
948 bb.msg.note(1, bb.msg.domain.RunQueue,
949 "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1,
950 self.stats.total,
951 task,
952 self.get_user_idstring(task)))
953 self.state = runQueueChildProcess 951 self.state = runQueueChildProcess
954 # Make the child the process group leader 952 # Make the child the process group leader
955 os.setpgid(0, 0) 953 os.setpgid(0, 0)
954 # No stdin
956 newsi = os.open('/dev/null', os.O_RDWR) 955 newsi = os.open('/dev/null', os.O_RDWR)
957 os.dup2(newsi, sys.stdin.fileno()) 956 os.dup2(newsi, sys.stdin.fileno())
957
958 bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
959 bb.msg.note(1, bb.msg.domain.RunQueue,
960 "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1,
961 self.stats.total,
962 task,
963 self.get_user_idstring(task)))
964
958 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) 965 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
959 try: 966 try:
960 self.cooker.tryBuild(fn, taskname[3:]) 967 self.cooker.tryBuild(fn, taskname[3:])
961 except bb.build.EventException: 968 except bb.build.EventException:
962 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") 969 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
963 sys.exit(1) 970 os._exit(1)
964 except: 971 except:
965 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") 972 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
966 raise 973 os._exit(1)
967 sys.exit(0) 974 os._exit(0)
975
968 self.build_pids[pid] = task 976 self.build_pids[pid] = task
977 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
969 self.runq_running[task] = 1 978 self.runq_running[task] = 1
970 self.stats.taskActive() 979 self.stats.taskActive()
971 if self.stats.active < self.number_tasks: 980 if self.stats.active < self.number_tasks:
972 continue 981 continue
982
983 for pipe in self.build_pipes:
984 self.build_pipes[pipe].read()
985
973 if self.stats.active > 0: 986 if self.stats.active > 0:
974 result = os.waitpid(-1, os.WNOHANG) 987 result = os.waitpid(-1, os.WNOHANG)
975 if result[0] is 0 and result[1] is 0: 988 if result[0] is 0 and result[1] is 0:
976 return 989 return
977 task = self.build_pids[result[0]] 990 task = self.build_pids[result[0]]
978 del self.build_pids[result[0]] 991 del self.build_pids[result[0]]
992 self.build_pipes[result[0]].close()
993 del self.build_pipes[result[0]]
979 if result[1] != 0: 994 if result[1] != 0:
980 self.task_fail(task, result[1]) 995 self.task_fail(task, result[1])
981 return 996 return
982 self.task_complete(task) 997 self.task_complete(task)
983 self.stats.taskCompleted() 998 self.stats.taskCompleted()
984 bb.event.fire(runQueueTaskCompleted(task, self.stats, self, self.cfgData)) 999 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
985 continue 1000 continue
986 1001
987 if len(self.failed_fnids) != 0: 1002 if len(self.failed_fnids) != 0:
@@ -1006,6 +1021,8 @@ class RunQueue:
1006 os.kill(-k, signal.SIGINT) 1021 os.kill(-k, signal.SIGINT)
1007 except: 1022 except:
1008 pass 1023 pass
1024 for pipe in self.build_pipes:
1025 self.build_pipes[pipe].read()
1009 1026
1010 def finish_runqueue(self, now = False): 1027 def finish_runqueue(self, now = False):
1011 self.state = runQueueCleanUp 1028 self.state = runQueueCleanUp
@@ -1013,7 +1030,7 @@ class RunQueue:
1013 self.finish_runqueue_now() 1030 self.finish_runqueue_now()
1014 try: 1031 try:
1015 while self.stats.active > 0: 1032 while self.stats.active > 0:
1016 bb.event.fire(runQueueExitWait(self.stats.active, self.cfgData)) 1033 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1017 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active) 1034 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)
1018 tasknum = 1 1035 tasknum = 1
1019 for k, v in self.build_pids.iteritems(): 1036 for k, v in self.build_pids.iteritems():
@@ -1024,11 +1041,13 @@ class RunQueue:
1024 return 1041 return
1025 task = self.build_pids[result[0]] 1042 task = self.build_pids[result[0]]
1026 del self.build_pids[result[0]] 1043 del self.build_pids[result[0]]
1044 self.build_pipes[result[0]].close()
1045 del self.build_pipes[result[0]]
1027 if result[1] != 0: 1046 if result[1] != 0:
1028 self.task_fail(task, result[1]) 1047 self.task_fail(task, result[1])
1029 else: 1048 else:
1030 self.stats.taskCompleted() 1049 self.stats.taskCompleted()
1031 bb.event.fire(runQueueTaskCompleted(task, self.stats, self, self.cfgData)) 1050 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
1032 except: 1051 except:
1033 self.finish_runqueue_now() 1052 self.finish_runqueue_now()
1034 raise 1053 raise
@@ -1078,43 +1097,43 @@ class runQueueExitWait(bb.event.Event):
1078 Event when waiting for task processes to exit 1097 Event when waiting for task processes to exit
1079 """ 1098 """
1080 1099
1081 def __init__(self, remain, d): 1100 def __init__(self, remain):
1082 self.remain = remain 1101 self.remain = remain
1083 self.message = "Waiting for %s active tasks to finish" % remain 1102 self.message = "Waiting for %s active tasks to finish" % remain
1084 bb.event.Event.__init__(self, d) 1103 bb.event.Event.__init__(self)
1085 1104
1086class runQueueEvent(bb.event.Event): 1105class runQueueEvent(bb.event.Event):
1087 """ 1106 """
1088 Base runQueue event class 1107 Base runQueue event class
1089 """ 1108 """
1090 def __init__(self, task, stats, rq, d): 1109 def __init__(self, task, stats, rq):
1091 self.taskid = task 1110 self.taskid = task
1092 self.taskstring = rq.get_user_idstring(task) 1111 self.taskstring = rq.get_user_idstring(task)
1093 self.stats = stats 1112 self.stats = stats
1094 bb.event.Event.__init__(self, d) 1113 bb.event.Event.__init__(self)
1095 1114
1096class runQueueTaskStarted(runQueueEvent): 1115class runQueueTaskStarted(runQueueEvent):
1097 """ 1116 """
1098 Event notifing a task was started 1117 Event notifing a task was started
1099 """ 1118 """
1100 def __init__(self, task, stats, rq, d): 1119 def __init__(self, task, stats, rq):
1101 runQueueEvent.__init__(self, task, stats, rq, d) 1120 runQueueEvent.__init__(self, task, stats, rq)
1102 self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring) 1121 self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)
1103 1122
1104class runQueueTaskFailed(runQueueEvent): 1123class runQueueTaskFailed(runQueueEvent):
1105 """ 1124 """
1106 Event notifing a task failed 1125 Event notifing a task failed
1107 """ 1126 """
1108 def __init__(self, task, stats, rq, d): 1127 def __init__(self, task, stats, rq):
1109 runQueueEvent.__init__(self, task, stats, rq, d) 1128 runQueueEvent.__init__(self, task, stats, rq)
1110 self.message = "Task %s failed (%s)" % (task, self.taskstring) 1129 self.message = "Task %s failed (%s)" % (task, self.taskstring)
1111 1130
1112class runQueueTaskCompleted(runQueueEvent): 1131class runQueueTaskCompleted(runQueueEvent):
1113 """ 1132 """
1114 Event notifing a task completed 1133 Event notifing a task completed
1115 """ 1134 """
1116 def __init__(self, task, stats, rq, d): 1135 def __init__(self, task, stats, rq):
1117 runQueueEvent.__init__(self, task, stats, rq, d) 1136 runQueueEvent.__init__(self, task, stats, rq)
1118 self.message = "Task %s completed (%s)" % (task, self.taskstring) 1137 self.message = "Task %s completed (%s)" % (task, self.taskstring)
1119 1138
1120def check_stamp_fn(fn, taskname, d): 1139def check_stamp_fn(fn, taskname, d):
@@ -1124,3 +1143,32 @@ def check_stamp_fn(fn, taskname, d):
1124 if taskid is not None: 1143 if taskid is not None:
1125 return rq.check_stamp_task(taskid) 1144 return rq.check_stamp_task(taskid)
1126 return None 1145 return None
1146
1147class runQueuePipe():
1148 """
1149 Abstraction for a pipe between a worker thread and the server
1150 """
1151 def __init__(self, pipein, pipeout, d):
1152 self.fd = pipein
1153 os.close(pipeout)
1154 self.queue = ""
1155 self.d = d
1156
1157 def read(self):
1158 start = len(self.queue)
1159 self.queue = self.queue + os.read(self.fd, 1024)
1160 end = len(self.queue)
1161 index = self.queue.find("</event>")
1162 while index != -1:
1163 bb.event.fire_from_worker(self.queue[:index+8], self.d)
1164 self.queue = self.queue[index+8:]
1165 index = self.queue.find("</event>")
1166 return (end > start)
1167
1168 def close(self):
1169 while self.read():
1170 continue
1171 if len(self.queue) > 0:
1172 print "Warning, worker left partial message"
1173 os.close(self.fd)
1174