summaryrefslogtreecommitdiffstats
path: root/bitbake-dev/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <richard@openedhand.com>2008-09-30 15:08:33 +0000
committerRichard Purdie <richard@openedhand.com>2008-09-30 15:08:33 +0000
commitc30eddb243e7e65f67f656e62848a033cf6f2e5c (patch)
tree110dd95788b76f55d31cb8d30aac2de8400b6f4a /bitbake-dev/lib/bb/runqueue.py
parent5ef0510474004eeb2ae8a99b64e2febb1920e077 (diff)
downloadpoky-c30eddb243e7e65f67f656e62848a033cf6f2e5c.tar.gz
Add bitbake-dev to allow ease of testing and development of bitbake trunk
git-svn-id: https://svn.o-hand.com/repos/poky/trunk@5337 311d38ba-8fff-0310-9ca6-ca027cbcb966
Diffstat (limited to 'bitbake-dev/lib/bb/runqueue.py')
-rw-r--r--bitbake-dev/lib/bb/runqueue.py1157
1 files changed, 1157 insertions, 0 deletions
diff --git a/bitbake-dev/lib/bb/runqueue.py b/bitbake-dev/lib/bb/runqueue.py
new file mode 100644
index 0000000000..4130b50641
--- /dev/null
+++ b/bitbake-dev/lib/bb/runqueue.py
@@ -0,0 +1,1157 @@
1#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25from bb import msg, data, event, mkdirhier, utils
26from sets import Set
27import bb, os, sys
28import signal
29import stat
30
31class TaskFailure(Exception):
32 """Exception raised when a task in a runqueue fails"""
33 def __init__(self, x):
34 self.args = x
35
36
37class RunQueueStats:
38 """
39 Holds statistics on the tasks handled by the associated runQueue
40 """
41 def __init__(self, total):
42 self.completed = 0
43 self.skipped = 0
44 self.failed = 0
45 self.active = 0
46 self.total = total
47
48 def taskFailed(self):
49 self.active = self.active - 1
50 self.failed = self.failed + 1
51
52 def taskCompleted(self, number = 1):
53 self.active = self.active - number
54 self.completed = self.completed + number
55
56 def taskSkipped(self, number = 1):
57 self.active = self.active + number
58 self.skipped = self.skipped + number
59
60 def taskActive(self):
61 self.active = self.active + 1
62
63# These values indicate the next step due to be run in the
64# runQueue state machine
65runQueuePrepare = 2
66runQueueRunInit = 3
67runQueueRunning = 4
68runQueueFailed = 6
69runQueueCleanUp = 7
70runQueueComplete = 8
71runQueueChildProcess = 9
72
73class RunQueueScheduler:
74 """
75 Control the order tasks are scheduled in.
76 """
77 def __init__(self, runqueue):
78 """
79 The default scheduler just returns the first buildable task (the
80 priority map is sorted by task numer)
81 """
82 self.rq = runqueue
83 numTasks = len(self.rq.runq_fnid)
84
85 self.prio_map = []
86 self.prio_map.extend(range(numTasks))
87
88 def next(self):
89 """
90 Return the id of the first task we find that is buildable
91 """
92 for task1 in range(len(self.rq.runq_fnid)):
93 task = self.prio_map[task1]
94 if self.rq.runq_running[task] == 1:
95 continue
96 if self.rq.runq_buildable[task] == 1:
97 return task
98
99class RunQueueSchedulerSpeed(RunQueueScheduler):
100 """
101 A scheduler optimised for speed. The priority map is sorted by task weight,
102 heavier weighted tasks (tasks needed by the most other tasks) are run first.
103 """
104 def __init__(self, runqueue):
105 """
106 The priority map is sorted by task weight.
107 """
108 from copy import deepcopy
109
110 self.rq = runqueue
111
112 sortweight = deepcopy(self.rq.runq_weight)
113 sortweight.sort()
114 copyweight = deepcopy(self.rq.runq_weight)
115 self.prio_map = []
116
117 for weight in sortweight:
118 idx = copyweight.index(weight)
119 self.prio_map.append(idx)
120 copyweight[idx] = -1
121
122 self.prio_map.reverse()
123
124class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
125 """
126 A scheduler optimised to complete .bb files are quickly as possible. The
127 priority map is sorted by task weight, but then reordered so once a given
128 .bb file starts to build, its completed as quickly as possible. This works
129 well where disk space is at a premium and classes like OE's rm_work are in
130 force.
131 """
132 def __init__(self, runqueue):
133 RunQueueSchedulerSpeed.__init__(self, runqueue)
134 from copy import deepcopy
135
136 #FIXME - whilst this groups all fnids together it does not reorder the
137 #fnid groups optimally.
138
139 basemap = deepcopy(self.prio_map)
140 self.prio_map = []
141 while (len(basemap) > 0):
142 entry = basemap.pop(0)
143 self.prio_map.append(entry)
144 fnid = self.rq.runq_fnid[entry]
145 todel = []
146 for entry in basemap:
147 entry_fnid = self.rq.runq_fnid[entry]
148 if entry_fnid == fnid:
149 todel.append(basemap.index(entry))
150 self.prio_map.append(entry)
151 todel.reverse()
152 for idx in todel:
153 del basemap[idx]
154
155class RunQueue:
156 """
157 BitBake Run Queue implementation
158 """
159 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
160 self.reset_runqueue()
161 self.cooker = cooker
162 self.dataCache = dataCache
163 self.taskData = taskData
164 self.cfgData = cfgData
165 self.targets = targets
166
167 self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
168 self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData, 1) or "").split()
169 self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
170 self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
171 self.stampwhitelist = bb.data.getVar("BB_STAMP_WHITELIST", cfgData, 1) or ""
172
173 def reset_runqueue(self):
174 self.runq_fnid = []
175 self.runq_task = []
176 self.runq_depends = []
177 self.runq_revdeps = []
178
179 self.state = runQueuePrepare
180
181 def get_user_idstring(self, task):
182 fn = self.taskData.fn_index[self.runq_fnid[task]]
183 taskname = self.runq_task[task]
184 return "%s, %s" % (fn, taskname)
185
186 def get_task_id(self, fnid, taskname):
187 for listid in range(len(self.runq_fnid)):
188 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
189 return listid
190 return None
191
192 def circular_depchains_handler(self, tasks):
193 """
194 Some tasks aren't buildable, likely due to circular dependency issues.
195 Identify the circular dependencies and print them in a user readable format.
196 """
197 from copy import deepcopy
198
199 valid_chains = []
200 explored_deps = {}
201 msgs = []
202
203 def chain_reorder(chain):
204 """
205 Reorder a dependency chain so the lowest task id is first
206 """
207 lowest = 0
208 new_chain = []
209 for entry in range(len(chain)):
210 if chain[entry] < chain[lowest]:
211 lowest = entry
212 new_chain.extend(chain[lowest:])
213 new_chain.extend(chain[:lowest])
214 return new_chain
215
216 def chain_compare_equal(chain1, chain2):
217 """
218 Compare two dependency chains and see if they're the same
219 """
220 if len(chain1) != len(chain2):
221 return False
222 for index in range(len(chain1)):
223 if chain1[index] != chain2[index]:
224 return False
225 return True
226
227 def chain_array_contains(chain, chain_array):
228 """
229 Return True if chain_array contains chain
230 """
231 for ch in chain_array:
232 if chain_compare_equal(ch, chain):
233 return True
234 return False
235
236 def find_chains(taskid, prev_chain):
237 prev_chain.append(taskid)
238 total_deps = []
239 total_deps.extend(self.runq_revdeps[taskid])
240 for revdep in self.runq_revdeps[taskid]:
241 if revdep in prev_chain:
242 idx = prev_chain.index(revdep)
243 # To prevent duplicates, reorder the chain to start with the lowest taskid
244 # and search through an array of those we've already printed
245 chain = prev_chain[idx:]
246 new_chain = chain_reorder(chain)
247 if not chain_array_contains(new_chain, valid_chains):
248 valid_chains.append(new_chain)
249 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
250 for dep in new_chain:
251 msgs.append(" Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
252 msgs.append("\n")
253 if len(valid_chains) > 10:
254 msgs.append("Aborted dependency loops search after 10 matches.\n")
255 return msgs
256 continue
257 scan = False
258 if revdep not in explored_deps:
259 scan = True
260 elif revdep in explored_deps[revdep]:
261 scan = True
262 else:
263 for dep in prev_chain:
264 if dep in explored_deps[revdep]:
265 scan = True
266 if scan:
267 find_chains(revdep, deepcopy(prev_chain))
268 for dep in explored_deps[revdep]:
269 if dep not in total_deps:
270 total_deps.append(dep)
271
272 explored_deps[taskid] = total_deps
273
274 for task in tasks:
275 find_chains(task, [])
276
277 return msgs
278
279 def calculate_task_weights(self, endpoints):
280 """
281 Calculate a number representing the "weight" of each task. Heavier weighted tasks
282 have more dependencies and hence should be executed sooner for maximum speed.
283
284 This function also sanity checks the task list finding tasks that its not
285 possible to execute due to circular dependencies.
286 """
287
288 numTasks = len(self.runq_fnid)
289 weight = []
290 deps_left = []
291 task_done = []
292
293 for listid in range(numTasks):
294 task_done.append(False)
295 weight.append(0)
296 deps_left.append(len(self.runq_revdeps[listid]))
297
298 for listid in endpoints:
299 weight[listid] = 1
300 task_done[listid] = True
301
302 while 1:
303 next_points = []
304 for listid in endpoints:
305 for revdep in self.runq_depends[listid]:
306 weight[revdep] = weight[revdep] + weight[listid]
307 deps_left[revdep] = deps_left[revdep] - 1
308 if deps_left[revdep] == 0:
309 next_points.append(revdep)
310 task_done[revdep] = True
311 endpoints = next_points
312 if len(next_points) == 0:
313 break
314
315 # Circular dependency sanity check
316 problem_tasks = []
317 for task in range(numTasks):
318 if task_done[task] is False or deps_left[task] != 0:
319 problem_tasks.append(task)
320 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
321 bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))
322
323 if problem_tasks:
324 message = "Unbuildable tasks were found.\n"
325 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
326 message = message + "Identifying dependency loops (this may take a short while)...\n"
327 bb.msg.error(bb.msg.domain.RunQueue, message)
328
329 msgs = self.circular_depchains_handler(problem_tasks)
330
331 message = "\n"
332 for msg in msgs:
333 message = message + msg
334 bb.msg.fatal(bb.msg.domain.RunQueue, message)
335
336 return weight
337
338 def prepare_runqueue(self):
339 """
340 Turn a set of taskData into a RunQueue and compute data needed
341 to optimise the execution order.
342 """
343
344 depends = []
345 runq_build = []
346 recursive_tdepends = {}
347
348 taskData = self.taskData
349
350 if len(taskData.tasks_name) == 0:
351 # Nothing to do
352 return
353
354 bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")
355
356 # Step A - Work out a list of tasks to run
357 #
358 # Taskdata gives us a list of possible providers for a every target
359 # ordered by priority (build_targets, run_targets). It also gives
360 # information on each of those providers.
361 #
362 # To create the actual list of tasks to execute we fix the list of
363 # providers and then resolve the dependencies into task IDs. This
364 # process is repeated for each type of dependency (tdepends, deptask,
365 # rdeptast, recrdeptask, idepends).
366
367 for task in range(len(taskData.tasks_name)):
368 fnid = taskData.tasks_fnid[task]
369 fn = taskData.fn_index[fnid]
370 task_deps = self.dataCache.task_deps[fn]
371
372 if fnid not in taskData.failed_fnids:
373
374 # Resolve task internal dependencies
375 #
376 # e.g. addtask before X after Y
377 depends = taskData.tasks_tdepends[task]
378
379 # Resolve 'deptask' dependencies
380 #
381 # e.g. do_sometask[deptask] = "do_someothertask"
382 # (makes sure sometask runs after someothertask of all DEPENDS)
383 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
384 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
385 for depid in taskData.depids[fnid]:
386 # Won't be in build_targets if ASSUME_PROVIDED
387 if depid in taskData.build_targets:
388 depdata = taskData.build_targets[depid][0]
389 if depdata is not None:
390 dep = taskData.fn_index[depdata]
391 for taskname in tasknames:
392 depends.append(taskData.gettask_id(dep, taskname))
393
394 # Resolve 'rdeptask' dependencies
395 #
396 # e.g. do_sometask[rdeptask] = "do_someothertask"
397 # (makes sure sometask runs after someothertask of all RDEPENDS)
398 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
399 taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
400 for depid in taskData.rdepids[fnid]:
401 if depid in taskData.run_targets:
402 depdata = taskData.run_targets[depid][0]
403 if depdata is not None:
404 dep = taskData.fn_index[depdata]
405 depends.append(taskData.gettask_id(dep, taskname))
406
407 # Resolve inter-task dependencies
408 #
409 # e.g. do_sometask[depends] = "targetname:do_someothertask"
410 # (makes sure sometask runs after targetname's someothertask)
411 idepends = taskData.tasks_idepends[task]
412 for (depid, idependtask) in idepends:
413 if depid in taskData.build_targets:
414 # Won't be in build_targets if ASSUME_PROVIDED
415 depdata = taskData.build_targets[depid][0]
416 if depdata is not None:
417 dep = taskData.fn_index[depdata]
418 depends.append(taskData.gettask_id(dep, idependtask))
419
420 # Create a list of recursive dependent tasks (from tdepends) and cache
421 def get_recursive_tdepends(task):
422 if not task:
423 return []
424 if task in recursive_tdepends:
425 return recursive_tdepends[task]
426
427 fnid = taskData.tasks_fnid[task]
428 taskids = taskData.gettask_ids(fnid)
429
430 rectdepends = taskids
431 nextdeps = taskids
432 while len(nextdeps) != 0:
433 newdeps = []
434 for nextdep in nextdeps:
435 for tdepend in taskData.tasks_tdepends[nextdep]:
436 if tdepend not in rectdepends:
437 rectdepends.append(tdepend)
438 newdeps.append(tdepend)
439 nextdeps = newdeps
440 recursive_tdepends[task] = rectdepends
441 return rectdepends
442
443 # Using the list of tdepends for this task create a list of
444 # the recursive idepends we have
445 def get_recursive_idepends(task):
446 if not task:
447 return []
448 rectdepends = get_recursive_tdepends(task)
449
450 recidepends = []
451 for tdepend in rectdepends:
452 for idepend in taskData.tasks_idepends[tdepend]:
453 recidepends.append(idepend)
454 return recidepends
455
456 def add_recursive_build(depid, depfnid):
457 """
458 Add build depends of depid to depends
459 (if we've not see it before)
460 (calls itself recursively)
461 """
462 if str(depid) in dep_seen:
463 return
464 dep_seen.append(depid)
465 if depid in taskData.build_targets:
466 depdata = taskData.build_targets[depid][0]
467 if depdata is not None:
468 dep = taskData.fn_index[depdata]
469 # Need to avoid creating new tasks here
470 taskid = taskData.gettask_id(dep, taskname, False)
471 if taskid is not None:
472 depends.append(taskid)
473 fnid = taskData.tasks_fnid[taskid]
474 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
475 else:
476 fnid = taskData.getfn_id(dep)
477 for nextdepid in taskData.depids[fnid]:
478 if nextdepid not in dep_seen:
479 add_recursive_build(nextdepid, fnid)
480 for nextdepid in taskData.rdepids[fnid]:
481 if nextdepid not in rdep_seen:
482 add_recursive_run(nextdepid, fnid)
483 for (idependid, idependtask) in get_recursive_idepends(taskid):
484 if idependid not in dep_seen:
485 add_recursive_build(idependid, fnid)
486
487 def add_recursive_run(rdepid, depfnid):
488 """
489 Add runtime depends of rdepid to depends
490 (if we've not see it before)
491 (calls itself recursively)
492 """
493 if str(rdepid) in rdep_seen:
494 return
495 rdep_seen.append(rdepid)
496 if rdepid in taskData.run_targets:
497 depdata = taskData.run_targets[rdepid][0]
498 if depdata is not None:
499 dep = taskData.fn_index[depdata]
500 # Need to avoid creating new tasks here
501 taskid = taskData.gettask_id(dep, taskname, False)
502 if taskid is not None:
503 depends.append(taskid)
504 fnid = taskData.tasks_fnid[taskid]
505 #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
506 else:
507 fnid = taskData.getfn_id(dep)
508 for nextdepid in taskData.depids[fnid]:
509 if nextdepid not in dep_seen:
510 add_recursive_build(nextdepid, fnid)
511 for nextdepid in taskData.rdepids[fnid]:
512 if nextdepid not in rdep_seen:
513 add_recursive_run(nextdepid, fnid)
514 for (idependid, idependtask) in get_recursive_idepends(taskid):
515 if idependid not in dep_seen:
516 add_recursive_build(idependid, fnid)
517
518 # Resolve recursive 'recrdeptask' dependencies
519 #
520 # e.g. do_sometask[recrdeptask] = "do_someothertask"
521 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
522 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
523 for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
524 dep_seen = []
525 rdep_seen = []
526 idep_seen = []
527 for depid in taskData.depids[fnid]:
528 add_recursive_build(depid, fnid)
529 for rdepid in taskData.rdepids[fnid]:
530 add_recursive_run(rdepid, fnid)
531 deptaskid = taskData.gettask_id(fn, taskname, False)
532 for (idependid, idependtask) in get_recursive_idepends(deptaskid):
533 add_recursive_build(idependid, fnid)
534
535 # Rmove all self references
536 if task in depends:
537 newdep = []
538 bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
539 for dep in depends:
540 if task != dep:
541 newdep.append(dep)
542 depends = newdep
543
544
545 self.runq_fnid.append(taskData.tasks_fnid[task])
546 self.runq_task.append(taskData.tasks_name[task])
547 self.runq_depends.append(Set(depends))
548 self.runq_revdeps.append(Set())
549
550 runq_build.append(0)
551
552 # Step B - Mark all active tasks
553 #
554 # Start with the tasks we were asked to run and mark all dependencies
555 # as active too. If the task is to be 'forced', clear its stamp. Once
556 # all active tasks are marked, prune the ones we don't need.
557
558 bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")
559
560 def mark_active(listid, depth):
561 """
562 Mark an item as active along with its depends
563 (calls itself recursively)
564 """
565
566 if runq_build[listid] == 1:
567 return
568
569 runq_build[listid] = 1
570
571 depends = self.runq_depends[listid]
572 for depend in depends:
573 mark_active(depend, depth+1)
574
575 self.target_pairs = []
576 for target in self.targets:
577 targetid = taskData.getbuild_id(target[0])
578
579 if targetid not in taskData.build_targets:
580 continue
581
582 if targetid in taskData.failed_deps:
583 continue
584
585 fnid = taskData.build_targets[targetid][0]
586 fn = taskData.fn_index[fnid]
587 self.target_pairs.append((fn, target[1]))
588
589 # Remove stamps for targets if force mode active
590 if self.cooker.configuration.force:
591 bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
592 bb.build.del_stamp(target[1], self.dataCache, fn)
593
594 if fnid in taskData.failed_fnids:
595 continue
596
597 if target[1] not in taskData.tasks_lookup[fnid]:
598 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))
599
600 listid = taskData.tasks_lookup[fnid][target[1]]
601
602 mark_active(listid, 1)
603
604 # Step C - Prune all inactive tasks
605 #
606 # Once all active tasks are marked, prune the ones we don't need.
607
608 maps = []
609 delcount = 0
610 for listid in range(len(self.runq_fnid)):
611 if runq_build[listid-delcount] == 1:
612 maps.append(listid-delcount)
613 else:
614 del self.runq_fnid[listid-delcount]
615 del self.runq_task[listid-delcount]
616 del self.runq_depends[listid-delcount]
617 del runq_build[listid-delcount]
618 del self.runq_revdeps[listid-delcount]
619 delcount = delcount + 1
620 maps.append(-1)
621
622 #
623 # Step D - Sanity checks and computation
624 #
625
626 # Check to make sure we still have tasks to run
627 if len(self.runq_fnid) == 0:
628 if not taskData.abort:
629 bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
630 else:
631 bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")
632
633 bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))
634
635 # Remap the dependencies to account for the deleted tasks
636 # Check we didn't delete a task we depend on
637 for listid in range(len(self.runq_fnid)):
638 newdeps = []
639 origdeps = self.runq_depends[listid]
640 for origdep in origdeps:
641 if maps[origdep] == -1:
642 bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
643 newdeps.append(maps[origdep])
644 self.runq_depends[listid] = Set(newdeps)
645
646 bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")
647
648 # Generate a list of reverse dependencies to ease future calculations
649 for listid in range(len(self.runq_fnid)):
650 for dep in self.runq_depends[listid]:
651 self.runq_revdeps[dep].add(listid)
652
653 # Identify tasks at the end of dependency chains
654 # Error on circular dependency loops (length two)
655 endpoints = []
656 for listid in range(len(self.runq_fnid)):
657 revdeps = self.runq_revdeps[listid]
658 if len(revdeps) == 0:
659 endpoints.append(listid)
660 for dep in revdeps:
661 if dep in self.runq_depends[listid]:
662 #self.dump_data(taskData)
663 bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep] , taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
664
665 bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))
666
667 # Calculate task weights
668 # Check of higher length circular dependencies
669 self.runq_weight = self.calculate_task_weights(endpoints)
670
671 # Decide what order to execute the tasks in, pick a scheduler
672 #self.sched = RunQueueScheduler(self)
673 if self.scheduler == "completion":
674 self.sched = RunQueueSchedulerCompletion(self)
675 else:
676 self.sched = RunQueueSchedulerSpeed(self)
677
678 # Sanity Check - Check for multiple tasks building the same provider
679 prov_list = {}
680 seen_fn = []
681 for task in range(len(self.runq_fnid)):
682 fn = taskData.fn_index[self.runq_fnid[task]]
683 if fn in seen_fn:
684 continue
685 seen_fn.append(fn)
686 for prov in self.dataCache.fn_provides[fn]:
687 if prov not in prov_list:
688 prov_list[prov] = [fn]
689 elif fn not in prov_list[prov]:
690 prov_list[prov].append(fn)
691 error = False
692 for prov in prov_list:
693 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
694 error = True
695 bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
696 #if error:
697 # bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")
698
699
700 # Create a whitelist usable by the stamp checks
701 stampfnwhitelist = []
702 for entry in self.stampwhitelist.split():
703 entryid = self.taskData.getbuild_id(entry)
704 if entryid not in self.taskData.build_targets:
705 continue
706 fnid = self.taskData.build_targets[entryid][0]
707 fn = self.taskData.fn_index[fnid]
708 stampfnwhitelist.append(fn)
709 self.stampfnwhitelist = stampfnwhitelist
710
711 #self.dump_data(taskData)
712
713 self.state = runQueueRunInit
714
715 def check_stamps(self):
716 unchecked = {}
717 current = []
718 notcurrent = []
719 buildable = []
720
721 if self.stamppolicy == "perfile":
722 fulldeptree = False
723 else:
724 fulldeptree = True
725 stampwhitelist = []
726 if self.stamppolicy == "whitelist":
727 stampwhitelist = self.self.stampfnwhitelist
728
729 for task in range(len(self.runq_fnid)):
730 unchecked[task] = ""
731 if len(self.runq_depends[task]) == 0:
732 buildable.append(task)
733
734 def check_buildable(self, task, buildable):
735 for revdep in self.runq_revdeps[task]:
736 alldeps = 1
737 for dep in self.runq_depends[revdep]:
738 if dep in unchecked:
739 alldeps = 0
740 if alldeps == 1:
741 if revdep in unchecked:
742 buildable.append(revdep)
743
744 for task in range(len(self.runq_fnid)):
745 if task not in unchecked:
746 continue
747 fn = self.taskData.fn_index[self.runq_fnid[task]]
748 taskname = self.runq_task[task]
749 stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
750 # If the stamp is missing its not current
751 if not os.access(stampfile, os.F_OK):
752 del unchecked[task]
753 notcurrent.append(task)
754 check_buildable(self, task, buildable)
755 continue
756 # If its a 'nostamp' task, it's not current
757 taskdep = self.dataCache.task_deps[fn]
758 if 'nostamp' in taskdep and task in taskdep['nostamp']:
759 del unchecked[task]
760 notcurrent.append(task)
761 check_buildable(self, task, buildable)
762 continue
763
764 while (len(buildable) > 0):
765 nextbuildable = []
766 for task in buildable:
767 if task in unchecked:
768 fn = self.taskData.fn_index[self.runq_fnid[task]]
769 taskname = self.runq_task[task]
770 stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
771 iscurrent = True
772
773 t1 = os.stat(stampfile)[stat.ST_MTIME]
774 for dep in self.runq_depends[task]:
775 if iscurrent:
776 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
777 taskname2 = self.runq_task[dep]
778 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
779 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
780 if dep in notcurrent:
781 iscurrent = False
782 else:
783 t2 = os.stat(stampfile2)[stat.ST_MTIME]
784 if t1 < t2:
785 iscurrent = False
786 del unchecked[task]
787 if iscurrent:
788 current.append(task)
789 else:
790 notcurrent.append(task)
791
792 check_buildable(self, task, nextbuildable)
793
794 buildable = nextbuildable
795
796 #for task in range(len(self.runq_fnid)):
797 # fn = self.taskData.fn_index[self.runq_fnid[task]]
798 # taskname = self.runq_task[task]
799 # print "%s %s.%s" % (task, taskname, fn)
800
801 #print "Unchecked: %s" % unchecked
802 #print "Current: %s" % current
803 #print "Not current: %s" % notcurrent
804
805 if len(unchecked) > 0:
806 bb.fatal("check_stamps fatal internal error")
807 return current
808
809 def check_stamp_task(self, task):
810
811 if self.stamppolicy == "perfile":
812 fulldeptree = False
813 else:
814 fulldeptree = True
815 stampwhitelist = []
816 if self.stamppolicy == "whitelist":
817 stampwhitelist = self.stampfnwhitelist
818
819 fn = self.taskData.fn_index[self.runq_fnid[task]]
820 taskname = self.runq_task[task]
821 stampfile = "%s.%s" % (self.dataCache.stamp[fn], taskname)
822 # If the stamp is missing its not current
823 if not os.access(stampfile, os.F_OK):
824 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s not available\n" % stampfile)
825 return False
826 # If its a 'nostamp' task, it's not current
827 taskdep = self.dataCache.task_deps[fn]
828 if 'nostamp' in taskdep and task in taskdep['nostamp']:
829 bb.msg.debug(2, bb.msg.domain.RunQueue, "%s.%s is nostamp\n" % (fn, taskname))
830 return False
831
832 iscurrent = True
833 t1 = os.stat(stampfile)[stat.ST_MTIME]
834 for dep in self.runq_depends[task]:
835 if iscurrent:
836 fn2 = self.taskData.fn_index[self.runq_fnid[dep]]
837 taskname2 = self.runq_task[dep]
838 stampfile2 = "%s.%s" % (self.dataCache.stamp[fn2], taskname2)
839 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
840 try:
841 t2 = os.stat(stampfile2)[stat.ST_MTIME]
842 if t1 < t2:
843 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stampfile %s < %s" % (stampfile,stampfile2))
844 iscurrent = False
845 except:
846 bb.msg.debug(2, bb.msg.domain.RunQueue, "Exception reading %s for %s" % (stampfile2 ,stampfile))
847 iscurrent = False
848
849 return iscurrent
850
851 def execute_runqueue(self):
852 """
853 Run the tasks in a queue prepared by prepare_runqueue
854 Upon failure, optionally try to recover the build using any alternate providers
855 (if the abort on failure configuration option isn't set)
856 """
857
858 if self.state is runQueuePrepare:
859 self.prepare_runqueue()
860
861 if self.state is runQueueRunInit:
862 bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")
863 self.execute_runqueue_initVars()
864
865 if self.state is runQueueRunning:
866 self.execute_runqueue_internal()
867
868 if self.state is runQueueCleanUp:
869 self.finish_runqueue()
870
871 if self.state is runQueueFailed:
872 if self.taskData.abort:
873 raise bb.runqueue.TaskFailure(self.failed_fnids)
874 for fnid in self.failed_fnids:
875 self.taskData.fail_fnid(fnid)
876 self.reset_runqueue()
877
878 if self.state is runQueueComplete:
879 # All done
880 bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
881 return False
882
883 if self.state is runQueueChildProcess:
884 print "Child process"
885 return False
886
887 # Loop
888 return True
889
890 def execute_runqueue_initVars(self):
891
892 self.stats = RunQueueStats(len(self.runq_fnid))
893
894 self.runq_buildable = []
895 self.runq_running = []
896 self.runq_complete = []
897 self.build_pids = {}
898 self.failed_fnids = []
899
900 # Mark initial buildable tasks
901 for task in range(self.stats.total):
902 self.runq_running.append(0)
903 self.runq_complete.append(0)
904 if len(self.runq_depends[task]) == 0:
905 self.runq_buildable.append(1)
906 else:
907 self.runq_buildable.append(0)
908
909 self.state = runQueueRunning
910
911 event.fire(bb.event.StampUpdate(self.target_pairs, self.dataCache.stamp, self.cfgData))
912
913 def task_complete(self, task):
914 """
915 Mark a task as completed
916 Look at the reverse dependencies and mark any task with
917 completed dependencies as buildable
918 """
919 self.runq_complete[task] = 1
920 for revdep in self.runq_revdeps[task]:
921 if self.runq_running[revdep] == 1:
922 continue
923 if self.runq_buildable[revdep] == 1:
924 continue
925 alldeps = 1
926 for dep in self.runq_depends[revdep]:
927 if self.runq_complete[dep] != 1:
928 alldeps = 0
929 if alldeps == 1:
930 self.runq_buildable[revdep] = 1
931 fn = self.taskData.fn_index[self.runq_fnid[revdep]]
932 taskname = self.runq_task[revdep]
933 bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))
934
935 def task_fail(self, task, exitcode):
936 """
937 Called when a task has failed
938 Updates the state engine with the failure
939 """
940 bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed with %s" % (task, self.get_user_idstring(task), exitcode))
941 self.stats.taskFailed()
942 fnid = self.runq_fnid[task]
943 self.failed_fnids.append(fnid)
944 bb.event.fire(runQueueTaskFailed(task, self.stats, self, self.cfgData))
945 if self.taskData.abort:
946 self.state = runQueueCleanup
947
948 def execute_runqueue_internal(self):
949 """
950 Run the tasks in a queue prepared by prepare_runqueue
951 """
952
953 if self.stats.total == 0:
954 # nothing to do
955 self.state = runQueueCleanup
956
957 while True:
958 task = None
959 if self.stats.active < self.number_tasks:
960 task = self.sched.next()
961 if task is not None:
962 fn = self.taskData.fn_index[self.runq_fnid[task]]
963
964 taskname = self.runq_task[task]
965 if self.check_stamp_task(task):
966 bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
967 self.runq_running[task] = 1
968 self.runq_buildable[task] = 1
969 self.task_complete(task)
970 self.stats.taskCompleted()
971 self.stats.taskSkipped()
972 continue
973
974 bb.event.fire(runQueueTaskStarted(task, self.stats, self, self.cfgData))
975 bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + 1, self.stats.total, task, self.get_user_idstring(task)))
976 sys.stdout.flush()
977 sys.stderr.flush()
978 try:
979 pid = os.fork()
980 except OSError, e:
981 bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
982 if pid == 0:
983 self.state = runQueueChildProcess
984 # Make the child the process group leader
985 os.setpgid(0, 0)
986 newsi = os.open('/dev/null', os.O_RDWR)
987 os.dup2(newsi, sys.stdin.fileno())
988 self.cooker.configuration.cmd = taskname[3:]
989 bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
990 try:
991 self.cooker.tryBuild(fn)
992 except bb.build.EventException:
993 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
994 sys.exit(1)
995 except:
996 bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
997 raise
998 sys.exit(0)
999 self.build_pids[pid] = task
1000 self.runq_running[task] = 1
1001 self.stats.taskActive()
1002 if self.stats.active < self.number_tasks:
1003 continue
1004 if self.stats.active > 0:
1005 result = os.waitpid(-1, os.WNOHANG)
1006 if result[0] is 0 and result[1] is 0:
1007 return
1008 task = self.build_pids[result[0]]
1009 del self.build_pids[result[0]]
1010 if result[1] != 0:
1011 self.task_fail(task, result[1])
1012 return
1013 self.task_complete(task)
1014 self.stats.taskCompleted()
1015 bb.event.fire(runQueueTaskCompleted(task, self.stats, self, self.cfgData))
1016 continue
1017
1018 if len(self.failed_fnids) != 0:
1019 self.state = runQueueFailed
1020 return
1021
1022 # Sanity Checks
1023 for task in range(self.stats.total):
1024 if self.runq_buildable[task] == 0:
1025 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
1026 if self.runq_running[task] == 0:
1027 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
1028 if self.runq_complete[task] == 0:
1029 bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)
1030 self.state = runQueueComplete
1031 return
1032
1033 def finish_runqueue_now(self):
1034 bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.stats.active)
1035 for k, v in self.build_pids.iteritems():
1036 try:
1037 os.kill(-k, signal.SIGINT)
1038 except:
1039 pass
1040
1041 def finish_runqueue(self, now = False):
1042 self.state = runQueueCleanUp
1043 if now:
1044 self.finish_runqueue_now()
1045 try:
1046 while self.stats.active > 0:
1047 bb.event.fire(runQueueExitWait(self.stats.active, self.cfgData))
1048 bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.stats.active)
1049 tasknum = 1
1050 for k, v in self.build_pids.iteritems():
1051 bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
1052 tasknum = tasknum + 1
1053 result = os.waitpid(-1, os.WNOHANG)
1054 if result[0] is 0 and result[1] is 0:
1055 return
1056 task = self.build_pids[result[0]]
1057 del self.build_pids[result[0]]
1058 if result[1] != 0:
1059 self.task_fail(task, result[1])
1060 else:
1061 self.stats.taskCompleted()
1062 bb.event.fire(runQueueTaskCompleted(task, self.stats, self, self.cfgData))
1063 except:
1064 self.finish_runqueue_now()
1065 raise
1066
1067 if len(self.failed_fnids) != 0:
1068 self.state = runQueueFailed
1069 return
1070
1071 self.state = runQueueComplete
1072 return
1073
1074 def dump_data(self, taskQueue):
1075 """
1076 Dump some debug information on the internal data structures
1077 """
1078 bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
1079 for task in range(len(self.runq_task)):
1080 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
1081 taskQueue.fn_index[self.runq_fnid[task]],
1082 self.runq_task[task],
1083 self.runq_weight[task],
1084 self.runq_depends[task],
1085 self.runq_revdeps[task]))
1086
1087 bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
1088 for task1 in range(len(self.runq_task)):
1089 if task1 in self.prio_map:
1090 task = self.prio_map[task1]
1091 bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
1092 taskQueue.fn_index[self.runq_fnid[task]],
1093 self.runq_task[task],
1094 self.runq_weight[task],
1095 self.runq_depends[task],
1096 self.runq_revdeps[task]))
1097
1098
1099class TaskFailure(Exception):
1100 """
1101 Exception raised when a task in a runqueue fails
1102 """
1103 def __init__(self, x):
1104 self.args = x
1105
1106
1107class runQueueExitWait(bb.event.Event):
1108 """
1109 Event when waiting for task processes to exit
1110 """
1111
1112 def __init__(self, remain, d):
1113 self.remain = remain
1114 self.message = "Waiting for %s active tasks to finish" % remain
1115 bb.event.Event.__init__(self, d)
1116
1117class runQueueEvent(bb.event.Event):
1118 """
1119 Base runQueue event class
1120 """
1121 def __init__(self, task, stats, rq, d):
1122 self.taskid = task
1123 self.taskstring = rq.get_user_idstring(task)
1124 self.stats = stats
1125 bb.event.Event.__init__(self, d)
1126
1127class runQueueTaskStarted(runQueueEvent):
1128 """
1129 Event notifing a task was started
1130 """
1131 def __init__(self, task, stats, rq, d):
1132 runQueueEvent.__init__(self, task, stats, rq, d)
1133 self.message = "Running task %s (%d of %d) (%s)" % (task, stats.completed + stats.active + 1, self.stats.total, self.taskstring)
1134
1135class runQueueTaskFailed(runQueueEvent):
1136 """
1137 Event notifing a task failed
1138 """
1139 def __init__(self, task, stats, rq, d):
1140 runQueueEvent.__init__(self, task, stats, rq, d)
1141 self.message = "Task %s failed (%s)" % (task, self.taskstring)
1142
1143class runQueueTaskCompleted(runQueueEvent):
1144 """
1145 Event notifing a task completed
1146 """
1147 def __init__(self, task, stats, rq, d):
1148 runQueueEvent.__init__(self, task, stats, rq, d)
1149 self.message = "Task %s completed (%s)" % (task, self.taskstring)
1150
1151def check_stamp_fn(fn, taskname, d):
1152 rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
1153 fnid = rq.taskData.getfn_id(fn)
1154 taskid = rq.get_task_id(fnid, taskname)
1155 if taskid is not None:
1156 return rq.check_stamp_task(taskid)
1157 return None