summaryrefslogtreecommitdiffstats
path: root/bitbake-dev/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake-dev/lib/bb/runqueue.py')
-rw-r--r--bitbake-dev/lib/bb/runqueue.py1174
1 files changed, 0 insertions, 1174 deletions
diff --git a/bitbake-dev/lib/bb/runqueue.py b/bitbake-dev/lib/bb/runqueue.py
deleted file mode 100644
index c3ad442e47..0000000000
--- a/bitbake-dev/lib/bb/runqueue.py
+++ /dev/null
@@ -1,1174 +0,0 @@
1#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25from bb import msg, data, event, mkdirhier, utils
26import bb, os, sys
27import signal
28import stat
29
30class TaskFailure(Exception):
31 """Exception raised when a task in a runqueue fails"""
32 def __init__(self, x):
33 self.args = x
34
35
36class RunQueueStats:
37 """
38 Holds statistics on the tasks handled by the associated runQueue
39 """
40 def __init__(self, total):
41 self.completed = 0
42 self.skipped = 0
43 self.failed = 0
44 self.active = 0
45 self.total = total
46
47 def taskFailed(self):
48 self.active = self.active - 1
49 self.failed = self.failed + 1
50
51 def taskCompleted(self, number = 1):
52 self.active = self.active - number
53 self.completed = self.completed + number
54
55 def taskSkipped(self, number = 1):
56 self.active = self.active + number
57 self.skipped = self.skipped + number
58
59 def taskActive(self):
60 self.active = self.active + 1
61
62# These values indicate the next step due to be run in the
63# runQueue state machine
64runQueuePrepare = 2
65runQueueRunInit = 3
66runQueueRunning = 4
67runQueueFailed = 6
68runQueueCleanUp = 7
69runQueueComplete = 8
70runQueueChildProcess = 9
71
72class RunQueueScheduler:
73 """
74 Control the order tasks are scheduled in.
75 """
76 def __init__(self, runqueue):
77 """
78 The default scheduler just returns the first buildable task (the
79 priority map is sorted by task numer)
80 """
81 self.rq = runqueue
82 numTasks = len(self.rq.runq_fnid)
83
84 self.prio_map = []
85 self.prio_map.extend(range(numTasks))
86
87 def next(self):
88 """
89 Return the id of the first task we find that is buildable
90 """
91 for task1 in range(len(self.rq.runq_fnid)):
92 task = self.prio_map[task1]
93 if self.rq.runq_running[task] == 1:
94 continue
95 if self.rq.runq_buildable[task] == 1:
96 return task
97
98class RunQueueSchedulerSpeed(RunQueueScheduler):
99 """
100 A scheduler optimised for speed. The priority map is sorted by task weight,
101 heavier weighted tasks (tasks needed by the most other tasks) are run first.
102 """
103 def __init__(self, runqueue):
104 """
105 The priority map is sorted by task weight.
106 """
107 from copy import deepcopy
108
109 self.rq = runqueue
110
111 sortweight = deepcopy(self.rq.runq_weight)
112 sortweight.sort()
113 copyweight = deepcopy(self.rq.runq_weight)
114 self.prio_map = []
115
116 for weight in sortweight:
117 idx = copyweight.index(weight)
118 self.prio_map.append(idx)
119 copyweight[idx] = -1
120
121 self.prio_map.reverse()
122
123class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
124 """
125 A scheduler optimised to complete .bb files are quickly as possible. The
126 priority map is sorted by task weight, but then reordered so once a given
127 .bb file starts to build, its completed as quickly as possible. This works
128 well where disk space is at a premium and classes like OE's rm_work are in
129 force.
130 """
131 def __init__(self, runqueue):
132 RunQueueSchedulerSpeed.__init__(self, runqueue)
133 from copy import deepcopy
134
135 #FIXME - whilst this groups all fnids together it does not reorder the
136 #fnid groups optimally.
137
138 basemap = deepcopy(self.prio_map)
139 self.prio_map = []
140 while (len(basemap) > 0):
141 entry = basemap.pop(0)
142 self.prio_map.append(entry)
143 fnid = self.rq.runq_fnid[entry]
144 todel = []
145 for entry in basemap:
146 entry_fnid = self.rq.runq_fnid[entry]
147 if entry_fnid == fnid:
148 todel.append(basemap.index(entry))
149 self.prio_map.append(entry)
150 todel.reverse()
151 for idx in todel:
152 del basemap[idx]
153
154class RunQueue:
155 """
156 BitBake Run Queue implementation
157 """
158 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
159 self.reset_runqueue()
160 self.cooker = cooker
161 self.dataCache = dataCache
162 self.taskData = taskData
163 self.cfgData = cfgData
164 self.targets = targets
165
166 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
167 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
168 self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
169 self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
170 self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
171
172 def reset_runqueue(self):
173 self.runq_fnid = []
174 self.runq_task = []
175 self.runq_depends = []
176 self.runq_revdeps = []
177
178 self.state = runQueuePrepare
179
180 def get_user_idstring(self, task):
181 fn = self.taskData.fn_index[self.runq_fnid[task]]
182 taskname = self.runq_task[task]
183 return "%s, %s" % (fn, taskname)
184
185 def get_task_id(self, fnid, taskname):
186 for listid in range(len(self.runq_fnid)):
187 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
188 return listid
189 return None
190
191 def circular_depchains_handler(self, tasks):
192 """
193 Some tasks aren't buildable, likely due to circular dependency issues.
194 Identify the circular dependencies and print them in a user readable format.
195 """
196 from copy import deepcopy
197
198 valid_chains = []
199 explored_deps = {}
200 msgs = []
201
202 def chain_reorder(chain):
203 """
204 Reorder a dependency chain so the lowest task id is first
205 """
206 lowest = 0
207 new_chain = []
208 for entry in range(len(chain)):
209 if chain[entry] < chain[lowest]:
210 lowest = entry
211 new_chain.extend(chain[lowest:])
212 new_chain.extend(chain[:lowest])
213 return new_chain
214
215 def chain_compare_equal(chain1, chain2):
216 """
217 Compare two dependency chains and see if they're the same
218 """
219 if len(chain1) != len(chain2):
220 return False
221 for index in range(len(chain1)):
222 if chain1[index] != chain2[index]:
223 return False
224 return True
225
226 def chain_array_contains(chain, chain_array):
227 """
228 Return True if chain_array contains chain
229 """
230 for ch in chain_array:
231 if chain_compare_equal(ch, chain):
232 return True
233 return False
234
235 def find_chains(taskid, prev_chain):
236 prev_chain.append(taskid)
237 total_deps = []
238 total_deps.extend(self.runq_revdeps[taskid])
239 for revdep in self.runq_revdeps[taskid]:
240 if revdep in prev_chain:
241 idx = prev_chain.index(revdep)
242 # To prevent duplicates, reorder the chain to start with the lowest taskid
243 # and search through an array of those we've already printed
244 chain = prev_chain[idx:]
245 new_chain = chain_reorder(chain)
246 if not chain_array_contains(new_chain, valid_chains):
247 valid_chains.append(new_chain)
248 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
249 for dep in new_chain:
250 msgs.append(" Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
251 msgs.append("\n")
252 if len(valid_chains) > 10:
253 msgs.append("Aborted dependency loops search after 10 matches.\n")
254 return msgs
255 continue
256 scan = False
257 if revdep not in explored_deps:
258 scan = True
259 elif revdep in explored_deps[revdep]:
260 scan = True
261 else:
262 for dep in prev_chain:
263 if dep in explored_deps[revdep]:
264 scan = True
265 if scan:
266 find_chains(revdep, deepcopy(prev_chain))
267 for dep in explored_deps[revdep]:
268 if dep not in total_deps:
269 total_deps.append(dep)
270
271 explored_deps[taskid] = total_deps
272
273 for task in tasks:
274 find_chains(task, [])
275
276 return msgs
277
278 def calculate_task_weights(self, endpoints):
279 """
280 Calculate a number representing the "weight" of each task. Heavier weighted tasks
281 have more dependencies and hence should be executed sooner for maximum speed.
282
283 This function also sanity checks the task list finding tasks that its not
284 possible to execute due to circular dependencies.
285 """
286
287 numTasks = len(self.runq_fnid)
288 weight = []
289 deps_left = []
290 task_done = []
291
292 for listid in range(numTasks):
293 task_done.append(False)
294 weight.append(0)
295 deps_left.append(len(self.runq_revdeps[listid]))
296
297 for listid in endpoints:
298 weight[listid] = 1
299 task_done[listid] = True
300
301 while 1:
302 next_points = []
303 for listid in endpoints:
304 for revdep in self.runq_depends[listid]:
305 weight[revdep] = weight[revdep] + weight[listid]
306 deps_left[revdep] = deps_left[revdep] - 1
307 if deps_left[revdep] == 0:
308 next_points.append(revdep)
309 task_done[revdep] = True
310 endpoints = next_points
311 if len(next_points) == 0:
312 break
313
314 # Circular dependency sanity check
315 problem_tasks = []
316 for task in range(numTasks):
317 if task_done[task] is False or deps_left[task] != 0:
318 problem_tasks.append(task)
319 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
320 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
321
322 if problem_tasks:
323 message = "Unbuildable tasks were found.\n"
324 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
325 message = message + "Identifying dependency loops (this may take a short while)...\n"
326 bb.msg.error(bb.msg.domain.RunQueue, message)
327
328 msgs = self.circular_depchains_handler(problem_tasks)
329
330 message = "\n"
331 for msg in msgs:
332 message = message + msg
333 bb.msg.fatal(bb.msg.domain.RunQueue, message)
334
335 return weight
336
337 def prepare_runqueue(self):
338 """
339 Turn a set of taskData into a RunQueue and compute data needed
340 to optimise the execution order.
341 """
342
343 runq_build = []
344 recursive_tdepends = {}
345 runq_recrdepends = []
346 tdepends_fnid = {}
347
348 taskData = self.taskData
349
350 if len(taskData.tasks_name) == 0:
351 # Nothing to do
352 return
353
354 bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
355
356 # Step A - Work out a list of tasks to run
357 #
358 # Taskdata gives us a list of possible providers for every build and run
359 # target ordered by priority. It also gives information on each of those
360 # providers.
361 #
362 # To create the actual list of tasks to execute we fix the list of
363 # providers and then resolve the dependencies into task IDs. This
364 # process is repeated for each type of dependency (tdepends, deptask,
365 # rdeptast, recrdeptask, idepends).
366
367 def add_build_dependencies(depids, tasknames, depends):
368 for depid in depids:
369 # Won't be in build_targets if ASSUME_PROVIDED
370 if depid not in taskData.build_targets:
371 continue
372 depdata = taskData.build_targets[depid][0]
373 if depdata is None:
374 continue
375 dep = taskData.fn_index[depdata]
376 for taskname in tasknames:
377 taskid = taskData.gettask_id(dep, taskname, False)
378 if taskid is not None:
379 depends.append(taskid)
380
381 def add_runtime_dependencies(depids, tasknames, depends):
382 for depid in depids:
383 if depid not in taskData.run_targets:
384 continue
385 depdata = taskData.run_targets[depid][0]
386 if depdata is None:
387 continue
388 dep = taskData.fn_index[depdata]
389 for taskname in tasknames:
390 taskid = taskData.gettask_id(dep, taskname, False)
391 if taskid is not None:
392 depends.append(taskid)
393
394 for task in range(len(taskData.tasks_name)):
395 depends = []
396 recrdepends = []
397 fnid = taskData.tasks_fnid[task]
398 fn = taskData.fn_index[fnid]
399 task_deps = self.dataCache.task_deps[fn]
400
401 bb.msg.debug(2, bb.msg.domain.RunQueue, "Processing %s:%s" %(fn, taskData.tasks_name[task]))
402
403 if fnid not in taskData.failed_fnids:
404
405 # Resolve task internal dependencies
406 #
407 # e.g. addtask before X after Y
408 depends = taskData.tasks_tdepends[task]
409
410 # Resolve 'deptask' dependencies
411 #
412 # e.g. do_sometask[deptask] = "do_someothertask"
413 # (makes sure sometask runs after someothertask of all DEPENDS)
414 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
415 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
416 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
417
418 # Resolve 'rdeptask' dependencies
419 #
420 # e.g. do_sometask[rdeptask] = "do_someothertask"
421 # (makes sure sometask runs after someothertask of all RDEPENDS)
422 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
423 taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
424 add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
425
426 # Resolve inter-task dependencies
427 #
428 # e.g. do_sometask[depends] = "targetname:do_someothertask"
429 # (makes sure sometask runs after targetname's someothertask)
430 if fnid not in tdepends_fnid:
431 tdepends_fnid[fnid] = set()
432 idepends = taskData.tasks_idepends[task]
433 for (depid, idependtask) in idepends:
434 if depid in taskData.build_targets:
435 # Won't be in build_targets if ASSUME_PROVIDED
436 depdata = taskData.build_targets[depid][0]
437 if depdata is not None:
438 dep = taskData.fn_index[depdata]
439 taskid = taskData.gettask_id(dep, idependtask)
440 depends.append(taskid)
441 if depdata != fnid:
442 tdepends_fnid[fnid].add(taskid)
443
444
445 # Resolve recursive 'recrdeptask' dependencies (A)
446 #
447 # e.g. do_sometask[recrdeptask] = "do_someothertask"
448 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
449 # We cover the recursive part of the dependencies below
450 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
451 for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
452 recrdepends.append(taskname)
453 add_build_dependencies(taskData.depids[fnid], [taskname], depends)
454 add_runtime_dependencies(taskData.rdepids[fnid], [taskname], depends)
455
456 # Rmove all self references
457 if task in depends:
458 newdep = []
459 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
460 for dep in depends:
461 if task != dep:
462 newdep.append(dep)
463 depends = newdep
464
465 self.runq_fnid.append(taskData.tasks_fnid[task])
466 self.runq_task.append(taskData.tasks_name[task])
467 self.runq_depends.append(set(depends))
468 self.runq_revdeps.append(set())
469
470 runq_build.append(0)
471 runq_recrdepends.append(recrdepends)
472
473 #
474 # Build a list of recursive cumulative dependencies for each fnid
475 # We do this by fnid, since if A depends on some task in B
476 # we're interested in later tasks B's fnid might have but B itself
477 # doesn't depend on
478 #
479 # Algorithm is O(tasks) + O(tasks)*O(fnids)
480 #
481 reccumdepends = {}
482 for task in range(len(self.runq_fnid)):
483 fnid = self.runq_fnid[task]
484 if fnid not in reccumdepends:
485 if fnid in tdepends_fnid:
486 reccumdepends[fnid] = tdepends_fnid[fnid]
487 else:
488 reccumdepends[fnid] = set()
489 reccumdepends[fnid].update(self.runq_depends[task])
490 for task in range(len(self.runq_fnid)):
491 taskfnid = self.runq_fnid[task]
492 for fnid in reccumdepends:
493 if task in reccumdepends[fnid]:
494 reccumdepends[fnid].add(task)
495 if taskfnid in reccumdepends:
496 reccumdepends[fnid].update(reccumdepends[taskfnid])
497
498
499 # Resolve recursive 'recrdeptask' dependencies (B)
500 #
501 # e.g. do_sometask[recrdeptask] = "do_someothertask"
502 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
503 for task in range(len(self.runq_fnid)):
504 if len(runq_recrdepends[task]) > 0:
505 taskfnid = self.runq_fnid[task]
506 for dep in reccumdepends[taskfnid]:
507 # Ignore self references
508 if dep == task:
509 continue
510 for taskname in runq_recrdepends[task]:
511 if taskData.tasks_name[dep] == taskname:
512 self.runq_depends[task].add(dep)
513
514 # Step B - Mark all active tasks
515 #
516 # Start with the tasks we were asked to run and mark all dependencies
517 # as active too. If the task is to be 'forced', clear its stamp. Once
518 # all active tasks are marked, prune the ones we don't need.
519
520 bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
521
522 def mark_active(listid, depth):
523 """
524 Mark an item as active along with its depends
525 (calls itself recursively)
526 """
527
528 if runq_build[listid] == 1:
529 return
530
531 runq_build[listid] = 1
532
533 depends = self.runq_depends[listid]
534 for depend in depends:
535 mark_active(depend, depth+1)
536
537 self.target_pairs = []
538 for target in self.targets:
539 targetid = taskData.getbuild_id(target[0])
540
541 if targetid not in taskData.build_targets:
542 continue
543
544 if targetid in taskData.failed_deps:
545 continue
546
547 fnid = taskData.build_targets[targetid][0]
548 fn = taskData.fn_index[fnid]
549 self.target_pairs.append((fn, target[1]))
550
551 # Remove stamps for targets if force mode active
552 if self.cooker.configuration.force:
553 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
554 bb.build.del_stamp(target[1], self.dataCache, fn)
555
556 if fnid in taskData.failed_fnids:
557 continue
558
559 if target[1] not in taskData.tasks_lookup[fnid]:
560 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
561
562 listid = taskData.tasks_lookup[fnid][target[1]]
563
564 mark_active(listid, 1)
565
566 # Step C - Prune all inactive tasks
567 #
568 # Once all active tasks are marked, prune the ones we don't need.
569
570 maps = []
571 delcount = 0
572 for listid in range(len(self.runq_fnid)):
573 if runq_build[listid-delcount] == 1:
574 maps.append(listid-delcount)
575 else:
576 del self.runq_fnid[listid-delcount]
577 del self.runq_task[listid-delcount]
578 del self.runq_depends[listid-delcount]
579 del runq_build[listid-delcount]
580 del self.runq_revdeps[listid-delcount]
581 delcount = delcount + 1
582 maps.append(-1)
583
584 #
585 # Step D - Sanity checks and computation
586 #
587
588 # Check to make sure we still have tasks to run
589 if len(self.runq_fnid) == 0:
590 if not taskData.abort:
591 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
592 else:
593 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
594
595 bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
596
597 # Remap the dependencies to account for the deleted tasks
598 # Check we didn't delete a task we depend on
599 for listid in range(len(self.runq_fnid)):
600 newdeps = []
601 origdeps = self.runq_depends[listid]
602 for origdep in origdeps:
603 if maps[origdep] == -1:
604 bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
605 newdeps.append(maps[origdep])
606 self.runq_depends[listid] = set(newdeps)
607
608 bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
609
610 # Generate a list of reverse dependencies to ease future calculations
611 for listid in range(len(self.runq_fnid)):
612 for dep in self.runq_depends[listid]:
613 self.runq_revdeps[dep].add(listid)
614
615 # Identify tasks at the end of dependency chains
616 # Error on circular dependency loops (length two)
617 endpoints = []
618 for listid in range(len(self.runq_fnid)):
619 revdeps = self.runq_revdeps[listid]
620 if len(revdeps) == 0:
621 endpoints.append(listid)
622 for dep in revdeps:
623 if dep in self.runq_depends[listid]:
624 #self.dump_data(taskData)
625 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
626
627 bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
628
629 # Calculate task weights
630 # Check of higher length circular dependencies
631 self.runq_weight = self.calculate_task_weights(endpoints)
632
633 # Decide what order to execute the tasks in, pick a scheduler
634 #self.sched = RunQueueScheduler(self)
635 if self.scheduler == "completion":
636 self.sched = RunQueueSchedulerCompletion(self)
637 else:
638 self.sched = RunQueueSchedulerSpeed(self)
639
640 # Sanity Check - Check for multiple tasks building the same provider
641 prov_list = {}
642 seen_fn = []
643 for task in range(len(self.runq_fnid)):
644 fn = taskData.fn_index[self.runq_fnid[task]]
645 if fn in seen_fn:
646 continue
647 seen_fn.append(fn)
648 for prov in self.dataCache.fn_provides[fn]:
649 if prov not in prov_list:
650 prov_list[prov] = [fn]
651 elif fn not in prov_list[prov]:
652 prov_list[prov].append(fn)
653 error = False
654 for prov in prov_list:
655 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
656 error = True
657 bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
658 #if error:
659 # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
660
661
662 # Create a whitelist usable by the stamp checks
663 stampfnwhitelist = []
664 for entry in self.stampwhitelist.split():
665 entryid = self.taskData.getbuild_id(entry)
666 if entryid not in self.taskData.build_targets:
667 continue
668 fnid = self.taskData.build_targets[entryid][0]
669 fn = self.taskData.fn_index[fnid]
670 stampfnwhitelist.append(fn)
671 self.stampfnwhitelist = stampfnwhitelist
672
673 #self.dump_data(taskData)
674
675 self.state = runQueueRunInit
676
677 def check_stamps(self):
678 unchecked = {}
679 current = []
680 notcurrent = []
681 buildable = []
682
683 if self.stamppolicy == "perfile":
684 fulldeptree = False
685 else:
686 fulldeptree = True
687 stampwhitelist = []
688 if self.stamppolicy == "whitelist":
689 stampwhitelist = self.self.stampfnwhitelist
690
691 for task in range(len(self.runq_fnid)):
692 unchecked[task] = ""
693 if len(self.runq_depends[task]) == 0:
694 buildable.append(task)
695
696 def check_buildable(self, task, buildable):
697 for revdep in self.runq_revdeps[task]:
698 alldeps = 1
699 for dep in self.runq_depends[revdep]:
700 if dep in unchecked:
701 alldeps = 0
702 if alldeps == 1:
703 if revdep in unchecked:
704 buildable.append(revdep)
705
706 for task in range(len(self.runq_fnid)):
707 if task not in unchecked:
708 continue
709 fn = self.taskData.fn_index[self.runq_fnid[task]]
710 taskname = self.runq_task[task]
711 stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
712 # If the stamp is missing its not current
713 if not os.access(stampfile, os.F_OK):
714 del unchecked[task]
715 notcurrent.append(task)
716 check_buildable(self, task, buildable)
717 continue
718 # If its a 'nostamp' task, it's not current
719 taskdep = self.dataCache.task_deps[fn]
720 if 'nostamp' in taskdep and task in taskdep['nostamp']:
721 del unchecked[task]
722 notcurrent.append(task)
723 check_buildable(self, task, buildable)
724 continue
725
726 while (len(buildable) > 0):
727 nextbuildable = []
728 for task in buildable:
729 if task in unchecked:
730 fn = self.taskData.fn_index[self.runq_fnid[task]]
731 taskname = self.runq_task[task]
732 stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
733 iscurrent = True
734
735 t1 = os.stat(stampfile)[stat.ST_MTIME]
736 for dep in self.runq_depends[task]:
737 if iscurrent:
738 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
739 taskname2 = self.runq_task[dep]
740 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
741 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
742 if dep in notcurrent:
743 iscurrent = False
744 else:
745 t2 = os.stat(stampfile2)[stat.ST_MTIME]
746 if t1 < t2:
747 iscurrent = False
748 del unchecked[task]
749 if iscurrent:
750 current.append(task)
751 else:
752 notcurrent.append(task)
753
754 check_buildable(self, task, nextbuildable)
755
756 buildable = nextbuildable
757
758 #for task in range(len(self.runq_fnid)):
759 # fn = self.taskData.fn_index[self.runq_fnid[task]]
760 # taskname = self.runq_task[task]
761 # print "%s %s.%s" % (task, taskname, fn)
762
763 #print "Unchecked: %s" % unchecked
764 #print "Current: %s" % current
765 #print "Not current: %s" % notcurrent
766
767 if len(unchecked) > 0:
768 bb.fatal("check_stamps fatal internal error")
769 return current
770
771 def check_stamp_task(self, task):
772
773 if self.stamppolicy == "perfile":
774 fulldeptree = False
775 else:
776 fulldeptree = True
777 stampwhitelist = []
778 if self.stamppolicy == "whitelist":
779 stampwhitelist = self.stampfnwhitelist
780
781 fn = self.taskData.fn_index[self.runq_fnid[task]]
782 taskname = self.runq_task[task]
783 stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
784 # If the stamp is missing its not current
785 if not os.access(stampfile, os.F_OK):
786 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
787 return False
788 # If its a 'nostamp' task, it's not current
789 taskdep = self.dataCache.task_deps[fn]
790 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
791 bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
792 return False
793
794 iscurrent = True
795 t1 = os.stat(stampfile)[stat.ST_MTIME]
796 for dep in self.runq_depends[task]:
797 if iscurrent:
798 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
799 taskname2 = self.runq_task[dep]
800 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
801 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
802 try:
803 t2 = os.stat(stampfile2)[stat.ST_MTIME]
804 if t1 < t2:
805 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile,stampfile2))
806 iscurrent = False
807 except:
808 bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2 ,stampfile))
809 iscurrent = False
810
811 return iscurrent
812
813 def execute_runqueue(self):
814 """
815 Run the tasks in a queue prepared by prepare_runqueue
816 Upon failure, optionally try to recover the build using any alternate providers
817 (if the abort on failure configuration option isn't set)
818 """
819
820 if self.state is runQueuePrepare:
821 self.prepare_runqueue()
822
823 if self.state is runQueueRunInit:
824 bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
825 self.execute_runqueue_initVars()
826
827 if self.state is runQueueRunning:
828 self.execute_runqueue_internal()
829
830 if self.state is runQueueCleanUp:
831 self.finish_runqueue()
832
833 if self.state is runQueueFailed:
834 if not self.taskData.tryaltconfigs:
835 raise bb.runqueue.TaskFailure(self.failed_fnids)
836 for fnid in self.failed_fnids:
837 self.taskData.fail_fnid(fnid)
838 self.reset_runqueue()
839
840 if self.state is runQueueComplete:
841 # All done
842 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
843 return False
844
845 if self.state is runQueueChildProcess:
846 print "Child process"
847 return False
848
849 # Loop
850 return True
851
852 def execute_runqueue_initVars(self):
853
854 self.stats = RunQueueStats(len(self.runq_fnid))
855
856 self.runq_buildable = []
857 self.runq_running = []
858 self.runq_complete = []
859 self.build_pids = {}
860 self.build_pipes = {}
861 self.failed_fnids = []
862
863 # Mark initial buildable tasks
864 for task in range(self.stats.total):
865 self.runq_running.append(0)
866 self.runq_complete.append(0)
867 if len(self.runq_depends[task]) == 0:
868 self.runq_buildable.append(1)
869 else:
870 self.runq_buildable.append(0)
871
872 self.state = runQueueRunning
873
874 event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp), self.cfgData)
875
876 def task_complete(self, task):
877 """
878 Mark a task as completed
879 Look at the reverse dependencies and mark any task with
880 completed dependencies as buildable
881 """
882 self.runq_complete[task] = 1
883 for revdep in self.runq_revdeps[task]:
884 if self.runq_running[revdep] == 1:
885 continue
886 if self.runq_buildable[revdep] == 1:
887 continue
888 alldeps = 1
889 for dep in self.runq_depends[revdep]:
890 if self.runq_complete[dep] != 1:
891 alldeps = 0
892 if alldeps == 1:
893 self.runq_buildable[revdep] = 1
894 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
895 taskname = self.runq_task[revdep]
896 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
897
898 def task_fail(self, task, exitcode):
899 """
900 Called when a task has failed
901 Updates the state engine with the failure
902 """
903 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode))
904 self.stats.taskFailed()
905 fnid = self.runq_fnid[task]
906 self.failed_fnids.append(fnid)
907 bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
908 if self.taskData.abort:
909 self.state = runQueueCleanup
910
911 def execute_runqueue_internal(self):
912 """
913 Run the tasks in a queue prepared by prepare_runqueue
914 """
915
916 if self.stats.total == 0:
917 # nothing to do
918 self.state = runQueueCleanup
919
920 while True:
921 task = None
922 if self.stats.active < self.number_tasks:
923 task = self.sched.next()
924 if task is not None:
925 fn = self.taskData.fn_index[self.runq_fnid[task]]
926
927 taskname = self.runq_task[task]
928 if self.check_stamp_task(task):
929 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
930 self.runq_running[task] = 1
931 self.runq_buildable[task] = 1
932 self.task_complete(task)
933 self.stats.taskCompleted()
934 self.stats.taskSkipped()
935 continue
936
937 sys.stdout.flush()
938 sys.stderr.flush()
939 try:
940 pipein, pipeout = os.pipe()
941 pid = os.fork()
942 except OSError, e:
943 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
944 if pid == 0:
945 os.close(pipein)
946 # Save out the PID so that the event can include it the
947 # events
948 bb.event.worker_pid = os.getpid()
949 bb.event.worker_pipe = pipeout
950
951 self.state = runQueueChildProcess
952 # Make the child the process group leader
953 os.setpgid(0, 0)
954 # No stdin
955 newsi = os.open('/dev/null', os.O_RDWR)
956 os.dup2(newsi, sys.stdin.fileno())
957
958 bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
959 bb.msg.note(1, bb.msg.domain.RunQueue,
960 "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1,
961 self.stats.total,
962 task,
963 self.get_user_idstring(task)))
964
965 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
966 try:
967 self.cooker.tryBuild(fn, taskname[3:])
968 except bb.build.EventException:
969 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
970 os._exit(1)
971 except:
972 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
973 os._exit(1)
974 os._exit(0)
975
976 self.build_pids[pid] = task
977 self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
978 self.runq_running[task] = 1
979 self.stats.taskActive()
980 if self.stats.active < self.number_tasks:
981 continue
982
983 for pipe in self.build_pipes:
984 self.build_pipes[pipe].read()
985
986 if self.stats.active > 0:
987 result = os.waitpid(-1, os.WNOHANG)
988 if result[0] is 0 and result[1] is 0:
989 return
990 task = self.build_pids[result[0]]
991 del self.build_pids[result[0]]
992 self.build_pipes[result[0]].close()
993 del self.build_pipes[result[0]]
994 if result[1] != 0:
995 self.task_fail(task, result[1])
996 return
997 self.task_complete(task)
998 self.stats.taskCompleted()
999 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
1000 continue
1001
1002 if len(self.failed_fnids) != 0:
1003 self.state = runQueueFailed
1004 return
1005
1006 # Sanity Checks
1007 for task in range(self.stats.total):
1008 if self.runq_buildable[task] == 0:
1009 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
1010 if self.runq_running[task] == 0:
1011 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
1012 if self.runq_complete[task] == 0:
1013 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
1014 self.state = runQueueComplete
1015 return
1016
1017 def finish_runqueue_now(self):
1018 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active)
1019 for k, v in self.build_pids.iteritems():
1020 try:
1021 os.kill(-k, signal.SIGINT)
1022 except:
1023 pass
1024 for pipe in self.build_pipes:
1025 self.build_pipes[pipe].read()
1026
1027 def finish_runqueue(self, now = False):
1028 self.state = runQueueCleanUp
1029 if now:
1030 self.finish_runqueue_now()
1031 try:
1032 while self.stats.active > 0:
1033 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1034 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)
1035 tasknum = 1
1036 for k, v in self.build_pids.iteritems():
1037 bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
1038 tasknum = tasknum + 1
1039 result = os.waitpid(-1, os.WNOHANG)
1040 if result[0] is 0 and result[1] is 0:
1041 return
1042 task = self.build_pids[result[0]]
1043 del self.build_pids[result[0]]
1044 self.build_pipes[result[0]].close()
1045 del self.build_pipes[result[0]]
1046 if result[1] != 0:
1047 self.task_fail(task, result[1])
1048 else:
1049 self.stats.taskCompleted()
1050 bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
1051 except:
1052 self.finish_runqueue_now()
1053 raise
1054
1055 if len(self.failed_fnids) != 0:
1056 self.state = runQueueFailed
1057 return
1058
1059 self.state = runQueueComplete
1060 return
1061
1062 def dump_data(self, taskQueue):
1063 """
1064 Dump some debug information on the internal data structures
1065 """
1066 bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
1067 for task in range(len(self.runq_task)):
1068 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
1069 taskQueue.fn_index[self.runq_fnid[task]],
1070 self.runq_task[task],
1071 self.runq_weight[task],
1072 self.runq_depends[task],
1073 self.runq_revdeps[task]))
1074
1075 bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
1076 for task1 in range(len(self.runq_task)):
1077 if task1 in self.prio_map:
1078 task = self.prio_map[task1]
1079 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
1080 taskQueue.fn_index[self.runq_fnid[task]],
1081 self.runq_task[task],
1082 self.runq_weight[task],
1083 self.runq_depends[task],
1084 self.runq_revdeps[task]))
1085
1086
1087class TaskFailure(Exception):
1088 """
1089 Exception raised when a task in a runqueue fails
1090 """
1091 def __init__(self, x):
1092 self.args = x
1093
1094
1095class runQueueExitWait(bb.event.Event):
1096 """
1097 Event when waiting for task processes to exit
1098 """
1099
1100 def __init__(self, remain):
1101 self.remain = remain
1102 self.message = "Waiting for %s active tasks to finish" % remain
1103 bb.event.Event.__init__(self)
1104
1105class runQueueEvent(bb.event.Event):
1106 """
1107 Base runQueue event class
1108 """
1109 def __init__(self, task, stats, rq):
1110 self.taskid = task
1111 self.taskstring = rq.get_user_idstring(task)
1112 self.stats = stats
1113 bb.event.Event.__init__(self)
1114
1115class runQueueTaskStarted(runQueueEvent):
1116 """
1117 Event notifing a task was started
1118 """
1119 def __init__(self, task, stats, rq):
1120 runQueueEvent.__init__(self, task, stats, rq)
1121 self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)
1122
1123class runQueueTaskFailed(runQueueEvent):
1124 """
1125 Event notifing a task failed
1126 """
1127 def __init__(self, task, stats, rq):
1128 runQueueEvent.__init__(self, task, stats, rq)
1129 self.message = "Task %s failed (%s)" % (task, self.taskstring)
1130
1131class runQueueTaskCompleted(runQueueEvent):
1132 """
1133 Event notifing a task completed
1134 """
1135 def __init__(self, task, stats, rq):
1136 runQueueEvent.__init__(self, task, stats, rq)
1137 self.message = "Task %s completed (%s)" % (task, self.taskstring)
1138
1139def check_stamp_fn(fn, taskname, d):
1140 rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1141 fnid = rq.taskData.getfn_id(fn)
1142 taskid = rq.get_task_id(fnid, taskname)
1143 if taskid is not None:
1144 return rq.check_stamp_task(taskid)
1145 return None
1146
1147class runQueuePipe():
1148 """
1149 Abstraction for a pipe between a worker thread and the server
1150 """
1151 def __init__(self, pipein, pipeout, d):
1152 self.fd = pipein
1153 os.close(pipeout)
1154 self.queue = ""
1155 self.d = d
1156
1157 def read(self):
1158 start = len(self.queue)
1159 self.queue = self.queue + os.read(self.fd, 1024)
1160 end = len(self.queue)
1161 index = self.queue.find("</event>")
1162 while index != -1:
1163 bb.event.fire_from_worker(self.queue[:index+8], self.d)
1164 self.queue = self.queue[index+8:]
1165 index = self.queue.find("</event>")
1166 return (end > start)
1167
1168 def close(self):
1169 while self.read():
1170 continue
1171 if len(self.queue) > 0:
1172 print "Warning, worker left partial message"
1173 os.close(self.fd)
1174