Diffstat (limited to 'bitbake-dev/lib/bb/runqueue.py')
-rw-r--r-- | bitbake-dev/lib/bb/runqueue.py | 1174
1 file changed, 0 insertions(+), 1174 deletions(-)
diff --git a/bitbake-dev/lib/bb/runqueue.py b/bitbake-dev/lib/bb/runqueue.py
deleted file mode 100644
index c3ad442e47..0000000000
--- a/bitbake-dev/lib/bb/runqueue.py
+++ /dev/null
@@ -1,1174 +0,0 @@
#!/usr/bin/env python
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""

# Copyright (C) 2006-2007  Richard Purdie
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from bb import msg, data, event, mkdirhier, utils
import bb, os, sys
import signal
import stat

class TaskFailure(Exception):
    """Exception raised when a task in a runqueue fails"""
    def __init__(self, x):
        self.args = x


class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def taskFailed(self):
        self.active = self.active - 1
        self.failed = self.failed + 1

    def taskCompleted(self, number = 1):
        self.active = self.active - number
        self.completed = self.completed + number

    def taskSkipped(self, number = 1):
        self.active = self.active + number
        self.skipped = self.skipped + number

    def taskActive(self):
        self.active = self.active + 1

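# Note: taskSkipped() increments 'active' rather than decrementing it. In
# execute_runqueue_internal() below, a stamp-current task triggers both
# taskCompleted() and taskSkipped(), so the pair nets out to zero on 'active'
# while counting the task as both completed and skipped.
# A minimal sketch of the counters (hypothetical total):
#
#   stats = RunQueueStats(2)
#   stats.taskActive()                           # active=1
#   stats.taskCompleted()                        # active=0, completed=1
#   stats.taskCompleted(); stats.taskSkipped()   # skipped task: active=0,
#                                                # completed=2, skipped=1
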
# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
runQueueRunInit = 3
runQueueRunning = 4
runQueueFailed = 6
runQueueCleanUp = 7
runQueueComplete = 8
runQueueChildProcess = 9

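# As driven by RunQueue.execute_runqueue() below, the usual progression is
# runQueuePrepare -> runQueueRunInit -> runQueueRunning -> runQueueCleanUp
# -> runQueueComplete, with runQueueFailed entered on task failure.
# runQueueChildProcess marks a forked task worker so it falls straight out
# of the server's state loop.
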
class RunQueueScheduler:
    """
    Control the order tasks are scheduled in.
    """
    def __init__(self, runqueue):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        numTasks = len(self.rq.runq_fnid)

        self.prio_map = []
        self.prio_map.extend(range(numTasks))

    def next(self):
        """
        Return the id of the first task we find that is buildable
        """
        for task1 in range(len(self.rq.runq_fnid)):
            task = self.prio_map[task1]
            if self.rq.runq_running[task] == 1:
                continue
            if self.rq.runq_buildable[task] == 1:
                return task
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight;
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    def __init__(self, runqueue):
        """
        The priority map is sorted by task weight.
        """
        from copy import deepcopy

        self.rq = runqueue

        sortweight = deepcopy(self.rq.runq_weight)
        sortweight.sort()
        copyweight = deepcopy(self.rq.runq_weight)
        self.prio_map = []

        for weight in sortweight:
            idx = copyweight.index(weight)
            self.prio_map.append(idx)
            copyweight[idx] = -1

        self.prio_map.reverse()

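# A small sketch of the ordering this produces: with runq_weight = [5, 20, 10]
# the loop above builds prio_map = [0, 2, 1], and the reverse() yields
# [1, 2, 0], so the heaviest task (id 1, weight 20) is offered by next() first.
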
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so that once a
    given .bb file starts to build, it's completed as quickly as possible. This
    works well where disk space is at a premium and classes like OE's rm_work
    are in force.
    """
    def __init__(self, runqueue):
        RunQueueSchedulerSpeed.__init__(self, runqueue)
        from copy import deepcopy

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        basemap = deepcopy(self.prio_map)
        self.prio_map = []
        while (len(basemap) > 0):
            entry = basemap.pop(0)
            self.prio_map.append(entry)
            fnid = self.rq.runq_fnid[entry]
            todel = []
            for entry in basemap:
                entry_fnid = self.rq.runq_fnid[entry]
                if entry_fnid == fnid:
                    todel.append(basemap.index(entry))
                    self.prio_map.append(entry)
            todel.reverse()
            for idx in todel:
                del basemap[idx]

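# For example, if the speed-sorted map is [3, 1, 2, 0] and tasks 3 and 2 share
# one fnid while tasks 1 and 0 share another, the regrouping above produces
# [3, 2, 1, 0]: every task from one .bb file is drained before the next file's
# tasks are offered.
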
class RunQueue:
    """
    BitBake Run Queue implementation
    """
    def __init__(self, cooker, cfgData, dataCache, taskData, targets):
        self.reset_runqueue()
        self.cooker = cooker
        self.dataCache = dataCache
        self.taskData = taskData
        self.cfgData = cfgData
        self.targets = targets

        self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
        self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
        self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
        self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
        self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""

    def reset_runqueue(self):
        self.runq_fnid = []
        self.runq_task = []
        self.runq_depends = []
        self.runq_revdeps = []

        self.state = runQueuePrepare

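    # The configuration variables read above can be set in e.g. local.conf;
    # illustrative (hypothetical) values:
    #
    #   BB_NUMBER_THREADS = "4"
    #   BB_SCHEDULER = "completion"            # anything else selects "speed"
    #   BB_STAMP_POLICY = "whitelist"          # any value but "perfile" checks
    #                                          # stamps across the full dep tree
    #   BB_STAMP_WHITELIST = "virtual/kernel"  # only consulted by "whitelist"
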
    def get_user_idstring(self, task):
        fn = self.taskData.fn_index[self.runq_fnid[task]]
        taskname = self.runq_task[task]
        return "%s, %s" % (fn, taskname)

    def get_task_id(self, fnid, taskname):
        for listid in range(len(self.runq_fnid)):
            if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
                return listid
        return None

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user-readable format.
        """
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(taskid, prev_chain):
            prev_chain.append(taskid)
            total_deps = []
            total_deps.extend(self.runq_revdeps[taskid])
            for revdep in self.runq_revdeps[taskid]:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        return msgs
                    continue
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[taskid] = total_deps

        for task in tasks:
            find_chains(task, [])

        return msgs

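    # chain_reorder() canonicalises a cycle so the same loop is recognised no
    # matter where the walk entered it: the loop 7 -> 2 -> 5 -> 7 discovered
    # as [7, 2, 5] is rotated to [2, 5, 7], matching the same loop discovered
    # as [5, 7, 2].
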
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list, finding tasks that it's not
        possible to execute due to circular dependencies.
        """

        numTasks = len(self.runq_fnid)
        weight = []
        deps_left = []
        task_done = []

        for listid in range(numTasks):
            task_done.append(False)
            weight.append(0)
            deps_left.append(len(self.runq_revdeps[listid]))

        for listid in endpoints:
            weight[listid] = 1
            task_done[listid] = True

        while 1:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    weight[revdep] = weight[revdep] + weight[listid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check
        problem_tasks = []
        for task in range(numTasks):
            if task_done[task] is False or deps_left[task] != 0:
                problem_tasks.append(task)
                bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
                bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))

        if problem_tasks:
            message = "Unbuildable tasks were found.\n"
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            bb.msg.error(bb.msg.domain.RunQueue, message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal(bb.msg.domain.RunQueue, message)

        return weight

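    # Worked example of the weighting: in a diamond where task D depends on B
    # and C, and B and C each depend on A, the only endpoint (nothing depends
    # on it) is D. Weights then flow backwards along runq_depends:
    #
    #   weight[D] = 1
    #   weight[B] = weight[C] = 1                  (each inherits D's weight)
    #   weight[A] = weight[B] + weight[C] = 2
    #
    # so A, which the most tasks ultimately need, is heaviest and is offered
    # first by RunQueueSchedulerSpeed.
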
    def prepare_runqueue(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.
        """

        runq_build = []
        recursive_tdepends = {}
        runq_recrdepends = []
        tdepends_fnid = {}

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return

        bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends):
            for depid in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depid not in taskData.build_targets:
                    continue
                depdata = taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                dep = taskData.fn_index[depdata]
                for taskname in tasknames:
                    taskid = taskData.gettask_id(dep, taskname, False)
                    if taskid is not None:
                        depends.append(taskid)

        def add_runtime_dependencies(depids, tasknames, depends):
            for depid in depids:
                if depid not in taskData.run_targets:
                    continue
                depdata = taskData.run_targets[depid][0]
                if depdata is None:
                    continue
                dep = taskData.fn_index[depdata]
                for taskname in tasknames:
                    taskid = taskData.gettask_id(dep, taskname, False)
                    if taskid is not None:
                        depends.append(taskid)

        for task in range(len(taskData.tasks_name)):
            depends = []
            recrdepends = []
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            bb.msg.debug(2, bb.msg.domain.RunQueue, "Processing %s:%s" % (fn, taskData.tasks_name[task]))

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                depends = taskData.tasks_tdepends[task]

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
                    add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                if fnid not in tdepends_fnid:
                    tdepends_fnid[fnid] = set()
                idepends = taskData.tasks_idepends[task]
                for (depid, idependtask) in idepends:
                    if depid in taskData.build_targets:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            dep = taskData.fn_index[depdata]
                            taskid = taskData.gettask_id(dep, idependtask)
                            depends.append(taskid)
                            if depdata != fnid:
                                tdepends_fnid[fnid].add(taskid)


                # Resolve recursive 'recrdeptask' dependencies (A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
                        recrdepends.append(taskname)
                        add_build_dependencies(taskData.depids[fnid], [taskname], depends)
                        add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)

                # Remove all self references
                if task in depends:
                    newdep = []
                    bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
                    for dep in depends:
                        if task != dep:
                            newdep.append(dep)
                    depends = newdep

            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(set(depends))
            self.runq_revdeps.append(set())

            runq_build.append(0)
            runq_recrdepends.append(recrdepends)

        #
        # Build a list of recursive cumulative dependencies for each fnid
        # We do this by fnid, since if A depends on some task in B
        # we're interested in later tasks B's fnid might have but B itself
        # doesn't depend on
        #
        # Algorithm is O(tasks) + O(tasks)*O(fnids)
        #
        reccumdepends = {}
        for task in range(len(self.runq_fnid)):
            fnid = self.runq_fnid[task]
            if fnid not in reccumdepends:
                if fnid in tdepends_fnid:
                    reccumdepends[fnid] = tdepends_fnid[fnid]
                else:
                    reccumdepends[fnid] = set()
            reccumdepends[fnid].update(self.runq_depends[task])
        for task in range(len(self.runq_fnid)):
            taskfnid = self.runq_fnid[task]
            for fnid in reccumdepends:
                if task in reccumdepends[fnid]:
                    reccumdepends[fnid].add(task)
                    if taskfnid in reccumdepends:
                        reccumdepends[fnid].update(reccumdepends[taskfnid])


        # Resolve recursive 'recrdeptask' dependencies (B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        for task in range(len(self.runq_fnid)):
            if len(runq_recrdepends[task]) > 0:
                taskfnid = self.runq_fnid[task]
                for dep in reccumdepends[taskfnid]:
                    # Ignore self references
                    if dep == task:
                        continue
                    for taskname in runq_recrdepends[task]:
                        if taskData.tasks_name[dep] == taskname:
                            self.runq_depends[task].add(dep)

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            if runq_build[listid] == 1:
                return

            runq_build[listid] = 1

            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        self.target_pairs = []
        for target in self.targets:
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            fn = taskData.fn_index[fnid]
            self.target_pairs.append((fn, target[1]))

            # Remove stamps for targets if force mode active
            if self.cooker.configuration.force:
                bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
                bb.build.del_stamp(target[1], self.dataCache, fn)

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        maps = []
        delcount = 0
        for listid in range(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)

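        # 'maps' translates pre-prune task ids into post-prune ids, with -1
        # marking a deleted task: pruning task 1 from three tasks [0, 1, 2]
        # leaves maps = [0, -1, 1].
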
        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")

        bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in range(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = set(newdeps)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in range(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in range(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))

        # Calculate task weights
        # Check for longer circular dependency loops
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Decide what order to execute the tasks in, pick a scheduler
        #self.sched = RunQueueScheduler(self)
        if self.scheduler == "completion":
            self.sched = RunQueueSchedulerCompletion(self)
        else:
            self.sched = RunQueueSchedulerSpeed(self)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in range(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        error = False
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                error = True
                bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
        #if error:
        #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")


        # Create a whitelist usable by the stamp checks
        stampfnwhitelist = []
        for entry in self.stampwhitelist.split():
            entryid = self.taskData.getbuild_id(entry)
            if entryid not in self.taskData.build_targets:
                continue
            fnid = self.taskData.build_targets[entryid][0]
            fn = self.taskData.fn_index[fnid]
            stampfnwhitelist.append(fn)
        self.stampfnwhitelist = stampfnwhitelist

        #self.dump_data(taskData)

        self.state = runQueueRunInit

    def check_stamps(self):
        unchecked = {}
        current = []
        notcurrent = []
        buildable = []

        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.stampfnwhitelist

        for task in range(len(self.runq_fnid)):
            unchecked[task] = ""
            if len(self.runq_depends[task]) == 0:
                buildable.append(task)

        def check_buildable(self, task, buildable):
            for revdep in self.runq_revdeps[task]:
                alldeps = 1
                for dep in self.runq_depends[revdep]:
                    if dep in unchecked:
                        alldeps = 0
                if alldeps == 1:
                    if revdep in unchecked:
                        buildable.append(revdep)

        for task in range(len(self.runq_fnid)):
            if task not in unchecked:
                continue
            fn = self.taskData.fn_index[self.runq_fnid[task]]
            taskname = self.runq_task[task]
            stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
            # If the stamp is missing, it's not current
            if not os.access(stampfile, os.F_OK):
                del unchecked[task]
                notcurrent.append(task)
                check_buildable(self, task, buildable)
                continue
            # If it's a 'nostamp' task, it's not current
            taskdep = self.dataCache.task_deps[fn]
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                del unchecked[task]
                notcurrent.append(task)
                check_buildable(self, task, buildable)
                continue

        while (len(buildable) > 0):
            nextbuildable = []
            for task in buildable:
                if task in unchecked:
                    fn = self.taskData.fn_index[self.runq_fnid[task]]
                    taskname = self.runq_task[task]
                    stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
                    iscurrent = True

                    t1 = os.stat(stampfile)[stat.ST_MTIME]
                    for dep in self.runq_depends[task]:
                        if iscurrent:
                            fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
                            taskname2 = self.runq_task[dep]
                            stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
                            if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                                if dep in notcurrent:
                                    iscurrent = False
                                else:
                                    t2 = os.stat(stampfile2)[stat.ST_MTIME]
                                    if t1 < t2:
                                        iscurrent = False
                    del unchecked[task]
                    if iscurrent:
                        current.append(task)
                    else:
                        notcurrent.append(task)

                check_buildable(self, task, nextbuildable)

            buildable = nextbuildable

        #for task in range(len(self.runq_fnid)):
        #    fn = self.taskData.fn_index[self.runq_fnid[task]]
        #    taskname = self.runq_task[task]
        #    print "%s %s.%s" % (task, taskname, fn)

        #print "Unchecked: %s" % unchecked
        #print "Current: %s" % current
        #print "Not current: %s" % notcurrent

        if len(unchecked) > 0:
            bb.fatal("check_stamps fatal internal error")
        return current

    def check_stamp_task(self, task):

        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.stampfnwhitelist

        fn = self.taskData.fn_index[self.runq_fnid[task]]
        taskname = self.runq_task[task]
        stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.dataCache.task_deps[fn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
            return False

        iscurrent = True
        t1 = os.stat(stampfile)[stat.ST_MTIME]
        for dep in self.runq_depends[task]:
            if iscurrent:
                fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
                taskname2 = self.runq_task[dep]
                stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    try:
                        t2 = os.stat(stampfile2)[stat.ST_MTIME]
                        if t1 < t2:
                            bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile, stampfile2))
                            iscurrent = False
                    except:
                        bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2, stampfile))
                        iscurrent = False

        return iscurrent

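    # A sketch of the comparison check_stamp_task() implements, using
    # hypothetical stamp paths: if tmp/stamps/foo-1.0.do_compile is older
    # than tmp/stamps/bar-2.0.do_populate_staging, foo's do_compile depends
    # on that bar task, and bar isn't covered by the "whitelist" policy,
    # then do_compile is out of date and False is returned.
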
    def execute_runqueue(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)
        """

        if self.state is runQueuePrepare:
            self.prepare_runqueue()

        if self.state is runQueueRunInit:
            bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
            self.execute_runqueue_initVars()

        if self.state is runQueueRunning:
            self.execute_runqueue_internal()

        if self.state is runQueueCleanUp:
            self.finish_runqueue()

        if self.state is runQueueFailed:
            if not self.taskData.tryaltconfigs:
                raise bb.runqueue.TaskFailure(self.failed_fnids)
            for fnid in self.failed_fnids:
                self.taskData.fail_fnid(fnid)
            self.reset_runqueue()

        if self.state is runQueueComplete:
            # All done
            bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
            return False

        if self.state is runQueueChildProcess:
            print "Child process"
            return False

        # Loop
        return True

    def execute_runqueue_initVars(self):

        self.stats = RunQueueStats(len(self.runq_fnid))

        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []
        self.build_pids = {}
        self.build_pipes = {}
        self.failed_fnids = []

        # Mark initial buildable tasks
        for task in range(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)

        self.state = runQueueRunning

        event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData)

    def task_complete(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                continue
            if self.runq_buildable[revdep] == 1:
                continue
            alldeps = 1
            for dep in self.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                    alldeps = 0
            if alldeps == 1:
                self.runq_buildable[revdep] = 1
                fn = self.taskData.fn_index[self.runq_fnid[revdep]]
                taskname = self.runq_task[revdep]
                bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode))
        self.stats.taskFailed()
        fnid = self.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
        if self.taskData.abort:
            self.state = runQueueCleanUp

    def execute_runqueue_internal(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        """

        if self.stats.total == 0:
            # nothing to do
            self.state = runQueueCleanUp

        while True:
            task = None
            if self.stats.active < self.number_tasks:
                task = self.sched.next()
            if task is not None:
                fn = self.taskData.fn_index[self.runq_fnid[task]]

                taskname = self.runq_task[task]
                if self.check_stamp_task(task):
                    bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
                    self.runq_running[task] = 1
                    self.runq_buildable[task] = 1
                    self.task_complete(task)
                    self.stats.taskCompleted()
                    self.stats.taskSkipped()
                    continue

                sys.stdout.flush()
                sys.stderr.flush()
                try:
                    pipein, pipeout = os.pipe()
                    pid = os.fork()
                except OSError, e:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
                if pid == 0:
                    os.close(pipein)
                    # Save out the PID so that the events can include it
                    bb.event.worker_pid = os.getpid()
                    bb.event.worker_pipe = pipeout

                    self.state = runQueueChildProcess
                    # Make the child the process group leader
                    os.setpgid(0, 0)
                    # No stdin
                    newsi = os.open('/dev/null', os.O_RDWR)
                    os.dup2(newsi, sys.stdin.fileno())

                    bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
                    bb.msg.note(1, bb.msg.domain.RunQueue,
                                "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1,
                                                                        self.stats.total,
                                                                        task,
                                                                        self.get_user_idstring(task)))

                    bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
                    try:
                        self.cooker.tryBuild(fn, taskname[3:])
                    except bb.build.EventException:
                        bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                        os._exit(1)
                    except:
                        bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                        os._exit(1)
                    os._exit(0)

                self.build_pids[pid] = task
                self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                if self.stats.active < self.number_tasks:
                    continue

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()

            if self.stats.active > 0:
                result = os.waitpid(-1, os.WNOHANG)
                if result[0] == 0 and result[1] == 0:
                    return
                task = self.build_pids[result[0]]
                del self.build_pids[result[0]]
                self.build_pipes[result[0]].close()
                del self.build_pipes[result[0]]
                if result[1] != 0:
                    self.task_fail(task, result[1])
                    return
                self.task_complete(task)
                self.stats.taskCompleted()
                bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
                continue

            if len(self.failed_fnids) != 0:
                self.state = runQueueFailed
                return

            # Sanity Checks
            for task in range(self.stats.total):
                if self.runq_buildable[task] == 0:
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
                if self.runq_running[task] == 0:
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
                if self.runq_complete[task] == 0:
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
            self.state = runQueueComplete
            return

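    # Summary of the worker protocol implemented above: the server forks one
    # child per task; the child becomes a process group leader (so SIGINT can
    # reach its whole process tree), runs cooker.tryBuild() and exits 0 or 1;
    # the parent reaps children with os.waitpid(-1, os.WNOHANG) and streams
    # their events back through a runQueuePipe.
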
    def finish_runqueue_now(self):
        bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active)
        for k, v in self.build_pids.iteritems():
            try:
                os.kill(-k, signal.SIGINT)
            except:
                pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

    def finish_runqueue(self, now = False):
        self.state = runQueueCleanUp
        if now:
            self.finish_runqueue_now()
        try:
            while self.stats.active > 0:
                bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
                bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)
                tasknum = 1
                for k, v in self.build_pids.iteritems():
                    bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
                    tasknum = tasknum + 1
                result = os.waitpid(-1, os.WNOHANG)
                if result[0] == 0 and result[1] == 0:
                    return
                task = self.build_pids[result[0]]
                del self.build_pids[result[0]]
                self.build_pipes[result[0]].close()
                del self.build_pipes[result[0]]
                if result[1] != 0:
                    self.task_fail(task, result[1])
                else:
                    self.stats.taskCompleted()
                    bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
        except:
            self.finish_runqueue_now()
            raise

        if len(self.failed_fnids) != 0:
            self.state = runQueueFailed
            return

        self.state = runQueueComplete
        return

    def dump_data(self, taskQueue):
        """
        Dump some debug information on the internal data structures
        """
        bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
        for task in range(len(self.runq_task)):
            bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
                         taskQueue.fn_index[self.runq_fnid[task]],
                         self.runq_task[task],
                         self.runq_weight[task],
                         self.runq_depends[task],
                         self.runq_revdeps[task]))

        bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
        for task1 in range(len(self.runq_task)):
            if task1 in self.prio_map:
                task = self.prio_map[task1]
                bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
                             taskQueue.fn_index[self.runq_fnid[task]],
                             self.runq_task[task],
                             self.runq_weight[task],
                             self.runq_depends[task],
                             self.runq_revdeps[task]))


class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        self.args = x


class runQueueExitWait(bb.event.Event):
    """
    Event when waiting for task processes to exit
    """

    def __init__(self, remain):
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        bb.event.Event.__init__(self)

class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class
    """
    def __init__(self, task, stats, rq):
        self.taskid = task
        self.taskstring = rq.get_user_idstring(task)
        self.stats = stats
        bb.event.Event.__init__(self)

class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)

class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying a task failed
    """
    def __init__(self, task, stats, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.message = "Task %s failed (%s)" % (task, self.taskstring)

class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed
    """
    def __init__(self, task, stats, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.message = "Task %s completed (%s)" % (task, self.taskstring)

def check_stamp_fn(fn, taskname, d):
    rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
    fnid = rq.taskData.getfn_id(fn)
    taskid = rq.get_task_id(fnid, taskname)
    if taskid is not None:
        return rq.check_stamp_task(taskid)
    return None

class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
    def __init__(self, pipein, pipeout, d):
        self.fd = pipein
        os.close(pipeout)
        self.queue = ""
        self.d = d

    def read(self):
        start = len(self.queue)
        self.queue = self.queue + os.read(self.fd, 1024)
        end = len(self.queue)
        index = self.queue.find("</event>")
        while index != -1:
            bb.event.fire_from_worker(self.queue[:index+8], self.d)
            self.queue = self.queue[index+8:]
            index = self.queue.find("</event>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print "Warning, worker left partial message"
        os.close(self.fd)
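
# The pipe carries a stream of serialised events delimited by "</event>":
# read() fires each complete chunk via bb.event.fire_from_worker() and keeps
# any trailing partial message buffered. For example, a read returning
# "<event>A</event><eve" fires "<event>A</event>" and leaves "<eve" queued
# for the next read.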