summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py2154
1 files changed, 2154 insertions, 0 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
new file mode 100644
index 0000000000..6372b65fd9
--- /dev/null
+++ b/bitbake/lib/bb/runqueue.py
@@ -0,0 +1,2154 @@
1#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25import copy
26import os
27import sys
28import signal
29import stat
30import fcntl
31import errno
32import logging
33import re
34import bb
35from bb import msg, data, event
36from bb import monitordisk
37import subprocess
38
39try:
40 import cPickle as pickle
41except ImportError:
42 import pickle
43
44bblogger = logging.getLogger("BitBake")
45logger = logging.getLogger("BitBake.RunQueue")
46
47__find_md5__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])' )
48
49class RunQueueStats:
50 """
51 Holds statistics on the tasks handled by the associated runQueue
52 """
53 def __init__(self, total):
54 self.completed = 0
55 self.skipped = 0
56 self.failed = 0
57 self.active = 0
58 self.total = total
59
60 def copy(self):
61 obj = self.__class__(self.total)
62 obj.__dict__.update(self.__dict__)
63 return obj
64
65 def taskFailed(self):
66 self.active = self.active - 1
67 self.failed = self.failed + 1
68
69 def taskCompleted(self, number = 1):
70 self.active = self.active - number
71 self.completed = self.completed + number
72
73 def taskSkipped(self, number = 1):
74 self.active = self.active + number
75 self.skipped = self.skipped + number
76
77 def taskActive(self):
78 self.active = self.active + 1
79
80# These values indicate the next step due to be run in the
81# runQueue state machine
82runQueuePrepare = 2
83runQueueSceneInit = 3
84runQueueSceneRun = 4
85runQueueRunInit = 5
86runQueueRunning = 6
87runQueueFailed = 7
88runQueueCleanUp = 8
89runQueueComplete = 9
90
91class RunQueueScheduler(object):
92 """
93 Control the order tasks are scheduled in.
94 """
95 name = "basic"
96
97 def __init__(self, runqueue, rqdata):
98 """
99 The default scheduler just returns the first buildable task (the
100 priority map is sorted by task numer)
101 """
102 self.rq = runqueue
103 self.rqdata = rqdata
104 self.numTasks = len(self.rqdata.runq_fnid)
105
106 self.prio_map = []
107 self.prio_map.extend(range(self.numTasks))
108
109 self.buildable = []
110 self.stamps = {}
111 for taskid in xrange(self.numTasks):
112 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
113 taskname = self.rqdata.runq_task[taskid]
114 self.stamps[taskid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
115 if self.rq.runq_buildable[taskid] == 1:
116 self.buildable.append(taskid)
117
118 self.rev_prio_map = None
119
120 def next_buildable_task(self):
121 """
122 Return the id of the first task we find that is buildable
123 """
124 self.buildable = [x for x in self.buildable if not self.rq.runq_running[x] == 1]
125 if not self.buildable:
126 return None
127 if len(self.buildable) == 1:
128 taskid = self.buildable[0]
129 stamp = self.stamps[taskid]
130 if stamp not in self.rq.build_stamps.itervalues():
131 return taskid
132
133 if not self.rev_prio_map:
134 self.rev_prio_map = range(self.numTasks)
135 for taskid in xrange(self.numTasks):
136 self.rev_prio_map[self.prio_map[taskid]] = taskid
137
138 best = None
139 bestprio = None
140 for taskid in self.buildable:
141 prio = self.rev_prio_map[taskid]
142 if not bestprio or bestprio > prio:
143 stamp = self.stamps[taskid]
144 if stamp in self.rq.build_stamps.itervalues():
145 continue
146 bestprio = prio
147 best = taskid
148
149 return best
150
151 def next(self):
152 """
153 Return the id of the task we should build next
154 """
155 if self.rq.stats.active < self.rq.number_tasks:
156 return self.next_buildable_task()
157
158 def newbuilable(self, task):
159 self.buildable.append(task)
160
161class RunQueueSchedulerSpeed(RunQueueScheduler):
162 """
163 A scheduler optimised for speed. The priority map is sorted by task weight,
164 heavier weighted tasks (tasks needed by the most other tasks) are run first.
165 """
166 name = "speed"
167
168 def __init__(self, runqueue, rqdata):
169 """
170 The priority map is sorted by task weight.
171 """
172 RunQueueScheduler.__init__(self, runqueue, rqdata)
173
174 sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
175 copyweight = copy.deepcopy(self.rqdata.runq_weight)
176 self.prio_map = []
177
178 for weight in sortweight:
179 idx = copyweight.index(weight)
180 self.prio_map.append(idx)
181 copyweight[idx] = -1
182
183 self.prio_map.reverse()
184
185class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
186 """
187 A scheduler optimised to complete .bb files are quickly as possible. The
188 priority map is sorted by task weight, but then reordered so once a given
189 .bb file starts to build, its completed as quickly as possible. This works
190 well where disk space is at a premium and classes like OE's rm_work are in
191 force.
192 """
193 name = "completion"
194
195 def __init__(self, runqueue, rqdata):
196 RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
197
198 #FIXME - whilst this groups all fnids together it does not reorder the
199 #fnid groups optimally.
200
201 basemap = copy.deepcopy(self.prio_map)
202 self.prio_map = []
203 while (len(basemap) > 0):
204 entry = basemap.pop(0)
205 self.prio_map.append(entry)
206 fnid = self.rqdata.runq_fnid[entry]
207 todel = []
208 for entry in basemap:
209 entry_fnid = self.rqdata.runq_fnid[entry]
210 if entry_fnid == fnid:
211 todel.append(basemap.index(entry))
212 self.prio_map.append(entry)
213 todel.reverse()
214 for idx in todel:
215 del basemap[idx]
216
217class RunQueueData:
218 """
219 BitBake Run Queue implementation
220 """
221 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
222 self.cooker = cooker
223 self.dataCache = dataCache
224 self.taskData = taskData
225 self.targets = targets
226 self.rq = rq
227 self.warn_multi_bb = False
228
229 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
230 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()
231
232 self.reset()
233
234 def reset(self):
235 self.runq_fnid = []
236 self.runq_task = []
237 self.runq_depends = []
238 self.runq_revdeps = []
239 self.runq_hash = []
240
241 def runq_depends_names(self, ids):
242 import re
243 ret = []
244 for id in self.runq_depends[ids]:
245 nam = os.path.basename(self.get_user_idstring(id))
246 nam = re.sub("_[^,]*,", ",", nam)
247 ret.extend([nam])
248 return ret
249
250 def get_task_name(self, task):
251 return self.runq_task[task]
252
253 def get_task_file(self, task):
254 return self.taskData.fn_index[self.runq_fnid[task]]
255
256 def get_task_hash(self, task):
257 return self.runq_hash[task]
258
259 def get_user_idstring(self, task, task_name_suffix = ""):
260 fn = self.taskData.fn_index[self.runq_fnid[task]]
261 taskname = self.runq_task[task] + task_name_suffix
262 return "%s, %s" % (fn, taskname)
263
264 def get_task_id(self, fnid, taskname):
265 for listid in xrange(len(self.runq_fnid)):
266 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
267 return listid
268 return None
269
270 def circular_depchains_handler(self, tasks):
271 """
272 Some tasks aren't buildable, likely due to circular dependency issues.
273 Identify the circular dependencies and print them in a user readable format.
274 """
275 from copy import deepcopy
276
277 valid_chains = []
278 explored_deps = {}
279 msgs = []
280
281 def chain_reorder(chain):
282 """
283 Reorder a dependency chain so the lowest task id is first
284 """
285 lowest = 0
286 new_chain = []
287 for entry in xrange(len(chain)):
288 if chain[entry] < chain[lowest]:
289 lowest = entry
290 new_chain.extend(chain[lowest:])
291 new_chain.extend(chain[:lowest])
292 return new_chain
293
294 def chain_compare_equal(chain1, chain2):
295 """
296 Compare two dependency chains and see if they're the same
297 """
298 if len(chain1) != len(chain2):
299 return False
300 for index in xrange(len(chain1)):
301 if chain1[index] != chain2[index]:
302 return False
303 return True
304
305 def chain_array_contains(chain, chain_array):
306 """
307 Return True if chain_array contains chain
308 """
309 for ch in chain_array:
310 if chain_compare_equal(ch, chain):
311 return True
312 return False
313
314 def find_chains(taskid, prev_chain):
315 prev_chain.append(taskid)
316 total_deps = []
317 total_deps.extend(self.runq_revdeps[taskid])
318 for revdep in self.runq_revdeps[taskid]:
319 if revdep in prev_chain:
320 idx = prev_chain.index(revdep)
321 # To prevent duplicates, reorder the chain to start with the lowest taskid
322 # and search through an array of those we've already printed
323 chain = prev_chain[idx:]
324 new_chain = chain_reorder(chain)
325 if not chain_array_contains(new_chain, valid_chains):
326 valid_chains.append(new_chain)
327 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
328 for dep in new_chain:
329 msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
330 msgs.append("\n")
331 if len(valid_chains) > 10:
332 msgs.append("Aborted dependency loops search after 10 matches.\n")
333 return msgs
334 continue
335 scan = False
336 if revdep not in explored_deps:
337 scan = True
338 elif revdep in explored_deps[revdep]:
339 scan = True
340 else:
341 for dep in prev_chain:
342 if dep in explored_deps[revdep]:
343 scan = True
344 if scan:
345 find_chains(revdep, copy.deepcopy(prev_chain))
346 for dep in explored_deps[revdep]:
347 if dep not in total_deps:
348 total_deps.append(dep)
349
350 explored_deps[taskid] = total_deps
351
352 for task in tasks:
353 find_chains(task, [])
354
355 return msgs
356
357 def calculate_task_weights(self, endpoints):
358 """
359 Calculate a number representing the "weight" of each task. Heavier weighted tasks
360 have more dependencies and hence should be executed sooner for maximum speed.
361
362 This function also sanity checks the task list finding tasks that are not
363 possible to execute due to circular dependencies.
364 """
365
366 numTasks = len(self.runq_fnid)
367 weight = []
368 deps_left = []
369 task_done = []
370
371 for listid in xrange(numTasks):
372 task_done.append(False)
373 weight.append(0)
374 deps_left.append(len(self.runq_revdeps[listid]))
375
376 for listid in endpoints:
377 weight[listid] = 1
378 task_done[listid] = True
379
380 while True:
381 next_points = []
382 for listid in endpoints:
383 for revdep in self.runq_depends[listid]:
384 weight[revdep] = weight[revdep] + weight[listid]
385 deps_left[revdep] = deps_left[revdep] - 1
386 if deps_left[revdep] == 0:
387 next_points.append(revdep)
388 task_done[revdep] = True
389 endpoints = next_points
390 if len(next_points) == 0:
391 break
392
393 # Circular dependency sanity check
394 problem_tasks = []
395 for task in xrange(numTasks):
396 if task_done[task] is False or deps_left[task] != 0:
397 problem_tasks.append(task)
398 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
399 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
400
401 if problem_tasks:
402 message = "Unbuildable tasks were found.\n"
403 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
404 message = message + "Identifying dependency loops (this may take a short while)...\n"
405 logger.error(message)
406
407 msgs = self.circular_depchains_handler(problem_tasks)
408
409 message = "\n"
410 for msg in msgs:
411 message = message + msg
412 bb.msg.fatal("RunQueue", message)
413
414 return weight
415
416 def prepare(self):
417 """
418 Turn a set of taskData into a RunQueue and compute data needed
419 to optimise the execution order.
420 """
421
422 runq_build = []
423 recursivetasks = {}
424 recursiveitasks = {}
425 recursivetasksselfref = set()
426
427 taskData = self.taskData
428
429 if len(taskData.tasks_name) == 0:
430 # Nothing to do
431 return 0
432
433 logger.info("Preparing runqueue")
434
435 # Step A - Work out a list of tasks to run
436 #
437 # Taskdata gives us a list of possible providers for every build and run
438 # target ordered by priority. It also gives information on each of those
439 # providers.
440 #
441 # To create the actual list of tasks to execute we fix the list of
442 # providers and then resolve the dependencies into task IDs. This
443 # process is repeated for each type of dependency (tdepends, deptask,
444 # rdeptast, recrdeptask, idepends).
445
446 def add_build_dependencies(depids, tasknames, depends):
447 for depid in depids:
448 # Won't be in build_targets if ASSUME_PROVIDED
449 if depid not in taskData.build_targets:
450 continue
451 depdata = taskData.build_targets[depid][0]
452 if depdata is None:
453 continue
454 for taskname in tasknames:
455 taskid = taskData.gettask_id_fromfnid(depdata, taskname)
456 if taskid is not None:
457 depends.add(taskid)
458
459 def add_runtime_dependencies(depids, tasknames, depends):
460 for depid in depids:
461 if depid not in taskData.run_targets:
462 continue
463 depdata = taskData.run_targets[depid][0]
464 if depdata is None:
465 continue
466 for taskname in tasknames:
467 taskid = taskData.gettask_id_fromfnid(depdata, taskname)
468 if taskid is not None:
469 depends.add(taskid)
470
471 def add_resolved_dependencies(depids, tasknames, depends):
472 for depid in depids:
473 for taskname in tasknames:
474 taskid = taskData.gettask_id_fromfnid(depid, taskname)
475 if taskid is not None:
476 depends.add(taskid)
477
478 for task in xrange(len(taskData.tasks_name)):
479 depends = set()
480 fnid = taskData.tasks_fnid[task]
481 fn = taskData.fn_index[fnid]
482 task_deps = self.dataCache.task_deps[fn]
483
484 #logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
485
486 if fnid not in taskData.failed_fnids:
487
488 # Resolve task internal dependencies
489 #
490 # e.g. addtask before X after Y
491 depends = set(taskData.tasks_tdepends[task])
492
493 # Resolve 'deptask' dependencies
494 #
495 # e.g. do_sometask[deptask] = "do_someothertask"
496 # (makes sure sometask runs after someothertask of all DEPENDS)
497 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
498 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
499 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
500
501 # Resolve 'rdeptask' dependencies
502 #
503 # e.g. do_sometask[rdeptask] = "do_someothertask"
504 # (makes sure sometask runs after someothertask of all RDEPENDS)
505 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
506 tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
507 add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
508
509 # Resolve inter-task dependencies
510 #
511 # e.g. do_sometask[depends] = "targetname:do_someothertask"
512 # (makes sure sometask runs after targetname's someothertask)
513 idepends = taskData.tasks_idepends[task]
514 for (depid, idependtask) in idepends:
515 if depid in taskData.build_targets and not depid in taskData.failed_deps:
516 # Won't be in build_targets if ASSUME_PROVIDED
517 depdata = taskData.build_targets[depid][0]
518 if depdata is not None:
519 taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
520 if taskid is None:
521 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
522 depends.add(taskid)
523 irdepends = taskData.tasks_irdepends[task]
524 for (depid, idependtask) in irdepends:
525 if depid in taskData.run_targets:
526 # Won't be in run_targets if ASSUME_PROVIDED
527 depdata = taskData.run_targets[depid][0]
528 if depdata is not None:
529 taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
530 if taskid is None:
531 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
532 depends.add(taskid)
533
534 # Resolve recursive 'recrdeptask' dependencies (Part A)
535 #
536 # e.g. do_sometask[recrdeptask] = "do_someothertask"
537 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
538 # We cover the recursive part of the dependencies below
539 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
540 tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
541 recursivetasks[task] = tasknames
542 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
543 add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
544 if taskData.tasks_name[task] in tasknames:
545 recursivetasksselfref.add(task)
546
547 if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']:
548 recursiveitasks[task] = []
549 for t in task_deps['recideptask'][taskData.tasks_name[task]].split():
550 newdep = taskData.gettask_id_fromfnid(fnid, t)
551 recursiveitasks[task].append(newdep)
552
553 self.runq_fnid.append(taskData.tasks_fnid[task])
554 self.runq_task.append(taskData.tasks_name[task])
555 self.runq_depends.append(depends)
556 self.runq_revdeps.append(set())
557 self.runq_hash.append("")
558
559 runq_build.append(0)
560
561 # Resolve recursive 'recrdeptask' dependencies (Part B)
562 #
563 # e.g. do_sometask[recrdeptask] = "do_someothertask"
564 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
565 # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
566 extradeps = {}
567 for task in recursivetasks:
568 extradeps[task] = set(self.runq_depends[task])
569 tasknames = recursivetasks[task]
570 seendeps = set()
571 seenfnid = []
572
573 def generate_recdeps(t):
574 newdeps = set()
575 add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
576 extradeps[task].update(newdeps)
577 seendeps.add(t)
578 newdeps.add(t)
579 for i in newdeps:
580 for n in self.runq_depends[i]:
581 if n not in seendeps:
582 generate_recdeps(n)
583 generate_recdeps(task)
584
585 if task in recursiveitasks:
586 for dep in recursiveitasks[task]:
587 generate_recdeps(dep)
588
589 # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
590 for task in recursivetasks:
591 extradeps[task].difference_update(recursivetasksselfref)
592
593 for task in xrange(len(taskData.tasks_name)):
594 # Add in extra dependencies
595 if task in extradeps:
596 self.runq_depends[task] = extradeps[task]
597 # Remove all self references
598 if task in self.runq_depends[task]:
599 logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
600 self.runq_depends[task].remove(task)
601
602 # Step B - Mark all active tasks
603 #
604 # Start with the tasks we were asked to run and mark all dependencies
605 # as active too. If the task is to be 'forced', clear its stamp. Once
606 # all active tasks are marked, prune the ones we don't need.
607
608 logger.verbose("Marking Active Tasks")
609
610 def mark_active(listid, depth):
611 """
612 Mark an item as active along with its depends
613 (calls itself recursively)
614 """
615
616 if runq_build[listid] == 1:
617 return
618
619 runq_build[listid] = 1
620
621 depends = self.runq_depends[listid]
622 for depend in depends:
623 mark_active(depend, depth+1)
624
625 self.target_pairs = []
626 for target in self.targets:
627 targetid = taskData.getbuild_id(target[0])
628
629 if targetid not in taskData.build_targets:
630 continue
631
632 if targetid in taskData.failed_deps:
633 continue
634
635 fnid = taskData.build_targets[targetid][0]
636 fn = taskData.fn_index[fnid]
637 self.target_pairs.append((fn, target[1]))
638
639 if fnid in taskData.failed_fnids:
640 continue
641
642 if target[1] not in taskData.tasks_lookup[fnid]:
643 import difflib
644 close_matches = difflib.get_close_matches(target[1], taskData.tasks_lookup[fnid], cutoff=0.7)
645 if close_matches:
646 extra = ". Close matches:\n %s" % "\n ".join(close_matches)
647 else:
648 extra = ""
649 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (target[1], target[0], extra))
650
651 listid = taskData.tasks_lookup[fnid][target[1]]
652
653 mark_active(listid, 1)
654
655 # Step C - Prune all inactive tasks
656 #
657 # Once all active tasks are marked, prune the ones we don't need.
658
659 maps = []
660 delcount = 0
661 for listid in xrange(len(self.runq_fnid)):
662 if runq_build[listid-delcount] == 1:
663 maps.append(listid-delcount)
664 else:
665 del self.runq_fnid[listid-delcount]
666 del self.runq_task[listid-delcount]
667 del self.runq_depends[listid-delcount]
668 del runq_build[listid-delcount]
669 del self.runq_revdeps[listid-delcount]
670 del self.runq_hash[listid-delcount]
671 delcount = delcount + 1
672 maps.append(-1)
673
674 #
675 # Step D - Sanity checks and computation
676 #
677
678 # Check to make sure we still have tasks to run
679 if len(self.runq_fnid) == 0:
680 if not taskData.abort:
681 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
682 else:
683 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
684
685 logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
686
687 # Remap the dependencies to account for the deleted tasks
688 # Check we didn't delete a task we depend on
689 for listid in xrange(len(self.runq_fnid)):
690 newdeps = []
691 origdeps = self.runq_depends[listid]
692 for origdep in origdeps:
693 if maps[origdep] == -1:
694 bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
695 newdeps.append(maps[origdep])
696 self.runq_depends[listid] = set(newdeps)
697
698 logger.verbose("Assign Weightings")
699
700 # Generate a list of reverse dependencies to ease future calculations
701 for listid in xrange(len(self.runq_fnid)):
702 for dep in self.runq_depends[listid]:
703 self.runq_revdeps[dep].add(listid)
704
705 # Identify tasks at the end of dependency chains
706 # Error on circular dependency loops (length two)
707 endpoints = []
708 for listid in xrange(len(self.runq_fnid)):
709 revdeps = self.runq_revdeps[listid]
710 if len(revdeps) == 0:
711 endpoints.append(listid)
712 for dep in revdeps:
713 if dep in self.runq_depends[listid]:
714 #self.dump_data(taskData)
715 bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
716
717 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
718
719 # Calculate task weights
720 # Check of higher length circular dependencies
721 self.runq_weight = self.calculate_task_weights(endpoints)
722
723 # Sanity Check - Check for multiple tasks building the same provider
724 prov_list = {}
725 seen_fn = []
726 for task in xrange(len(self.runq_fnid)):
727 fn = taskData.fn_index[self.runq_fnid[task]]
728 if fn in seen_fn:
729 continue
730 seen_fn.append(fn)
731 for prov in self.dataCache.fn_provides[fn]:
732 if prov not in prov_list:
733 prov_list[prov] = [fn]
734 elif fn not in prov_list[prov]:
735 prov_list[prov].append(fn)
736 for prov in prov_list:
737 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
738 seen_pn = []
739 # If two versions of the same PN are being built its fatal, we don't support it.
740 for fn in prov_list[prov]:
741 pn = self.dataCache.pkg_fn[fn]
742 if pn not in seen_pn:
743 seen_pn.append(pn)
744 else:
745 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
746 msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov]))
747 if self.warn_multi_bb:
748 logger.warn(msg)
749 else:
750 msg += "\n This usually means one provides something the other doesn't and should."
751 logger.error(msg)
752
753 # Create a whitelist usable by the stamp checks
754 stampfnwhitelist = []
755 for entry in self.stampwhitelist.split():
756 entryid = self.taskData.getbuild_id(entry)
757 if entryid not in self.taskData.build_targets:
758 continue
759 fnid = self.taskData.build_targets[entryid][0]
760 fn = self.taskData.fn_index[fnid]
761 stampfnwhitelist.append(fn)
762 self.stampfnwhitelist = stampfnwhitelist
763
764 # Iterate over the task list looking for tasks with a 'setscene' function
765 self.runq_setscene = []
766 if not self.cooker.configuration.nosetscene:
767 for task in range(len(self.runq_fnid)):
768 setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
769 if not setscene:
770 continue
771 self.runq_setscene.append(task)
772
773 def invalidate_task(fn, taskname, error_nostamp):
774 taskdep = self.dataCache.task_deps[fn]
775 fnid = self.taskData.getfn_id(fn)
776 if taskname not in taskData.tasks_lookup[fnid]:
777 logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname)
778 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
779 if error_nostamp:
780 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
781 else:
782 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
783 else:
784 logger.verbose("Invalidate task %s, %s", taskname, fn)
785 bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)
786
787 # Invalidate task if force mode active
788 if self.cooker.configuration.force:
789 for (fn, target) in self.target_pairs:
790 invalidate_task(fn, target, False)
791
792 # Invalidate task if invalidate mode active
793 if self.cooker.configuration.invalidate_stamp:
794 for (fn, target) in self.target_pairs:
795 for st in self.cooker.configuration.invalidate_stamp.split(','):
796 invalidate_task(fn, "do_%s" % st, True)
797
798 # Interate over the task list and call into the siggen code
799 dealtwith = set()
800 todeal = set(range(len(self.runq_fnid)))
801 while len(todeal) > 0:
802 for task in todeal.copy():
803 if len(self.runq_depends[task] - dealtwith) == 0:
804 dealtwith.add(task)
805 todeal.remove(task)
806 procdep = []
807 for dep in self.runq_depends[task]:
808 procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
809 self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
810
811 return len(self.runq_fnid)
812
813 def dump_data(self, taskQueue):
814 """
815 Dump some debug information on the internal data structures
816 """
817 logger.debug(3, "run_tasks:")
818 for task in xrange(len(self.rqdata.runq_task)):
819 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
820 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
821 self.rqdata.runq_task[task],
822 self.rqdata.runq_weight[task],
823 self.rqdata.runq_depends[task],
824 self.rqdata.runq_revdeps[task])
825
826 logger.debug(3, "sorted_tasks:")
827 for task1 in xrange(len(self.rqdata.runq_task)):
828 if task1 in self.prio_map:
829 task = self.prio_map[task1]
830 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
831 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
832 self.rqdata.runq_task[task],
833 self.rqdata.runq_weight[task],
834 self.rqdata.runq_depends[task],
835 self.rqdata.runq_revdeps[task])
836
837class RunQueue:
838 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
839
840 self.cooker = cooker
841 self.cfgData = cfgData
842 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
843
844 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
845 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
846 self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
847 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None
848
849 self.state = runQueuePrepare
850
851 # For disk space monitor
852 self.dm = monitordisk.diskMonitor(cfgData)
853
854 self.rqexe = None
855 self.worker = None
856 self.workerpipe = None
857 self.fakeworker = None
858 self.fakeworkerpipe = None
859
860 def _start_worker(self, fakeroot = False, rqexec = None):
861 logger.debug(1, "Starting bitbake-worker")
862 if fakeroot:
863 fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
864 fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
865 env = os.environ.copy()
866 for key, value in (var.split('=') for var in fakerootenv):
867 env[key] = value
868 worker = subprocess.Popen([fakerootcmd, "bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
869 else:
870 worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
871 bb.utils.nonblockingfd(worker.stdout)
872 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec)
873
874 workerdata = {
875 "taskdeps" : self.rqdata.dataCache.task_deps,
876 "fakerootenv" : self.rqdata.dataCache.fakerootenv,
877 "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
878 "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
879 "hashes" : bb.parse.siggen.taskhash,
880 "hash_deps" : bb.parse.siggen.runtaskdeps,
881 "sigchecksums" : bb.parse.siggen.file_checksum_values,
882 "runq_hash" : self.rqdata.runq_hash,
883 "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
884 "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
885 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
886 "logdefaultdomain" : bb.msg.loggerDefaultDomains,
887 "prhost" : self.cooker.prhost,
888 "buildname" : self.cfgData.getVar("BUILDNAME", True),
889 "date" : self.cfgData.getVar("DATE", True),
890 "time" : self.cfgData.getVar("TIME", True),
891 }
892
893 worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
894 worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
895 worker.stdin.flush()
896
897 return worker, workerpipe
898
899 def _teardown_worker(self, worker, workerpipe):
900 if not worker:
901 return
902 logger.debug(1, "Teardown for bitbake-worker")
903 try:
904 worker.stdin.write("<quit></quit>")
905 worker.stdin.flush()
906 except IOError:
907 pass
908 while worker.returncode is None:
909 workerpipe.read()
910 worker.poll()
911 while workerpipe.read():
912 continue
913 workerpipe.close()
914
915 def start_worker(self):
916 if self.worker:
917 self.teardown_workers()
918 self.teardown = False
919 self.worker, self.workerpipe = self._start_worker()
920
921 def start_fakeworker(self, rqexec):
922 if not self.fakeworker:
923 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
924
925 def teardown_workers(self):
926 self.teardown = True
927 self._teardown_worker(self.worker, self.workerpipe)
928 self.worker = None
929 self.workerpipe = None
930 self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
931 self.fakeworker = None
932 self.fakeworkerpipe = None
933
934 def read_workers(self):
935 self.workerpipe.read()
936 if self.fakeworkerpipe:
937 self.fakeworkerpipe.read()
938
939 def active_fds(self):
940 fds = []
941 if self.workerpipe:
942 fds.append(self.workerpipe.input)
943 if self.fakeworkerpipe:
944 fds.append(self.fakeworkerpipe.input)
945 return fds
946
947 def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
948 def get_timestamp(f):
949 try:
950 if not os.access(f, os.F_OK):
951 return None
952 return os.stat(f)[stat.ST_MTIME]
953 except:
954 return None
955
956 if self.stamppolicy == "perfile":
957 fulldeptree = False
958 else:
959 fulldeptree = True
960 stampwhitelist = []
961 if self.stamppolicy == "whitelist":
962 stampwhitelist = self.rqdata.stampfnwhitelist
963
964 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
965 if taskname is None:
966 taskname = self.rqdata.runq_task[task]
967
968 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
969
970 # If the stamp is missing its not current
971 if not os.access(stampfile, os.F_OK):
972 logger.debug(2, "Stampfile %s not available", stampfile)
973 return False
974 # If its a 'nostamp' task, it's not current
975 taskdep = self.rqdata.dataCache.task_deps[fn]
976 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
977 logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
978 return False
979
980 if taskname != "do_setscene" and taskname.endswith("_setscene"):
981 return True
982
983 if cache is None:
984 cache = {}
985
986 iscurrent = True
987 t1 = get_timestamp(stampfile)
988 for dep in self.rqdata.runq_depends[task]:
989 if iscurrent:
990 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
991 taskname2 = self.rqdata.runq_task[dep]
992 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
993 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
994 t2 = get_timestamp(stampfile2)
995 t3 = get_timestamp(stampfile3)
996 if t3 and t3 > t2:
997 continue
998 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
999 if not t2:
1000 logger.debug(2, 'Stampfile %s does not exist', stampfile2)
1001 iscurrent = False
1002 if t1 < t2:
1003 logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
1004 iscurrent = False
1005 if recurse and iscurrent:
1006 if dep in cache:
1007 iscurrent = cache[dep]
1008 if not iscurrent:
1009 logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1010 else:
1011 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1012 cache[dep] = iscurrent
1013 if recurse:
1014 cache[task] = iscurrent
1015 return iscurrent
1016
1017 def _execute_runqueue(self):
1018 """
1019 Run the tasks in a queue prepared by rqdata.prepare()
1020 Upon failure, optionally try to recover the build using any alternate providers
1021 (if the abort on failure configuration option isn't set)
1022 """
1023
1024 retval = True
1025
1026 if self.state is runQueuePrepare:
1027 self.rqexe = RunQueueExecuteDummy(self)
1028 if self.rqdata.prepare() == 0:
1029 self.state = runQueueComplete
1030 else:
1031 self.state = runQueueSceneInit
1032
1033 # we are ready to run, see if any UI client needs the dependency info
1034 if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset:
1035 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
1036 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)
1037
1038 if self.state is runQueueSceneInit:
1039 dump = self.cooker.configuration.dump_signatures
1040 if dump:
1041 if 'printdiff' in dump:
1042 invalidtasks = self.print_diffscenetasks()
1043 self.dump_signatures(dump)
1044 if 'printdiff' in dump:
1045 self.write_diffscenetasks(invalidtasks)
1046 self.state = runQueueComplete
1047 else:
1048 self.start_worker()
1049 self.rqexe = RunQueueExecuteScenequeue(self)
1050
1051 if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
1052 self.dm.check(self)
1053
1054 if self.state is runQueueSceneRun:
1055 retval = self.rqexe.execute()
1056
1057 if self.state is runQueueRunInit:
1058 logger.info("Executing RunQueue Tasks")
1059 self.rqexe = RunQueueExecuteTasks(self)
1060 self.state = runQueueRunning
1061
1062 if self.state is runQueueRunning:
1063 retval = self.rqexe.execute()
1064
1065 if self.state is runQueueCleanUp:
1066 self.rqexe.finish()
1067
1068 if self.state is runQueueComplete or self.state is runQueueFailed:
1069 self.teardown_workers()
1070 if self.rqexe.stats.failed:
1071 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
1072 else:
1073 # Let's avoid the word "failed" if nothing actually did
1074 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)
1075
1076 if self.state is runQueueFailed:
1077 if not self.rqdata.taskData.tryaltconfigs:
1078 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
1079 for fnid in self.rqexe.failed_fnids:
1080 self.rqdata.taskData.fail_fnid(fnid)
1081 self.rqdata.reset()
1082
1083 if self.state is runQueueComplete:
1084 # All done
1085 return False
1086
1087 # Loop
1088 return retval
1089
1090 def execute_runqueue(self):
1091 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1092 try:
1093 return self._execute_runqueue()
1094 except bb.runqueue.TaskFailure:
1095 raise
1096 except SystemExit:
1097 raise
1098 except:
1099 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
1100 try:
1101 self.teardown_workers()
1102 except:
1103 pass
1104 self.state = runQueueComplete
1105 raise
1106
1107 def finish_runqueue(self, now = False):
1108 if not self.rqexe:
1109 return
1110
1111 if now:
1112 self.rqexe.finish_now()
1113 else:
1114 self.rqexe.finish()
1115
1116 def dump_signatures(self, options):
1117 done = set()
1118 bb.note("Reparsing files to collect dependency data")
1119 for task in range(len(self.rqdata.runq_fnid)):
1120 if self.rqdata.runq_fnid[task] not in done:
1121 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1122 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1123 done.add(self.rqdata.runq_fnid[task])
1124
1125 bb.parse.siggen.dump_sigs(self.rqdata.dataCache, options)
1126
1127 return
1128
1129 def print_diffscenetasks(self):
1130
1131 valid = []
1132 sq_hash = []
1133 sq_hashfn = []
1134 sq_fn = []
1135 sq_taskname = []
1136 sq_task = []
1137 noexec = []
1138 stamppresent = []
1139 valid_new = set()
1140
1141 for task in xrange(len(self.rqdata.runq_fnid)):
1142 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1143 taskname = self.rqdata.runq_task[task]
1144 taskdep = self.rqdata.dataCache.task_deps[fn]
1145
1146 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1147 noexec.append(task)
1148 continue
1149
1150 sq_fn.append(fn)
1151 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1152 sq_hash.append(self.rqdata.runq_hash[task])
1153 sq_taskname.append(taskname)
1154 sq_task.append(task)
1155 call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1156 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
1157 valid = bb.utils.better_eval(call, locs)
1158 for v in valid:
1159 valid_new.add(sq_task[v])
1160
1161 # Tasks which are both setscene and noexec never care about dependencies
1162 # We therefore find tasks which are setscene and noexec and mark their
1163 # unique dependencies as valid.
1164 for task in noexec:
1165 if task not in self.rqdata.runq_setscene:
1166 continue
1167 for dep in self.rqdata.runq_depends[task]:
1168 hasnoexecparents = True
1169 for dep2 in self.rqdata.runq_revdeps[dep]:
1170 if dep2 in self.rqdata.runq_setscene and dep2 in noexec:
1171 continue
1172 hasnoexecparents = False
1173 break
1174 if hasnoexecparents:
1175 valid_new.add(dep)
1176
1177 invalidtasks = set()
1178 for task in xrange(len(self.rqdata.runq_fnid)):
1179 if task not in valid_new and task not in noexec:
1180 invalidtasks.add(task)
1181
1182 found = set()
1183 processed = set()
1184 for task in invalidtasks:
1185 toprocess = set([task])
1186 while toprocess:
1187 next = set()
1188 for t in toprocess:
1189 for dep in self.rqdata.runq_depends[t]:
1190 if dep in invalidtasks:
1191 found.add(task)
1192 if dep not in processed:
1193 processed.add(dep)
1194 next.add(dep)
1195 toprocess = next
1196 if task in found:
1197 toprocess = set()
1198
1199 tasklist = []
1200 for task in invalidtasks.difference(found):
1201 tasklist.append(self.rqdata.get_user_idstring(task))
1202
1203 if tasklist:
1204 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
1205
1206 return invalidtasks.difference(found)
1207
1208 def write_diffscenetasks(self, invalidtasks):
1209
1210 # Define recursion callback
1211 def recursecb(key, hash1, hash2):
1212 hashes = [hash1, hash2]
1213 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
1214
1215 recout = []
1216 if len(hashfiles) == 2:
1217 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
1218 recout.extend(list(' ' + l for l in out2))
1219 else:
1220 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
1221
1222 return recout
1223
1224
1225 for task in invalidtasks:
1226 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1227 pn = self.rqdata.dataCache.pkg_fn[fn]
1228 taskname = self.rqdata.runq_task[task]
1229 h = self.rqdata.runq_hash[task]
1230 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
1231 match = None
1232 for m in matches:
1233 if h in m:
1234 match = m
1235 if match is None:
1236 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
1237 matches = {k : v for k, v in matches.iteritems() if h not in k}
1238 if matches:
1239 latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
1240 prevh = __find_md5__.search(latestmatch).group(0)
1241 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
1242 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output))
1243
1244class RunQueueExecute:
1245
1246 def __init__(self, rq):
1247 self.rq = rq
1248 self.cooker = rq.cooker
1249 self.cfgData = rq.cfgData
1250 self.rqdata = rq.rqdata
1251
1252 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
1253 self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"
1254
1255 self.runq_buildable = []
1256 self.runq_running = []
1257 self.runq_complete = []
1258
1259 self.build_stamps = {}
1260 self.build_stamps2 = []
1261 self.failed_fnids = []
1262
1263 self.stampcache = {}
1264
1265 rq.workerpipe.setrunqueueexec(self)
1266 if rq.fakeworkerpipe:
1267 rq.fakeworkerpipe.setrunqueueexec(self)
1268
1269 def runqueue_process_waitpid(self, task, status):
1270
1271 # self.build_stamps[pid] may not exist when use shared work directory.
1272 if task in self.build_stamps:
1273 self.build_stamps2.remove(self.build_stamps[task])
1274 del self.build_stamps[task]
1275
1276 if status != 0:
1277 self.task_fail(task, status)
1278 else:
1279 self.task_complete(task)
1280 return True
1281
1282 def finish_now(self):
1283
1284 for worker in [self.rq.worker, self.rq.fakeworker]:
1285 if not worker:
1286 continue
1287 try:
1288 worker.stdin.write("<finishnow></finishnow>")
1289 worker.stdin.flush()
1290 except IOError:
1291 # worker must have died?
1292 pass
1293
1294 if len(self.failed_fnids) != 0:
1295 self.rq.state = runQueueFailed
1296 return
1297
1298 self.rq.state = runQueueComplete
1299 return
1300
1301 def finish(self):
1302 self.rq.state = runQueueCleanUp
1303
1304 if self.stats.active > 0:
1305 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1306 self.rq.read_workers()
1307
1308 return
1309
1310 if len(self.failed_fnids) != 0:
1311 self.rq.state = runQueueFailed
1312 return
1313
1314 self.rq.state = runQueueComplete
1315 return
1316
1317 def check_dependencies(self, task, taskdeps, setscene = False):
1318 if not self.rq.depvalidate:
1319 return False
1320
1321 taskdata = {}
1322 taskdeps.add(task)
1323 for dep in taskdeps:
1324 if setscene:
1325 depid = self.rqdata.runq_setscene[dep]
1326 else:
1327 depid = dep
1328 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
1329 pn = self.rqdata.dataCache.pkg_fn[fn]
1330 taskname = self.rqdata.runq_task[depid]
1331 taskdata[dep] = [pn, taskname, fn]
1332 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1333 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
1334 valid = bb.utils.better_eval(call, locs)
1335 return valid
1336
1337class RunQueueExecuteDummy(RunQueueExecute):
1338 def __init__(self, rq):
1339 self.rq = rq
1340 self.stats = RunQueueStats(0)
1341
1342 def finish(self):
1343 self.rq.state = runQueueComplete
1344 return
1345
1346class RunQueueExecuteTasks(RunQueueExecute):
1347 def __init__(self, rq):
1348 RunQueueExecute.__init__(self, rq)
1349
1350 self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1351
1352 self.stampcache = {}
1353
1354 initial_covered = self.rq.scenequeue_covered.copy()
1355
1356 # Mark initial buildable tasks
1357 for task in xrange(self.stats.total):
1358 self.runq_running.append(0)
1359 self.runq_complete.append(0)
1360 if len(self.rqdata.runq_depends[task]) == 0:
1361 self.runq_buildable.append(1)
1362 else:
1363 self.runq_buildable.append(0)
1364 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
1365 self.rq.scenequeue_covered.add(task)
1366
1367 found = True
1368 while found:
1369 found = False
1370 for task in xrange(self.stats.total):
1371 if task in self.rq.scenequeue_covered:
1372 continue
1373 logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))
1374
1375 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
1376 found = True
1377 self.rq.scenequeue_covered.add(task)
1378
1379 logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))
1380
1381 # Allow the metadata to elect for setscene tasks to run anyway
1382 covered_remove = set()
1383 if self.rq.setsceneverify:
1384 invalidtasks = []
1385 for task in xrange(len(self.rqdata.runq_task)):
1386 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1387 taskname = self.rqdata.runq_task[task]
1388 taskdep = self.rqdata.dataCache.task_deps[fn]
1389
1390 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1391 continue
1392 if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
1393 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
1394 continue
1395 if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
1396 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
1397 continue
1398 invalidtasks.append(task)
1399
1400 call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
1401 call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
1402 locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.data, "invalidtasks" : invalidtasks }
1403 # Backwards compatibility with older versions without invalidtasks
1404 try:
1405 covered_remove = bb.utils.better_eval(call, locs)
1406 except TypeError:
1407 covered_remove = bb.utils.better_eval(call2, locs)
1408
1409 def removecoveredtask(task):
1410 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1411 taskname = self.rqdata.runq_task[task] + '_setscene'
1412 bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
1413 self.rq.scenequeue_covered.remove(task)
1414
1415 toremove = covered_remove
1416 for task in toremove:
1417 logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
1418 while toremove:
1419 covered_remove = []
1420 for task in toremove:
1421 removecoveredtask(task)
1422 for deptask in self.rqdata.runq_depends[task]:
1423 if deptask not in self.rq.scenequeue_covered:
1424 continue
1425 if deptask in toremove or deptask in covered_remove or deptask in initial_covered:
1426 continue
1427 logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask))
1428 covered_remove.append(deptask)
1429 toremove = covered_remove
1430
1431 logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1432
1433 event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1434
1435 schedulers = self.get_schedulers()
1436 for scheduler in schedulers:
1437 if self.scheduler == scheduler.name:
1438 self.sched = scheduler(self, self.rqdata)
1439 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1440 break
1441 else:
1442 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
1443 (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1444
1445 def get_schedulers(self):
1446 schedulers = set(obj for obj in globals().values()
1447 if type(obj) is type and
1448 issubclass(obj, RunQueueScheduler))
1449
1450 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
1451 if user_schedulers:
1452 for sched in user_schedulers.split():
1453 if not "." in sched:
1454 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1455 continue
1456
1457 modname, name = sched.rsplit(".", 1)
1458 try:
1459 module = __import__(modname, fromlist=(name,))
1460 except ImportError as exc:
1461 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1462 raise SystemExit(1)
1463 else:
1464 schedulers.add(getattr(module, name))
1465 return schedulers
1466
1467 def setbuildable(self, task):
1468 self.runq_buildable[task] = 1
1469 self.sched.newbuilable(task)
1470
1471 def task_completeoutright(self, task):
1472 """
1473 Mark a task as completed
1474 Look at the reverse dependencies and mark any task with
1475 completed dependencies as buildable
1476 """
1477 self.runq_complete[task] = 1
1478 for revdep in self.rqdata.runq_revdeps[task]:
1479 if self.runq_running[revdep] == 1:
1480 continue
1481 if self.runq_buildable[revdep] == 1:
1482 continue
1483 alldeps = 1
1484 for dep in self.rqdata.runq_depends[revdep]:
1485 if self.runq_complete[dep] != 1:
1486 alldeps = 0
1487 if alldeps == 1:
1488 self.setbuildable(revdep)
1489 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1490 taskname = self.rqdata.runq_task[revdep]
1491 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1492
1493 def task_complete(self, task):
1494 self.stats.taskCompleted()
1495 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1496 self.task_completeoutright(task)
1497
1498 def task_fail(self, task, exitcode):
1499 """
1500 Called when a task has failed
1501 Updates the state engine with the failure
1502 """
1503 self.stats.taskFailed()
1504 fnid = self.rqdata.runq_fnid[task]
1505 self.failed_fnids.append(fnid)
1506 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1507 if self.rqdata.taskData.abort:
1508 self.rq.state = runQueueCleanUp
1509
1510 def task_skip(self, task, reason):
1511 self.runq_running[task] = 1
1512 self.setbuildable(task)
1513 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
1514 self.task_completeoutright(task)
1515 self.stats.taskCompleted()
1516 self.stats.taskSkipped()
1517
1518 def execute(self):
1519 """
1520 Run the tasks in a queue prepared by rqdata.prepare()
1521 """
1522
1523 self.rq.read_workers()
1524
1525
1526 if self.stats.total == 0:
1527 # nothing to do
1528 self.rq.state = runQueueCleanUp
1529
1530 task = self.sched.next()
1531 if task is not None:
1532 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1533 taskname = self.rqdata.runq_task[task]
1534
1535 if task in self.rq.scenequeue_covered:
1536 logger.debug(2, "Setscene covered task %s (%s)", task,
1537 self.rqdata.get_user_idstring(task))
1538 self.task_skip(task, "covered")
1539 return True
1540
1541 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
1542 logger.debug(2, "Stamp current task %s (%s)", task,
1543 self.rqdata.get_user_idstring(task))
1544 self.task_skip(task, "existing")
1545 return True
1546
1547 taskdep = self.rqdata.dataCache.task_deps[fn]
1548 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1549 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1550 noexec=True)
1551 bb.event.fire(startevent, self.cfgData)
1552 self.runq_running[task] = 1
1553 self.stats.taskActive()
1554 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1555 self.task_complete(task)
1556 return True
1557 else:
1558 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1559 bb.event.fire(startevent, self.cfgData)
1560
1561 taskdepdata = self.build_taskdepdata(task)
1562
1563 taskdep = self.rqdata.dataCache.task_deps[fn]
1564 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
1565 if not self.rq.fakeworker:
1566 self.rq.start_fakeworker(self)
1567 self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
1568 self.rq.fakeworker.stdin.flush()
1569 else:
1570 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
1571 self.rq.worker.stdin.flush()
1572
1573 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1574 self.build_stamps2.append(self.build_stamps[task])
1575 self.runq_running[task] = 1
1576 self.stats.taskActive()
1577 if self.stats.active < self.number_tasks:
1578 return True
1579
1580 if self.stats.active > 0:
1581 self.rq.read_workers()
1582 return self.rq.active_fds()
1583
1584 if len(self.failed_fnids) != 0:
1585 self.rq.state = runQueueFailed
1586 return True
1587
1588 # Sanity Checks
1589 for task in xrange(self.stats.total):
1590 if self.runq_buildable[task] == 0:
1591 logger.error("Task %s never buildable!", task)
1592 if self.runq_running[task] == 0:
1593 logger.error("Task %s never ran!", task)
1594 if self.runq_complete[task] == 0:
1595 logger.error("Task %s never completed!", task)
1596 self.rq.state = runQueueComplete
1597
1598 return True
1599
1600 def build_taskdepdata(self, task):
1601 taskdepdata = {}
1602 next = self.rqdata.runq_depends[task]
1603 next.add(task)
1604 while next:
1605 additional = []
1606 for revdep in next:
1607 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1608 pn = self.rqdata.dataCache.pkg_fn[fn]
1609 taskname = self.rqdata.runq_task[revdep]
1610 deps = self.rqdata.runq_depends[revdep]
1611 taskdepdata[revdep] = [pn, taskname, fn, deps]
1612 for revdep2 in deps:
1613 if revdep2 not in taskdepdata:
1614 additional.append(revdep2)
1615 next = additional
1616
1617 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
1618 return taskdepdata
1619
1620class RunQueueExecuteScenequeue(RunQueueExecute):
1621 def __init__(self, rq):
1622 RunQueueExecute.__init__(self, rq)
1623
1624 self.scenequeue_covered = set()
1625 self.scenequeue_notcovered = set()
1626 self.scenequeue_notneeded = set()
1627
1628 # If we don't have any setscene functions, skip this step
1629 if len(self.rqdata.runq_setscene) == 0:
1630 rq.scenequeue_covered = set()
1631 rq.state = runQueueRunInit
1632 return
1633
1634 self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1635
1636 sq_revdeps = []
1637 sq_revdeps_new = []
1638 sq_revdeps_squash = []
1639 self.sq_harddeps = {}
1640
1641 # We need to construct a dependency graph for the setscene functions. Intermediate
1642 # dependencies between the setscene tasks only complicate the code. This code
1643 # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1644 # only containing the setscene functions.
1645
1646 for task in xrange(self.stats.total):
1647 self.runq_running.append(0)
1648 self.runq_complete.append(0)
1649 self.runq_buildable.append(0)
1650
1651 # First process the chains up to the first setscene task.
1652 endpoints = {}
1653 for task in xrange(len(self.rqdata.runq_fnid)):
1654 sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1655 sq_revdeps_new.append(set())
1656 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1657 endpoints[task] = set()
1658
1659 # Secondly process the chains between setscene tasks.
1660 for task in self.rqdata.runq_setscene:
1661 for dep in self.rqdata.runq_depends[task]:
1662 if dep not in endpoints:
1663 endpoints[dep] = set()
1664 endpoints[dep].add(task)
1665
1666 def process_endpoints(endpoints):
1667 newendpoints = {}
1668 for point, task in endpoints.items():
1669 tasks = set()
1670 if task:
1671 tasks |= task
1672 if sq_revdeps_new[point]:
1673 tasks |= sq_revdeps_new[point]
1674 sq_revdeps_new[point] = set()
1675 if point in self.rqdata.runq_setscene:
1676 sq_revdeps_new[point] = tasks
1677 for dep in self.rqdata.runq_depends[point]:
1678 if point in sq_revdeps[dep]:
1679 sq_revdeps[dep].remove(point)
1680 if tasks:
1681 sq_revdeps_new[dep] |= tasks
1682 if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1683 newendpoints[dep] = task
1684 if len(newendpoints) != 0:
1685 process_endpoints(newendpoints)
1686
1687 process_endpoints(endpoints)
1688
1689 # Build a list of setscene tasks which as "unskippable"
1690 # These are direct endpoints referenced by the build
1691 endpoints2 = {}
1692 sq_revdeps2 = []
1693 sq_revdeps_new2 = []
1694 def process_endpoints2(endpoints):
1695 newendpoints = {}
1696 for point, task in endpoints.items():
1697 tasks = set([point])
1698 if task:
1699 tasks |= task
1700 if sq_revdeps_new2[point]:
1701 tasks |= sq_revdeps_new2[point]
1702 sq_revdeps_new2[point] = set()
1703 if point in self.rqdata.runq_setscene:
1704 sq_revdeps_new2[point] = tasks
1705 for dep in self.rqdata.runq_depends[point]:
1706 if point in sq_revdeps2[dep]:
1707 sq_revdeps2[dep].remove(point)
1708 if tasks:
1709 sq_revdeps_new2[dep] |= tasks
1710 if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1711 newendpoints[dep] = tasks
1712 if len(newendpoints) != 0:
1713 process_endpoints2(newendpoints)
1714 for task in xrange(len(self.rqdata.runq_fnid)):
1715 sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
1716 sq_revdeps_new2.append(set())
1717 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1718 endpoints2[task] = set()
1719 process_endpoints2(endpoints2)
1720 self.unskippable = []
1721 for task in self.rqdata.runq_setscene:
1722 if sq_revdeps_new2[task]:
1723 self.unskippable.append(self.rqdata.runq_setscene.index(task))
1724
1725 for task in xrange(len(self.rqdata.runq_fnid)):
1726 if task in self.rqdata.runq_setscene:
1727 deps = set()
1728 for dep in sq_revdeps_new[task]:
1729 deps.add(self.rqdata.runq_setscene.index(dep))
1730 sq_revdeps_squash.append(deps)
1731 elif len(sq_revdeps_new[task]) != 0:
1732 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1733
1734 # Resolve setscene inter-task dependencies
1735 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
1736 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
1737 for task in self.rqdata.runq_setscene:
1738 realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
1739 idepends = self.rqdata.taskData.tasks_idepends[realid]
1740 for (depid, idependtask) in idepends:
1741 if depid not in self.rqdata.taskData.build_targets:
1742 continue
1743
1744 depdata = self.rqdata.taskData.build_targets[depid][0]
1745 if depdata is None:
1746 continue
1747 dep = self.rqdata.taskData.fn_index[depdata]
1748 taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
1749 if taskid is None:
1750 bb.msg.fatal("RunQueue", "Task %s_setscene depends upon non-existent task %s:%s" % (self.rqdata.get_user_idstring(task), dep, idependtask))
1751
1752 if not self.rqdata.runq_setscene.index(taskid) in self.sq_harddeps:
1753 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)] = set()
1754 self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)].add(self.rqdata.runq_setscene.index(task))
1755
1756 sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
1757 # Have to zero this to avoid circular dependencies
1758 sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()
1759
1760 for task in self.sq_harddeps:
1761 for dep in self.sq_harddeps[task]:
1762 sq_revdeps_squash[dep].add(task)
1763
1764 #for task in xrange(len(sq_revdeps_squash)):
1765 # realtask = self.rqdata.runq_setscene[task]
1766 # bb.warn("Task %s: %s_setscene is %s " % (task, self.rqdata.get_user_idstring(realtask) , sq_revdeps_squash[task]))
1767
1768 self.sq_deps = []
1769 self.sq_revdeps = sq_revdeps_squash
1770 self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1771
1772 for task in xrange(len(self.sq_revdeps)):
1773 self.sq_deps.append(set())
1774 for task in xrange(len(self.sq_revdeps)):
1775 for dep in self.sq_revdeps[task]:
1776 self.sq_deps[dep].add(task)
1777
1778 for task in xrange(len(self.sq_revdeps)):
1779 if len(self.sq_revdeps[task]) == 0:
1780 self.runq_buildable[task] = 1
1781
1782 self.outrightfail = []
1783 if self.rq.hashvalidate:
1784 sq_hash = []
1785 sq_hashfn = []
1786 sq_fn = []
1787 sq_taskname = []
1788 sq_task = []
1789 noexec = []
1790 stamppresent = []
1791 for task in xrange(len(self.sq_revdeps)):
1792 realtask = self.rqdata.runq_setscene[task]
1793 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1794 taskname = self.rqdata.runq_task[realtask]
1795 taskdep = self.rqdata.dataCache.task_deps[fn]
1796
1797 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1798 noexec.append(task)
1799 self.task_skip(task)
1800 bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1801 continue
1802
1803 if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
1804 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1805 stamppresent.append(task)
1806 self.task_skip(task)
1807 continue
1808
1809 if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
1810 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1811 stamppresent.append(task)
1812 self.task_skip(task)
1813 continue
1814
1815 sq_fn.append(fn)
1816 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1817 sq_hash.append(self.rqdata.runq_hash[realtask])
1818 sq_taskname.append(taskname)
1819 sq_task.append(task)
1820 call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1821 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
1822 valid = bb.utils.better_eval(call, locs)
1823
1824 valid_new = stamppresent
1825 for v in valid:
1826 valid_new.append(sq_task[v])
1827
1828 for task in xrange(len(self.sq_revdeps)):
1829 if task not in valid_new and task not in noexec:
1830 realtask = self.rqdata.runq_setscene[task]
1831 logger.debug(2, 'No package found, so skipping setscene task %s',
1832 self.rqdata.get_user_idstring(realtask))
1833 self.outrightfail.append(task)
1834
1835 logger.info('Executing SetScene Tasks')
1836
1837 self.rq.state = runQueueSceneRun
1838
1839 def scenequeue_updatecounters(self, task, fail = False):
1840 for dep in self.sq_deps[task]:
1841 if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]:
1842 realtask = self.rqdata.runq_setscene[task]
1843 realdep = self.rqdata.runq_setscene[dep]
1844 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (self.rqdata.get_user_idstring(realtask), self.rqdata.get_user_idstring(realdep)))
1845 continue
1846 self.sq_revdeps2[dep].remove(task)
1847 if len(self.sq_revdeps2[dep]) == 0:
1848 self.runq_buildable[dep] = 1
1849
1850 def task_completeoutright(self, task):
1851 """
1852 Mark a task as completed
1853 Look at the reverse dependencies and mark any task with
1854 completed dependencies as buildable
1855 """
1856
1857 index = self.rqdata.runq_setscene[task]
1858 logger.debug(1, 'Found task %s which could be accelerated',
1859 self.rqdata.get_user_idstring(index))
1860
1861 self.scenequeue_covered.add(task)
1862 self.scenequeue_updatecounters(task)
1863
1864 def task_complete(self, task):
1865 self.stats.taskCompleted()
1866 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1867 self.task_completeoutright(task)
1868
1869 def task_fail(self, task, result):
1870 self.stats.taskFailed()
1871 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1872 self.scenequeue_notcovered.add(task)
1873 self.scenequeue_updatecounters(task, True)
1874
1875 def task_failoutright(self, task):
1876 self.runq_running[task] = 1
1877 self.runq_buildable[task] = 1
1878 self.stats.taskCompleted()
1879 self.stats.taskSkipped()
1880 index = self.rqdata.runq_setscene[task]
1881 self.scenequeue_notcovered.add(task)
1882 self.scenequeue_updatecounters(task, True)
1883
1884 def task_skip(self, task):
1885 self.runq_running[task] = 1
1886 self.runq_buildable[task] = 1
1887 self.task_completeoutright(task)
1888 self.stats.taskCompleted()
1889 self.stats.taskSkipped()
1890
1891 def execute(self):
1892 """
1893 Run the tasks in a queue prepared by prepare_runqueue
1894 """
1895
1896 self.rq.read_workers()
1897
1898 task = None
1899 if self.stats.active < self.number_tasks:
1900 # Find the next setscene to run
1901 for nexttask in xrange(self.stats.total):
1902 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1903 if nexttask in self.unskippable:
1904 logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
1905 if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
1906 realtask = self.rqdata.runq_setscene[nexttask]
1907 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1908 foundtarget = False
1909 for target in self.rqdata.target_pairs:
1910 if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1911 foundtarget = True
1912 break
1913 if not foundtarget:
1914 logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
1915 self.task_skip(nexttask)
1916 self.scenequeue_notneeded.add(nexttask)
1917 return True
1918 if nexttask in self.outrightfail:
1919 self.task_failoutright(nexttask)
1920 return True
1921 task = nexttask
1922 break
1923 if task is not None:
1924 realtask = self.rqdata.runq_setscene[task]
1925 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1926
1927 taskname = self.rqdata.runq_task[realtask] + "_setscene"
1928 if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
1929 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1930 task, self.rqdata.get_user_idstring(realtask))
1931 self.task_failoutright(task)
1932 return True
1933
1934 if self.cooker.configuration.force:
1935 for target in self.rqdata.target_pairs:
1936 if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1937 self.task_failoutright(task)
1938 return True
1939
1940 if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
1941 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1942 task, self.rqdata.get_user_idstring(realtask))
1943 self.task_skip(task)
1944 return True
1945
1946 startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
1947 bb.event.fire(startevent, self.cfgData)
1948
1949 taskdep = self.rqdata.dataCache.task_deps[fn]
1950 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1951 if not self.rq.fakeworker:
1952 self.rq.start_fakeworker(self)
1953 self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
1954 self.rq.fakeworker.stdin.flush()
1955 else:
1956 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
1957 self.rq.worker.stdin.flush()
1958
1959 self.runq_running[task] = 1
1960 self.stats.taskActive()
1961 if self.stats.active < self.number_tasks:
1962 return True
1963
1964 if self.stats.active > 0:
1965 self.rq.read_workers()
1966 return self.rq.active_fds()
1967
1968 #for task in xrange(self.stats.total):
1969 # if self.runq_running[task] != 1:
1970 # buildable = self.runq_buildable[task]
1971 # revdeps = self.sq_revdeps[task]
1972 # bb.warn("Found we didn't run %s %s %s %s" % (task, buildable, str(revdeps), self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))
1973
1974 # Convert scenequeue_covered task numbers into full taskgraph ids
1975 oldcovered = self.scenequeue_covered
1976 self.rq.scenequeue_covered = set()
1977 for task in oldcovered:
1978 self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1979 self.rq.scenequeue_notcovered = set()
1980 for task in self.scenequeue_notcovered:
1981 self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task])
1982
1983 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1984
1985 self.rq.state = runQueueRunInit
1986 return True
1987
1988 def runqueue_process_waitpid(self, task, status):
1989 task = self.rq.rqdata.runq_setscene.index(task)
1990
1991 RunQueueExecute.runqueue_process_waitpid(self, task, status)
1992
1993class TaskFailure(Exception):
1994 """
1995 Exception raised when a task in a runqueue fails
1996 """
1997 def __init__(self, x):
1998 self.args = x
1999
2000
2001class runQueueExitWait(bb.event.Event):
2002 """
2003 Event when waiting for task processes to exit
2004 """
2005
2006 def __init__(self, remain):
2007 self.remain = remain
2008 self.message = "Waiting for %s active tasks to finish" % remain
2009 bb.event.Event.__init__(self)
2010
2011class runQueueEvent(bb.event.Event):
2012 """
2013 Base runQueue event class
2014 """
2015 def __init__(self, task, stats, rq):
2016 self.taskid = task
2017 self.taskstring = rq.rqdata.get_user_idstring(task)
2018 self.taskname = rq.rqdata.get_task_name(task)
2019 self.taskfile = rq.rqdata.get_task_file(task)
2020 self.taskhash = rq.rqdata.get_task_hash(task)
2021 self.stats = stats.copy()
2022 bb.event.Event.__init__(self)
2023
2024class sceneQueueEvent(runQueueEvent):
2025 """
2026 Base sceneQueue event class
2027 """
2028 def __init__(self, task, stats, rq, noexec=False):
2029 runQueueEvent.__init__(self, task, stats, rq)
2030 realtask = rq.rqdata.runq_setscene[task]
2031 self.taskstring = rq.rqdata.get_user_idstring(realtask, "_setscene")
2032 self.taskname = rq.rqdata.get_task_name(realtask) + "_setscene"
2033 self.taskfile = rq.rqdata.get_task_file(realtask)
2034 self.taskhash = rq.rqdata.get_task_hash(realtask)
2035
2036class runQueueTaskStarted(runQueueEvent):
2037 """
2038 Event notifing a task was started
2039 """
2040 def __init__(self, task, stats, rq, noexec=False):
2041 runQueueEvent.__init__(self, task, stats, rq)
2042 self.noexec = noexec
2043
2044class sceneQueueTaskStarted(sceneQueueEvent):
2045 """
2046 Event notifing a setscene task was started
2047 """
2048 def __init__(self, task, stats, rq, noexec=False):
2049 sceneQueueEvent.__init__(self, task, stats, rq)
2050 self.noexec = noexec
2051
2052class runQueueTaskFailed(runQueueEvent):
2053 """
2054 Event notifing a task failed
2055 """
2056 def __init__(self, task, stats, exitcode, rq):
2057 runQueueEvent.__init__(self, task, stats, rq)
2058 self.exitcode = exitcode
2059
2060class sceneQueueTaskFailed(sceneQueueEvent):
2061 """
2062 Event notifing a setscene task failed
2063 """
2064 def __init__(self, task, stats, exitcode, rq):
2065 sceneQueueEvent.__init__(self, task, stats, rq)
2066 self.exitcode = exitcode
2067
2068class runQueueTaskCompleted(runQueueEvent):
2069 """
2070 Event notifing a task completed
2071 """
2072
2073class sceneQueueTaskCompleted(sceneQueueEvent):
2074 """
2075 Event notifing a setscene task completed
2076 """
2077
2078class runQueueTaskSkipped(runQueueEvent):
2079 """
2080 Event notifing a task was skipped
2081 """
2082 def __init__(self, task, stats, rq, reason):
2083 runQueueEvent.__init__(self, task, stats, rq)
2084 self.reason = reason
2085
2086class runQueuePipe():
2087 """
2088 Abstraction for a pipe between a worker thread and the server
2089 """
2090 def __init__(self, pipein, pipeout, d, rq, rqexec):
2091 self.input = pipein
2092 if pipeout:
2093 pipeout.close()
2094 bb.utils.nonblockingfd(self.input)
2095 self.queue = ""
2096 self.d = d
2097 self.rq = rq
2098 self.rqexec = rqexec
2099
2100 def setrunqueueexec(self, rqexec):
2101 self.rqexec = rqexec
2102
2103 def read(self):
2104 for w in [self.rq.worker, self.rq.fakeworker]:
2105 if not w:
2106 continue
2107 w.poll()
2108 if w.returncode is not None and not self.rq.teardown:
2109 name = None
2110 if self.rq.worker and w.pid == self.rq.worker.pid:
2111 name = "Worker"
2112 elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
2113 name = "Fakeroot"
2114 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
2115 self.rq.finish_runqueue(True)
2116
2117 start = len(self.queue)
2118 try:
2119 self.queue = self.queue + self.input.read(102400)
2120 except (OSError, IOError) as e:
2121 if e.errno != errno.EAGAIN:
2122 raise
2123 end = len(self.queue)
2124 found = True
2125 while found and len(self.queue):
2126 found = False
2127 index = self.queue.find("</event>")
2128 while index != -1 and self.queue.startswith("<event>"):
2129 try:
2130 event = pickle.loads(self.queue[7:index])
2131 except ValueError as e:
2132 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
2133 bb.event.fire_from_worker(event, self.d)
2134 found = True
2135 self.queue = self.queue[index+8:]
2136 index = self.queue.find("</event>")
2137 index = self.queue.find("</exitcode>")
2138 while index != -1 and self.queue.startswith("<exitcode>"):
2139 try:
2140 task, status = pickle.loads(self.queue[10:index])
2141 except ValueError as e:
2142 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
2143 self.rqexec.runqueue_process_waitpid(task, status)
2144 found = True
2145 self.queue = self.queue[index+11:]
2146 index = self.queue.find("</exitcode>")
2147 return (end > start)
2148
2149 def close(self):
2150 while self.read():
2151 continue
2152 if len(self.queue) > 0:
2153 print("Warning, worker left partial message: %s" % self.queue)
2154 self.input.close()