summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py2172
1 files changed, 2172 insertions, 0 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
new file mode 100644
index 0000000000..90c610695f
--- /dev/null
+++ b/bitbake/lib/bb/runqueue.py
@@ -0,0 +1,2172 @@
1#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25import copy
26import os
27import sys
28import signal
29import stat
30import fcntl
31import errno
32import logging
33import re
34import bb
35from bb import msg, data, event
36from bb import monitordisk
37import subprocess
38
39try:
40 import cPickle as pickle
41except ImportError:
42 import pickle
43
44bblogger = logging.getLogger("BitBake")
45logger = logging.getLogger("BitBake.RunQueue")
46
47__find_md5__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])' )
48
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        # Counts of tasks in each terminal state, plus the number
        # currently executing ('active') and the overall total.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def copy(self):
        """Return a duplicate of these stats (shallow copy of the counters)."""
        duplicate = self.__class__(self.total)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def taskFailed(self):
        """Move one task from active to failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self, number = 1):
        """Move *number* tasks from active to completed."""
        self.active -= number
        self.completed += number

    def taskSkipped(self, number = 1):
        """Mark *number* tasks as skipped; they are counted as active until completed."""
        self.active += number
        self.skipped += number

    def taskActive(self):
        """Record one more task as currently executing."""
        self.active += 1
80# These values indicate the next step due to be run in the
81# runQueue state machine
82runQueuePrepare = 2
83runQueueSceneInit = 3
84runQueueSceneRun = 4
85runQueueRunInit = 5
86runQueueRunning = 6
87runQueueFailed = 7
88runQueueCleanUp = 8
89runQueueComplete = 9
90
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(rqdata.runq_fnid)

        # Default priority order is simply task-id order
        self.prio_map = list(range(self.numTasks))

        self.buildable = []
        self.stamps = {}
        for taskid in xrange(self.numTasks):
            recipefn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
            self.stamps[taskid] = bb.build.stampfile(self.rqdata.runq_task[taskid], self.rqdata.dataCache, recipefn)
            if self.rq.runq_buildable[taskid] == 1:
                self.buildable.append(taskid)

        # Built lazily on first use in next_buildable_task()
        self.rev_prio_map = None

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Drop anything that has started running since last time
        runnable = []
        for tid in self.buildable:
            if self.rq.runq_running[tid] != 1:
                runnable.append(tid)
        self.buildable = runnable
        if not runnable:
            return None
        if len(runnable) == 1:
            candidate = runnable[0]
            if self.stamps[candidate] not in self.rq.build_stamps.itervalues():
                return candidate

        if not self.rev_prio_map:
            # Invert prio_map lazily: rev_prio_map[taskid] -> scheduling priority
            self.rev_prio_map = range(self.numTasks)
            for tid in xrange(self.numTasks):
                self.rev_prio_map[self.prio_map[tid]] = tid

        chosen = None
        chosen_prio = None
        for tid in runnable:
            prio = self.rev_prio_map[tid]
            if chosen_prio is None or prio < chosen_prio:
                # Skip tasks whose stamp is already being written by a worker
                if self.stamps[tid] in self.rq.build_stamps.itervalues():
                    continue
                chosen_prio = prio
                chosen = tid
        return chosen

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.stats.active >= self.rq.number_tasks:
            return None
        return self.next_buildable_task()

    def newbuilable(self, task):
        # NOTE: the misspelt name is part of the public API; callers use it as-is
        self.buildable.append(task)
160
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        weights = self.rqdata.runq_weight
        # A stable ascending sort keeps equal-weight tasks in task-id order;
        # reversing the result then runs heavier tasks first (ties in
        # descending task-id order), matching the original pop-by-index scan.
        ordered = sorted(range(len(weights)), key=lambda taskid: weights[taskid])
        ordered.reverse()
        self.prio_map = ordered
184
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files are quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible. This works
    well where disk space is at a premium and classes like OE's rm_work are in
    force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        remaining = copy.deepcopy(self.prio_map)
        self.prio_map = []
        while remaining:
            entry = remaining.pop(0)
            self.prio_map.append(entry)
            fnid = self.rqdata.runq_fnid[entry]
            # Pull every other task from the same recipe forward so the
            # .bb file finishes as soon as possible.
            leftover = []
            for other in remaining:
                if self.rqdata.runq_fnid[other] == fnid:
                    self.prio_map.append(other)
                else:
                    leftover.append(other)
            remaining = leftover
216
217class RunQueueData:
218 """
219 BitBake Run Queue implementation
220 """
221 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
222 self.cooker = cooker
223 self.dataCache = dataCache
224 self.taskData = taskData
225 self.targets = targets
226 self.rq = rq
227 self.warn_multi_bb = False
228
229 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
230 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()
231
232 self.reset()
233
234 def reset(self):
235 self.runq_fnid = []
236 self.runq_task = []
237 self.runq_depends = []
238 self.runq_revdeps = []
239 self.runq_hash = []
240
241 def runq_depends_names(self, ids):
242 import re
243 ret = []
244 for id in self.runq_depends[ids]:
245 nam = os.path.basename(self.get_user_idstring(id))
246 nam = re.sub("_[^,]*,", ",", nam)
247 ret.extend([nam])
248 return ret
249
250 def get_task_name(self, task):
251 return self.runq_task[task]
252
253 def get_task_file(self, task):
254 return self.taskData.fn_index[self.runq_fnid[task]]
255
256 def get_task_hash(self, task):
257 return self.runq_hash[task]
258
259 def get_user_idstring(self, task, task_name_suffix = ""):
260 fn = self.taskData.fn_index[self.runq_fnid[task]]
261 taskname = self.runq_task[task] + task_name_suffix
262 return "%s, %s" % (fn, taskname)
263
264 def get_task_id(self, fnid, taskname):
265 for listid in xrange(len(self.runq_fnid)):
266 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
267 return listid
268 return None
269
    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.

        Returns a list of message strings describing up to 10 dependency loops
        found among *tasks* (walked via runq_revdeps).
        """
        from copy import deepcopy

        # Loops already reported (each stored in canonical, reordered form)
        valid_chains = []
        # taskid -> accumulated reverse dependencies already explored
        explored_deps = {}
        msgs = []

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in xrange(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in xrange(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(taskid, prev_chain):
            # Depth-first walk of reverse dependencies; a revdep already in
            # prev_chain means we've closed a loop.
            prev_chain.append(taskid)
            total_deps = []
            total_deps.extend(self.runq_revdeps[taskid])
            for revdep in self.runq_revdeps[taskid]:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        return msgs
                    continue
                scan = False
                if revdep not in explored_deps:
                    # Never visited this revdep before
                    scan = True
                elif revdep in explored_deps[revdep]:
                    # revdep participates in a loop through itself
                    scan = True
                else:
                    # Re-scan if anything in the current chain was reachable
                    # from revdep on a previous exploration
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[taskid] = total_deps

        for task in tasks:
            find_chains(task, [])

        return msgs
356
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list finding tasks that are not
        possible to execute due to circular dependencies.

        *endpoints* are the tasks with no reverse dependencies; weights are
        propagated backwards from them. Returns the per-task weight list;
        aborts the build (bb.msg.fatal) if unbuildable tasks remain.
        """

        numTasks = len(self.runq_fnid)
        weight = []
        deps_left = []
        task_done = []

        # Every task starts with weight 1 and a count of unprocessed revdeps
        for listid in xrange(numTasks):
            task_done.append(False)
            weight.append(1)
            deps_left.append(len(self.runq_revdeps[listid]))

        for listid in endpoints:
            weight[listid] = 10
            task_done[listid] = True

        # Breadth-first propagation from the endpoints back through the
        # dependency graph; a task becomes a new frontier point once all of
        # its reverse dependencies have contributed their weight.
        while True:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    weight[revdep] = weight[revdep] + weight[listid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check
        # (any task never reached, or with revdeps outstanding, is part of a loop)
        problem_tasks = []
        for task in xrange(numTasks):
            if task_done[task] is False or deps_left[task] != 0:
                problem_tasks.append(task)
                logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
                logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])

        if problem_tasks:
            message = "Unbuildable tasks were found.\n"
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight
415
    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.

        Returns the number of entries in the resulting runqueue (0 if there
        is nothing to do). Fatal errors abort via bb.msg.fatal/bb.fatal.
        """

        runq_build = []
        recursivetasks = {}
        recursiveitasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return 0

        logger.info("Preparing runqueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptast, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends):
            # Add the given tasknames of each DEPENDS provider to *depends*
            for depid in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depid not in taskData.build_targets:
                    continue
                depdata = taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_runtime_dependencies(depids, tasknames, depends):
            # Add the given tasknames of each RDEPENDS provider to *depends*
            for depid in depids:
                if depid not in taskData.run_targets:
                    continue
                depdata = taskData.run_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_resolved_dependencies(depids, tasknames, depends):
            # Like the above but *depids* are already resolved fnids
            for depid in depids:
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depid, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        for task in xrange(len(taskData.tasks_name)):
            depends = set()
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            #logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                depends = set(taskData.tasks_tdepends[task])

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData.tasks_idepends[task]
                for (depid, idependtask) in idepends:
                    if depid in taskData.build_targets and not depid in taskData.failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)
                irdepends = taskData.tasks_irdepends[task]
                for (depid, idependtask) in irdepends:
                    if depid in taskData.run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        depdata = taskData.run_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
                    recursivetasks[task] = tasknames
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
                    if taskData.tasks_name[task] in tasknames:
                        recursivetasksselfref.add(task)

                if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']:
                    recursiveitasks[task] = []
                    for t in task_deps['recideptask'][taskData.tasks_name[task]].split():
                        newdep = taskData.gettask_id_fromfnid(fnid, t)
                        recursiveitasks[task].append(newdep)

            # These lists are index-aligned: entry N of each describes runqueue task N
            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(depends)
            self.runq_revdeps.append(set())
            self.runq_hash.append("")

            runq_build.append(0)

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
        extradeps = {}
        for task in recursivetasks:
            extradeps[task] = set(self.runq_depends[task])
            tasknames = recursivetasks[task]
            seendeps = set()
            seenfnid = []  # NOTE(review): appears unused

            def generate_recdeps(t):
                # Recursively accumulate the named tasks of everything *t* depends on
                newdeps = set()
                add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
                extradeps[task].update(newdeps)
                seendeps.add(t)
                newdeps.add(t)
                for i in newdeps:
                    for n in self.runq_depends[i]:
                        if n not in seendeps:
                            generate_recdeps(n)
            generate_recdeps(task)

            if task in recursiveitasks:
                for dep in recursiveitasks[task]:
                    generate_recdeps(dep)

        # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
        for task in recursivetasks:
            extradeps[task].difference_update(recursivetasksselfref)

        for task in xrange(len(taskData.tasks_name)):
            # Add in extra dependencies
            if task in extradeps:
                self.runq_depends[task] = extradeps[task]
            # Remove all self references
            if task in self.runq_depends[task]:
                logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
                self.runq_depends[task].remove(task)

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            if runq_build[listid] == 1:
                return

            runq_build[listid] = 1

            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        self.target_pairs = []
        for target in self.targets:
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            fn = taskData.fn_index[fnid]
            self.target_pairs.append((fn, target[1]))

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                import difflib
                close_matches = difflib.get_close_matches(target[1], taskData.tasks_lookup[fnid], cutoff=0.7)
                if close_matches:
                    extra = ". Close matches:\n %s" % "\n ".join(close_matches)
                else:
                    extra = ""
                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (target[1], target[0], extra))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        # maps[old index] is the task's new index, or -1 if it was pruned
        maps = []
        delcount = 0
        for listid in xrange(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                del self.runq_hash[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in xrange(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = set(newdeps)

        logger.verbose("Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in xrange(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in xrange(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        # Calculate task weights
        # Check of higher length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in xrange(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                seen_pn = []
                # If two versions of the same PN are being built its fatal, we don't support it.
                for fn in prov_list[prov]:
                    pn = self.dataCache.pkg_fn[fn]
                    if pn not in seen_pn:
                        seen_pn.append(pn)
                    else:
                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
                msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov]))
                if self.warn_multi_bb:
                    logger.warn(msg)
                else:
                    msg += "\n This usually means one provides something the other doesn't and should."
                    logger.error(msg)

        # Create a whitelist usable by the stamp checks
        stampfnwhitelist = []
        for entry in self.stampwhitelist.split():
            entryid = self.taskData.getbuild_id(entry)
            if entryid not in self.taskData.build_targets:
                continue
            fnid = self.taskData.build_targets[entryid][0]
            fn = self.taskData.fn_index[fnid]
            stampfnwhitelist.append(fn)
        self.stampfnwhitelist = stampfnwhitelist

        # Iterate over the task list looking for tasks with a 'setscene' function
        self.runq_setscene = []
        if not self.cooker.configuration.nosetscene:
            for task in range(len(self.runq_fnid)):
                setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
                if not setscene:
                    continue
                self.runq_setscene.append(task)

        def invalidate_task(fn, taskname, error_nostamp):
            # Force *taskname* of recipe *fn* to rerun by invalidating its signature
            taskdep = self.dataCache.task_deps[fn]
            fnid = self.taskData.getfn_id(fn)
            if taskname not in taskData.tasks_lookup[fnid]:
                logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)

        # Invalidate task if force mode active
        if self.cooker.configuration.force:
            for (fn, target) in self.target_pairs:
                invalidate_task(fn, target, False)

        # Invalidate task if invalidate mode active
        if self.cooker.configuration.invalidate_stamp:
            for (fn, target) in self.target_pairs:
                for st in self.cooker.configuration.invalidate_stamp.split(','):
                    invalidate_task(fn, "do_%s" % st, True)

        # Iterate over the task list and call into the siggen code
        # (hashes are computed in dependency order: a task is only processed
        # once all of its dependencies have their hashes)
        dealtwith = set()
        todeal = set(range(len(self.runq_fnid)))
        while len(todeal) > 0:
            for task in todeal.copy():
                if len(self.runq_depends[task] - dealtwith) == 0:
                    dealtwith.add(task)
                    todeal.remove(task)
                    procdep = []
                    for dep in self.runq_depends[task]:
                        procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
                    self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)

        return len(self.runq_fnid)
812
813 def dump_data(self, taskQueue):
814 """
815 Dump some debug information on the internal data structures
816 """
817 logger.debug(3, "run_tasks:")
818 for task in xrange(len(self.rqdata.runq_task)):
819 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
820 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
821 self.rqdata.runq_task[task],
822 self.rqdata.runq_weight[task],
823 self.rqdata.runq_depends[task],
824 self.rqdata.runq_revdeps[task])
825
826 logger.debug(3, "sorted_tasks:")
827 for task1 in xrange(len(self.rqdata.runq_task)):
828 if task1 in self.prio_map:
829 task = self.prio_map[task1]
830 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
831 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
832 self.rqdata.runq_task[task],
833 self.rqdata.runq_weight[task],
834 self.rqdata.runq_depends[task],
835 self.rqdata.runq_revdeps[task])
836
837class RunQueue:
838 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
839
840 self.cooker = cooker
841 self.cfgData = cfgData
842 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
843
844 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
845 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
846 self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
847 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None
848
849 self.state = runQueuePrepare
850
851 # For disk space monitor
852 self.dm = monitordisk.diskMonitor(cfgData)
853
854 self.rqexe = None
855 self.worker = None
856 self.workerpipe = None
857 self.fakeworker = None
858 self.fakeworkerpipe = None
859
    def _start_worker(self, fakeroot = False, rqexec = None):
        """
        Spawn a bitbake-worker subprocess (optionally via the fakeroot command)
        and send it the cooker configuration plus the worker data dict over its
        stdin. Returns a (worker Popen object, runQueuePipe) tuple.
        """
        logger.debug(1, "Starting bitbake-worker")
        # Magic token handed to the worker on its command line;
        # "decafbadbad" additionally selects profiling mode in the worker
        magic = "decafbad"
        if self.cooker.configuration.profile:
            magic = "decafbadbad"
        if fakeroot:
            # Run the worker under FAKEROOTCMD with FAKEROOTBASEENV
            # ("KEY=value" pairs) merged into its environment
            fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
            fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
            env = os.environ.copy()
            for key, value in (var.split('=') for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen([fakerootcmd, "bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
        else:
            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec)

        # Everything the worker needs to execute tasks standalone
        workerdata = {
            "taskdeps" : self.rqdata.dataCache.task_deps,
            "fakerootenv" : self.rqdata.dataCache.fakerootenv,
            "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
            "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
            "sigdata" : bb.parse.siggen.get_taskdata(),
            "runq_hash" : self.rqdata.runq_hash,
            "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
            "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
            "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
            "buildname" : self.cfgData.getVar("BUILDNAME", True),
            "date" : self.cfgData.getVar("DATE", True),
            "time" : self.cfgData.getVar("TIME", True),
        }

        # The worker reads tag-framed pickled messages from its stdin
        worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
        worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
        worker.stdin.flush()

        return worker, workerpipe
899
    def _teardown_worker(self, worker, workerpipe):
        """
        Ask a worker process to quit and drain its pipe until it exits.
        No-op when *worker* is None.
        """
        if not worker:
            return
        logger.debug(1, "Teardown for bitbake-worker")
        try:
            worker.stdin.write("<quit></quit>")
            worker.stdin.flush()
        except IOError:
            # Worker may already have exited and closed its stdin
            pass
        while worker.returncode is None:
            # Keep draining output so the worker can't block on a full pipe
            workerpipe.read()
            worker.poll()
        # Consume any remaining buffered output before closing
        while workerpipe.read():
            continue
        workerpipe.close()
915
916 def start_worker(self):
917 if self.worker:
918 self.teardown_workers()
919 self.teardown = False
920 self.worker, self.workerpipe = self._start_worker()
921
922 def start_fakeworker(self, rqexec):
923 if not self.fakeworker:
924 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
925
926 def teardown_workers(self):
927 self.teardown = True
928 self._teardown_worker(self.worker, self.workerpipe)
929 self.worker = None
930 self.workerpipe = None
931 self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
932 self.fakeworker = None
933 self.fakeworkerpipe = None
934
935 def read_workers(self):
936 self.workerpipe.read()
937 if self.fakeworkerpipe:
938 self.fakeworkerpipe.read()
939
940 def active_fds(self):
941 fds = []
942 if self.workerpipe:
943 fds.append(self.workerpipe.input)
944 if self.fakeworkerpipe:
945 fds.append(self.fakeworkerpipe.input)
946 return fds
947
    def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
        """
        Check whether a task's stamp file is current, i.e. whether the task
        does not need to rerun.

        task: runqueue task id
        taskname: task name to check (defaults to the task's own name)
        recurse: also require every dependency's stamp to be current
        cache: optional dict of task id -> validity shared across recursive
               calls so the same task is not rechecked

        Returns True if the stamp is current, False otherwise.
        """
        def get_timestamp(f):
            # mtime of f, or None if the file is missing/inaccessible.
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        # stamppolicy "perfile" only compares stamps within one recipe;
        # anything else compares against the full dependency tree, with
        # "whitelist" exempting the whitelisted files from comparison.
        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.rqdata.stampfnwhitelist

        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
        if taskname is None:
            taskname = self.rqdata.runq_task[task]

        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug(2, "Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCache.task_deps[fn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
            return False

        # For *_setscene tasks, existence of the stamp is enough; no
        # timestamp comparison against dependencies is performed.
        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runq_depends[task]:
            if iscurrent:
                fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
                taskname2 = self.rqdata.runq_task[dep]
                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                # A newer setscene stamp means the dependency was provided
                # from setscene; skip the timestamp comparison for it.
                if t3 and t3 > t2:
                    continue
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    if not t2:
                        logger.debug(2, 'Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                    if t1 < t2:
                        # Dependency stamp is newer than ours: we are stale.
                        logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                if recurse and iscurrent:
                    if dep in cache:
                        iscurrent = cache[dep]
                        if not iscurrent:
                            logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                    else:
                        iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                        cache[dep] = iscurrent
        if recurse:
            cache[task] = iscurrent
        return iscurrent
1017
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        This is a single step of a state machine driven by self.state; the
        caller invokes it repeatedly. Returns False once the queue is
        complete, otherwise a truthy value (or a list of fds to wait on)
        meaning "call me again".
        """

        retval = True

        if self.state is runQueuePrepare:
            # A dummy executor stands in so stats/finish work during prepare.
            self.rqexe = RunQueueExecuteDummy(self)
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit

            # we are ready to run, see if any UI client needs the dependency info
            if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset:
                depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
                bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

        if self.state is runQueueSceneInit:
            dump = self.cooker.configuration.dump_signatures
            if dump:
                # Signature-dump mode: no tasks are executed at all.
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete
            else:
                self.start_worker()
                self.rqexe = RunQueueExecuteScenequeue(self)

        if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
            # Disk monitor check while tasks can be executing.
            self.dm.check(self)

        if self.state is runQueueSceneRun:
            retval = self.rqexe.execute()

        if self.state is runQueueRunInit:
            logger.info("Executing RunQueue Tasks")
            self.rqexe = RunQueueExecuteTasks(self)
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        if (self.state is runQueueComplete or self.state is runQueueFailed) and self.rqexe:
            self.teardown_workers()
            if self.rqexe.stats.failed:
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
            else:
                # Let's avoid the word "failed" if nothing actually did
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            if not self.rqdata.taskData.tryaltconfigs:
                raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
            # Retry: mark the failed recipes so alternate providers get used,
            # then reset the queue data for another pass.
            for fnid in self.rqexe.failed_fnids:
                self.rqdata.taskData.fail_fnid(fnid)
            self.rqdata.reset()

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1090
1091 def execute_runqueue(self):
1092 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1093 try:
1094 return self._execute_runqueue()
1095 except bb.runqueue.TaskFailure:
1096 raise
1097 except SystemExit:
1098 raise
1099 except:
1100 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
1101 try:
1102 self.teardown_workers()
1103 except:
1104 pass
1105 self.state = runQueueComplete
1106 raise
1107
1108 def finish_runqueue(self, now = False):
1109 if not self.rqexe:
1110 self.state = runQueueComplete
1111 return
1112
1113 if now:
1114 self.rqexe.finish_now()
1115 else:
1116 self.rqexe.finish()
1117
1118 def dump_signatures(self, options):
1119 done = set()
1120 bb.note("Reparsing files to collect dependency data")
1121 for task in range(len(self.rqdata.runq_fnid)):
1122 if self.rqdata.runq_fnid[task] not in done:
1123 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1124 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1125 done.add(self.rqdata.runq_fnid[task])
1126
1127 bb.parse.siggen.dump_sigs(self.rqdata.dataCache, options)
1128
1129 return
1130
    def print_diffscenetasks(self):
        """
        Report which tasks differ from any cached (setscene) version.

        Runs the metadata's hash validation function over all tasks, then
        walks the dependency graph to isolate the "root" invalid tasks -
        those whose invalidity is not merely inherited from an invalid
        dependency - prints them, and returns that set.
        """

        valid = []
        sq_hash = []
        sq_hashfn = []
        sq_fn = []
        sq_taskname = []
        sq_task = []
        noexec = []
        stamppresent = []
        valid_new = set()

        # Build the parallel argument lists the hash validation function
        # expects, skipping noexec tasks entirely.
        for task in xrange(len(self.rqdata.runq_fnid)):
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]
            taskdep = self.rqdata.dataCache.task_deps[fn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(task)
                continue

            sq_fn.append(fn)
            sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
            sq_hash.append(self.rqdata.runq_hash[task])
            sq_taskname.append(taskname)
            sq_task.append(task)
        # self.hashvalidate names a metadata function; it returns the indices
        # (into the sq_* lists) of tasks whose cached output is still valid.
        call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
        locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
        valid = bb.utils.better_eval(call, locs)
        for v in valid:
            valid_new.add(sq_task[v])

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for task in noexec:
            if task not in self.rqdata.runq_setscene:
                continue
            for dep in self.rqdata.runq_depends[task]:
                hasnoexecparents = True
                for dep2 in self.rqdata.runq_revdeps[dep]:
                    if dep2 in self.rqdata.runq_setscene and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        invalidtasks = set()
        for task in xrange(len(self.rqdata.runq_fnid)):
            if task not in valid_new and task not in noexec:
                invalidtasks.add(task)

        # Breadth-first walk of each invalid task's dependencies: if any
        # (transitive) dependency is itself invalid, the task's difference
        # is inherited, not a root cause, and it goes into 'found'.
        found = set()
        processed = set()
        for task in invalidtasks:
            toprocess = set([task])
            while toprocess:
                next = set()
                for t in toprocess:
                    for dep in self.rqdata.runq_depends[t]:
                        if dep in invalidtasks:
                            found.add(task)
                        if dep not in processed:
                            processed.add(dep)
                            next.add(dep)
                toprocess = next
                if task in found:
                    toprocess = set()

        tasklist = []
        for task in invalidtasks.difference(found):
            tasklist.append(self.rqdata.get_user_idstring(task))

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks.difference(found)
1209
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, locate its written-out signature data, find
        the closest previously-recorded signature, and print a readable
        explanation of why the cached version could not be used.
        """

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            # Recursively diff the sigdata of a changed dependency so
            # nested differences are shown indented under their parent.
            hashes = [hash1, hash2]
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
                recout.extend(list('  ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for task in invalidtasks:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[task]
            h = self.rqdata.runq_hash[task]
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
            match = None
            for m in matches:
                if h in m:
                    match = m
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
            # Candidate previous signatures: everything except the current hash.
            matches = {k : v for k, v in matches.iteritems() if h not in k}
            if matches:
                # Diff against the most recent previous signature file.
                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
                prevh = __find_md5__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1245
class RunQueueExecute:
    """
    Base class for runqueue executors: holds the per-task bookkeeping
    state and the logic shared by the real-task and setscene executors.
    """

    def __init__(self, rq):
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        # Maximum number of tasks to run at once and the scheduler to use.
        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"

        # Per-task 0/1 state flags, indexed by runqueue task id.
        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []

        # Stamp files of currently-executing tasks (dict by task + flat list).
        self.build_stamps = {}
        self.build_stamps2 = []
        self.failed_fnids = []

        self.stampcache = {}

        # Route worker pipe events to this executor instance.
        rq.workerpipe.setrunqueueexec(self)
        if rq.fakeworkerpipe:
            rq.fakeworkerpipe.setrunqueueexec(self)

    def runqueue_process_waitpid(self, task, status):
        """Handle a task completion reported by a worker: drop its stamp
        bookkeeping and mark the task failed or complete based on status."""

        # self.build_stamps[pid] may not exist when use shared work directory.
        if task in self.build_stamps:
            self.build_stamps2.remove(self.build_stamps[task])
            del self.build_stamps[task]

        if status != 0:
            self.task_fail(task, status)
        else:
            self.task_complete(task)
        return True

    def finish_now(self):
        """Tell both workers to abort outstanding work immediately and move
        the runqueue to its terminal (failed or complete) state."""

        for worker in [self.rq.worker, self.rq.fakeworker]:
            if not worker:
                continue
            try:
                worker.stdin.write("<finishnow></finishnow>")
                worker.stdin.flush()
            except IOError:
                # worker must have died?
                pass

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete
        return

    def finish(self):
        """Gracefully wind down: while tasks are still active return the fds
        to wait on; once idle, move to the failed or complete state."""
        self.rq.state = runQueueCleanUp

        if self.stats.active > 0:
            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        self.rq.state = runQueueComplete
        return True

    def check_dependencies(self, task, taskdeps, setscene = False):
        """Ask the metadata's depvalidate function which of the given
        dependencies are really needed for this task.

        NOTE(review): this mutates the caller's taskdeps set (the task
        itself is added) - callers appear to pass throwaway sets; verify
        before changing.
        """
        if not self.rq.depvalidate:
            return False

        taskdata = {}
        taskdeps.add(task)
        for dep in taskdeps:
            # In setscene mode the ids are indices into runq_setscene.
            if setscene:
                depid = self.rqdata.runq_setscene[dep]
            else:
                depid = dep
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[depid]
            taskdata[dep] = [pn, taskname, fn]
        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
        valid = bb.utils.better_eval(call, locs)
        return valid
1337
class RunQueueExecuteDummy(RunQueueExecute):
    """Placeholder executor used while the runqueue is still being prepared."""

    def __init__(self, rq):
        # Deliberately does not call RunQueueExecute.__init__ - no workers
        # or task lists exist yet; only rq and empty stats are needed.
        self.rq = rq
        self.stats = RunQueueStats(0)

    def finish(self):
        """Nothing to wind down; mark the queue complete immediately."""
        self.rq.state = runQueueComplete
        return
1346
class RunQueueExecuteTasks(RunQueueExecute):
    """Executor for the real (non-setscene) tasks of the runqueue."""

    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        self.stampcache = {}

        initial_covered = self.rq.scenequeue_covered.copy()

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)
            # A task whose reverse dependencies are all covered by setscene
            # (and which wasn't explicitly not-covered) is itself covered.
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                self.rq.scenequeue_covered.add(task)

        # Iterate to a fixed point, propagating coverage down the graph.
        found = True
        while found:
            found = False
            for task in xrange(self.stats.total):
                if task in self.rq.scenequeue_covered:
                    continue
                logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))

                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                    found = True
                    self.rq.scenequeue_covered.add(task)

        logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))

        # Allow the metadata to elect for setscene tasks to run anyway
        covered_remove = set()
        if self.rq.setsceneverify:
            invalidtasks = []
            # Collect tasks with neither a current setscene stamp nor a
            # current normal stamp - candidates the verify function may
            # want to force to run.
            for task in xrange(len(self.rqdata.runq_task)):
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
                taskname = self.rqdata.runq_task[task]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    continue
                if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                invalidtasks.append(task)

            call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
            call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
            locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.data, "invalidtasks" : invalidtasks }
            # Backwards compatibility with older versions without invalidtasks
            try:
                covered_remove = bb.utils.better_eval(call, locs)
            except TypeError:
                covered_remove = bb.utils.better_eval(call2, locs)

        def removecoveredtask(task):
            # Delete the setscene stamp and drop the task from the skip list.
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task] + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
            self.rq.scenequeue_covered.remove(task)

        toremove = covered_remove
        for task in toremove:
            logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
        # Removing a task from the covered set may invalidate covered tasks
        # that depend on it; propagate removals until none remain.
        while toremove:
            covered_remove = []
            for task in toremove:
                removecoveredtask(task)
                for deptask in self.rqdata.runq_depends[task]:
                    if deptask not in self.rq.scenequeue_covered:
                        continue
                    if deptask in toremove or deptask in covered_remove or deptask in initial_covered:
                        continue
                    logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask))
                    covered_remove.append(deptask)
            toremove = covered_remove

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        # Instantiate the scheduler named by BB_SCHEDULER.
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def get_schedulers(self):
        """Return all available scheduler classes: every RunQueueScheduler
        subclass in this module plus any importable names in BB_SCHEDULERS."""
        schedulers = set(obj for obj in globals().values()
                             if type(obj) is type and
                                issubclass(obj, RunQueueScheduler))

        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
        if user_schedulers:
            for sched in user_schedulers.split():
                if not "." in sched:
                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
                    continue

                modname, name = sched.rsplit(".", 1)
                try:
                    module = __import__(modname, fromlist=(name,))
                except ImportError as exc:
                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
                    raise SystemExit(1)
                else:
                    schedulers.add(getattr(module, name))
        return schedulers

    def setbuildable(self, task):
        """Mark a task buildable and notify the scheduler."""
        self.runq_buildable[task] = 1
        # NOTE(review): "newbuilable" presumably matches the scheduler
        # class's method spelling elsewhere in this file - confirm before
        # renaming here.
        self.sched.newbuilable(task)

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.rqdata.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                continue
            if self.runq_buildable[revdep] == 1:
                continue
            alldeps = 1
            for dep in self.rqdata.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                    alldeps = 0
            if alldeps == 1:
                self.setbuildable(revdep)
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                taskname = self.rqdata.runq_task[revdep]
                logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)

    def task_complete(self, task):
        """Record a successful task: update stats, fire the event, propagate."""
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        fnid = self.rqdata.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        # Unless alternate providers may be tried, start cleaning up.
        if self.rqdata.taskData.abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task, reason):
        """Mark a task as skipped (e.g. covered by setscene or stamp-current)."""
        self.runq_running[task] = 1
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()

        One scheduling step: possibly dispatch one task to a worker (or
        skip it), then report whether to keep looping or which fds to
        wait on.
        """

        self.rq.read_workers()


        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]

            if task in self.rq.scenequeue_covered:
                logger.debug(2, "Setscene covered task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task, "covered")
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug(2, "Stamp current task %s (%s)", task,
                                self.rqdata.get_user_idstring(task))
                self.task_skip(task, "existing")
                return True

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                # noexec tasks are completed locally; only the stamp is made.
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                if not self.cooker.configuration.dry_run:
                    bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

                taskdepdata = self.build_taskdepdata(task)

                taskdep = self.rqdata.dataCache.task_deps[fn]
                # fakeroot tasks go to the fakeroot worker, everything else
                # to the regular worker, as a pickled <runtask> message.
                if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                    if not self.rq.fakeworker:
                        self.rq.start_fakeworker(self)
                    self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
                    self.rq.fakeworker.stdin.flush()
                else:
                    self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>")
                    self.rq.worker.stdin.flush()

            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete

        return True

    def build_taskdepdata(self, task):
        """Collect [pn, taskname, fn, deps] for the task and its transitive
        dependencies, keyed by task id, for shipping to the worker.

        NOTE(review): 'next.add(task)' mutates self.rqdata.runq_depends[task]
        (adds the task to its own depends set) - looks unintended; verify
        downstream consumers before relying on runq_depends afterwards.
        """
        taskdepdata = {}
        next = self.rqdata.runq_depends[task]
        next.add(task)
        while next:
            additional = []
            for revdep in next:
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                pn = self.rqdata.dataCache.pkg_fn[fn]
                taskname = self.rqdata.runq_task[revdep]
                deps = self.rqdata.runq_depends[revdep]
                taskdepdata[revdep] = [pn, taskname, fn, deps]
                for revdep2 in deps:
                    if revdep2 not in taskdepdata:
                        additional.append(revdep2)
            next = additional

        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
        return taskdepdata
1621
class RunQueueExecuteScenequeue(RunQueueExecute):
    """Executor for the setscene (cache-fetch) phase of the runqueue."""

    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.scenequeue_covered = set()
        self.scenequeue_notcovered = set()
        self.scenequeue_notneeded = set()

        # If we don't have any setscene functions, skip this step
        if len(self.rqdata.runq_setscene) == 0:
            rq.scenequeue_covered = set()
            rq.state = runQueueRunInit
            return

        self.stats = RunQueueStats(len(self.rqdata.runq_setscene))

        sq_revdeps = []
        sq_revdeps_new = []
        sq_revdeps_squash = []
        self.sq_harddeps = {}

        # We need to construct a dependency graph for the setscene functions. Intermediate
        # dependencies between the setscene tasks only complicate the code. This code
        # therefore aims to collapse the huge runqueue dependency tree into a smaller one
        # only containing the setscene functions.

        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            self.runq_buildable.append(0)

        # First process the chains up to the first setscene task.
        endpoints = {}
        for task in xrange(len(self.rqdata.runq_fnid)):
            sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
            sq_revdeps_new.append(set())
            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
                endpoints[task] = set()

        # Secondly process the chains between setscene tasks.
        for task in self.rqdata.runq_setscene:
            for dep in self.rqdata.runq_depends[task]:
                if dep not in endpoints:
                    endpoints[dep] = set()
                endpoints[dep].add(task)

        def process_endpoints(endpoints):
            # Walk from each endpoint towards its dependencies, accumulating
            # the set of setscene tasks that (transitively) depend on each
            # point into sq_revdeps_new, emptying sq_revdeps as it goes.
            newendpoints = {}
            for point, task in endpoints.items():
                tasks = set()
                if task:
                    tasks |= task
                if sq_revdeps_new[point]:
                    tasks |= sq_revdeps_new[point]
                sq_revdeps_new[point] = set()
                if point in self.rqdata.runq_setscene:
                    sq_revdeps_new[point] = tasks
                for dep in self.rqdata.runq_depends[point]:
                    if point in sq_revdeps[dep]:
                        sq_revdeps[dep].remove(point)
                    if tasks:
                        sq_revdeps_new[dep] |= tasks
                    if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
                        newendpoints[dep] = task
            if len(newendpoints) != 0:
                process_endpoints(newendpoints)

        process_endpoints(endpoints)

        # Build a list of setscene tasks which are "unskippable"
        # These are direct endpoints referenced by the build
        endpoints2 = {}
        sq_revdeps2 = []
        sq_revdeps_new2 = []
        def process_endpoints2(endpoints):
            # Same walk as process_endpoints, but each point includes itself
            # in the propagated set, so direct endpoints reach the setscene
            # tasks they depend on (used for the unskippable list below).
            newendpoints = {}
            for point, task in endpoints.items():
                tasks = set([point])
                if task:
                    tasks |= task
                if sq_revdeps_new2[point]:
                    tasks |= sq_revdeps_new2[point]
                sq_revdeps_new2[point] = set()
                if point in self.rqdata.runq_setscene:
                    sq_revdeps_new2[point] = tasks
                for dep in self.rqdata.runq_depends[point]:
                    if point in sq_revdeps2[dep]:
                        sq_revdeps2[dep].remove(point)
                    if tasks:
                        sq_revdeps_new2[dep] |= tasks
                    if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
                        newendpoints[dep] = tasks
            if len(newendpoints) != 0:
                process_endpoints2(newendpoints)
        for task in xrange(len(self.rqdata.runq_fnid)):
            sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
            sq_revdeps_new2.append(set())
            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
                endpoints2[task] = set()
        process_endpoints2(endpoints2)
        self.unskippable = []
        for task in self.rqdata.runq_setscene:
            if sq_revdeps_new2[task]:
                self.unskippable.append(self.rqdata.runq_setscene.index(task))

        # Squash the collapsed reverse dependencies into setscene-index space.
        for task in xrange(len(self.rqdata.runq_fnid)):
            if task in self.rqdata.runq_setscene:
                deps = set()
                for dep in sq_revdeps_new[task]:
                    deps.add(self.rqdata.runq_setscene.index(dep))
                sq_revdeps_squash.append(deps)
            elif len(sq_revdeps_new[task]) != 0:
                bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")

        # Resolve setscene inter-task dependencies
        # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
        # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
        for task in self.rqdata.runq_setscene:
            realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
            idepends = self.rqdata.taskData.tasks_idepends[realid]
            for (depid, idependtask) in idepends:
                if depid not in self.rqdata.taskData.build_targets:
                    continue

                depdata = self.rqdata.taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                dep = self.rqdata.taskData.fn_index[depdata]
                taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
                if taskid is None:
                    bb.msg.fatal("RunQueue", "Task %s_setscene depends upon non-existent task %s:%s" % (self.rqdata.get_user_idstring(task), dep, idependtask))

                if not self.rqdata.runq_setscene.index(taskid) in self.sq_harddeps:
                    self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)] = set()
                self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)].add(self.rqdata.runq_setscene.index(task))

                sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
                # Have to zero this to avoid circular dependencies
                sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()

        for task in self.sq_harddeps:
            for dep in self.sq_harddeps[task]:
                sq_revdeps_squash[dep].add(task)

        #for task in xrange(len(sq_revdeps_squash)):
        #    realtask = self.rqdata.runq_setscene[task]
        #    bb.warn("Task %s: %s_setscene is %s " % (task, self.rqdata.get_user_idstring(realtask) , sq_revdeps_squash[task]))

        self.sq_deps = []
        self.sq_revdeps = sq_revdeps_squash
        self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)

        # Invert sq_revdeps to get forward dependencies.
        for task in xrange(len(self.sq_revdeps)):
            self.sq_deps.append(set())
        for task in xrange(len(self.sq_revdeps)):
            for dep in self.sq_revdeps[task]:
                self.sq_deps[dep].add(task)

        for task in xrange(len(self.sq_revdeps)):
            if len(self.sq_revdeps[task]) == 0:
                self.runq_buildable[task] = 1

        self.outrightfail = []
        if self.rq.hashvalidate:
            sq_hash = []
            sq_hashfn = []
            sq_fn = []
            sq_taskname = []
            sq_task = []
            noexec = []
            stamppresent = []
            # Pre-filter: noexec tasks just get their stamp made; tasks with
            # current stamps are skipped; the rest are handed to the
            # metadata's hash validation function.
            for task in xrange(len(self.sq_revdeps)):
                realtask = self.rqdata.runq_setscene[task]
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
                taskname = self.rqdata.runq_task[realtask]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    noexec.append(task)
                    self.task_skip(task)
                    bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
                    continue

                if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
                    stamppresent.append(task)
                    self.task_skip(task)
                    continue

                if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
                    stamppresent.append(task)
                    self.task_skip(task)
                    continue

                sq_fn.append(fn)
                sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
                sq_hash.append(self.rqdata.runq_hash[realtask])
                sq_taskname.append(taskname)
                sq_task.append(task)
            call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
            valid = bb.utils.better_eval(call, locs)

            valid_new = stamppresent
            for v in valid:
                valid_new.append(sq_task[v])

            # Anything not validated and not noexec has no cached package and
            # will fail outright rather than being attempted.
            for task in xrange(len(self.sq_revdeps)):
                if task not in valid_new and task not in noexec:
                    realtask = self.rqdata.runq_setscene[task]
                    logger.debug(2, 'No package found, so skipping setscene task %s',
                                 self.rqdata.get_user_idstring(realtask))
                    self.outrightfail.append(task)

        logger.info('Executing SetScene Tasks')

        self.rq.state = runQueueSceneRun
1840
1841 def scenequeue_updatecounters(self, task, fail = False):
1842 for dep in self.sq_deps[task]:
1843 if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]:
1844 realtask = self.rqdata.runq_setscene[task]
1845 realdep = self.rqdata.runq_setscene[dep]
1846 logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (self.rqdata.get_user_idstring(realtask), self.rqdata.get_user_idstring(realdep)))
1847 self.scenequeue_updatecounters(dep, fail)
1848 continue
1849 if task not in self.sq_revdeps2[dep]:
1850 # May already have been removed by the fail case above
1851 continue
1852 self.sq_revdeps2[dep].remove(task)
1853 if len(self.sq_revdeps2[dep]) == 0:
1854 self.runq_buildable[dep] = 1
1855
1856 def task_completeoutright(self, task):
1857 """
1858 Mark a task as completed
1859 Look at the reverse dependencies and mark any task with
1860 completed dependencies as buildable
1861 """
1862
1863 index = self.rqdata.runq_setscene[task]
1864 logger.debug(1, 'Found task %s which could be accelerated',
1865 self.rqdata.get_user_idstring(index))
1866
1867 self.scenequeue_covered.add(task)
1868 self.scenequeue_updatecounters(task)
1869
1870 def task_complete(self, task):
1871 self.stats.taskCompleted()
1872 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1873 self.task_completeoutright(task)
1874
1875 def task_fail(self, task, result):
1876 self.stats.taskFailed()
1877 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1878 self.scenequeue_notcovered.add(task)
1879 self.scenequeue_updatecounters(task, True)
1880
1881 def task_failoutright(self, task):
1882 self.runq_running[task] = 1
1883 self.runq_buildable[task] = 1
1884 self.stats.taskCompleted()
1885 self.stats.taskSkipped()
1886 index = self.rqdata.runq_setscene[task]
1887 self.scenequeue_notcovered.add(task)
1888 self.scenequeue_updatecounters(task, True)
1889
1890 def task_skip(self, task):
1891 self.runq_running[task] = 1
1892 self.runq_buildable[task] = 1
1893 self.task_completeoutright(task)
1894 self.stats.taskCompleted()
1895 self.stats.taskSkipped()
1896
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Dispatches buildable setscene tasks to the worker (or fakeroot
        worker) process, one per call.  Returns True while more work
        remains; once the scene queue drains, converts the covered /
        notcovered sets into full taskgraph ids and advances the
        runqueue state to runQueueRunInit.
        """

        self.rq.read_workers()

        task = None
        if self.stats.active < self.number_tasks:
            # Find the next setscene to run
            for nexttask in xrange(self.stats.total):
                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
                    if nexttask in self.unskippable:
                        logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                    # If every dependency of this setscene task was itself
                    # covered, the task can be skipped entirely -- unless it
                    # is one of the explicit build targets.
                    if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
                        realtask = self.rqdata.runq_setscene[nexttask]
                        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
                        foundtarget = False
                        for target in self.rqdata.target_pairs:
                            if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                                foundtarget = True
                                break
                        if not foundtarget:
                            logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                            self.task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            return True
                    # Tasks already known to have no usable sstate object
                    # (flagged during hash validation) fail outright
                    # without being dispatched.
                    if nexttask in self.outrightfail:
                        self.task_failoutright(nexttask)
                        return True
                    task = nexttask
                    break
        if task is not None:
            realtask = self.rqdata.runq_setscene[task]
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]

            taskname = self.rqdata.runq_task[realtask] + "_setscene"
            # If the real (non-setscene) task already has a current stamp,
            # running the setscene variant would be pointless.
            if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
                logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
                        task, self.rqdata.get_user_idstring(realtask))
                self.task_failoutright(task)
                return True

            # With --force, explicit targets must actually rerun, so their
            # setscene variants are failed rather than allowed to cover them.
            if self.cooker.configuration.force:
                for target in self.rqdata.target_pairs:
                    if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                        self.task_failoutright(task)
                        return True

            if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
                logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
                        task, self.rqdata.get_user_idstring(realtask))
                self.task_skip(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            # fakeroot tasks go to the fakeroot worker (started on demand);
            # everything else goes down the normal worker's stdin pipe as a
            # pickled <runtask> message.
            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
                if not self.rq.fakeworker:
                    self.rq.start_fakeworker(self)
                self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
                self.rq.fakeworker.stdin.flush()
            else:
                self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>")
                self.rq.worker.stdin.flush()

            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return self.rq.active_fds()

        #for task in xrange(self.stats.total):
        #    if self.runq_running[task] != 1:
        #        buildable = self.runq_buildable[task]
        #        revdeps = self.sq_revdeps[task]
        #        bb.warn("Found we didn't run %s %s %s %s" % (task, buildable, str(revdeps), self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task])))

        # Convert scenequeue_covered task numbers into full taskgraph ids
        oldcovered = self.scenequeue_covered
        self.rq.scenequeue_covered = set()
        for task in oldcovered:
            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
        self.rq.scenequeue_notcovered = set()
        for task in self.scenequeue_notcovered:
            self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task])

        logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))

        self.rq.state = runQueueRunInit

        completeevent = sceneQueueComplete(self.stats, self.rq)
        bb.event.fire(completeevent, self.cfgData)

        return True
1997
1998 def runqueue_process_waitpid(self, task, status):
1999 task = self.rq.rqdata.runq_setscene.index(task)
2000
2001 RunQueueExecute.runqueue_process_waitpid(self, task, status)
2002
class TaskFailure(Exception):
    """
    Raised when a task in a runqueue fails to execute.
    """
    def __init__(self, x):
        # Store the failure details directly in the standard args slot.
        self.args = x
2009
2010
class runQueueExitWait(bb.event.Event):
    """
    Fired while the runqueue is waiting for its remaining task
    processes to exit.
    """

    def __init__(self, remain):
        # Number of still-active tasks plus a ready-made status line.
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        bb.event.Event.__init__(self)
2020
class runQueueEvent(bb.event.Event):
    """
    Common base class for runqueue events.

    Snapshots the task's identity (id, user id string, name, file, hash)
    and a copy of the queue statistics at the moment the event fires.
    """
    def __init__(self, task, stats, rq):
        rqdata = rq.rqdata
        self.taskid = task
        self.taskstring = rqdata.get_user_idstring(task)
        self.taskname = rqdata.get_task_name(task)
        self.taskfile = rqdata.get_task_file(task)
        self.taskhash = rqdata.get_task_hash(task)
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2033
class sceneQueueEvent(runQueueEvent):
    """
    Common base class for setscene-queue events.

    Rebinds the identity fields set by runQueueEvent so that they
    describe the _setscene variant of the task.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        rqdata = rq.rqdata
        realtask = rqdata.runq_setscene[task]
        self.taskstring = rqdata.get_user_idstring(realtask, "_setscene")
        self.taskname = rqdata.get_task_name(realtask) + "_setscene"
        self.taskfile = rqdata.get_task_file(realtask)
        self.taskhash = rqdata.get_task_hash(realtask)
2045
class runQueueTaskStarted(runQueueEvent):
    """
    Fired when a runqueue task starts running.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        # True for stamp-only tasks that will not actually execute.
        self.noexec = noexec
2053
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Fired when a setscene task starts running.
    """
    def __init__(self, task, stats, rq, noexec=False):
        sceneQueueEvent.__init__(self, task, stats, rq)
        # True for stamp-only tasks that will not actually execute.
        self.noexec = noexec
2061
class runQueueTaskFailed(runQueueEvent):
    """
    Fired when a runqueue task fails; carries the worker's exit code.
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode
2069
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Fired when a setscene task fails; carries the worker's exit code.
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode
2077
class sceneQueueComplete(sceneQueueEvent):
    """
    Fired once all the sceneQueue tasks have been processed.
    """
    def __init__(self, stats, rq):
        # There is no single task associated with completion, so the
        # sceneQueueEvent/runQueueEvent initialisers are deliberately
        # bypassed; only the final statistics are captured.
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2085
class runQueueTaskCompleted(runQueueEvent):
    """
    Fired when a runqueue task finishes successfully.
    """
2090
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Fired when a setscene task finishes successfully.
    """
2095
class runQueueTaskSkipped(runQueueEvent):
    """
    Fired when a runqueue task is skipped; carries the reason string.
    """
    def __init__(self, task, stats, rq, reason):
        runQueueEvent.__init__(self, task, stats, rq)
        self.reason = reason
2103
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server

    Reads are non-blocking; incoming bytes accumulate in self.queue and
    are parsed out as complete <event>...</event> and
    <exitcode>...</exitcode> messages (pickled payloads framed by the
    worker process).
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec):
        # Keep the read end; the write end belongs to the worker process,
        # so close our copy of it immediately.
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Unparsed bytes received so far (may end with a partial message).
        self.queue = ""
        self.d = d
        self.rq = rq
        self.rqexec = rqexec

    def setrunqueueexec(self, rqexec):
        # Retarget exit-code delivery; the executor changes when the
        # runqueue moves from the scene queue to the run queue phase.
        self.rqexec = rqexec

    def read(self):
        """
        Poll the workers, drain any available pipe data and dispatch every
        complete message.  Returns True if new bytes were read.
        """
        # First check the worker processes are still alive; an unexpected
        # exit outside of teardown aborts the whole runqueue.
        for w in [self.rq.worker, self.rq.fakeworker]:
            if not w:
                continue
            w.poll()
            if w.returncode is not None and not self.rq.teardown:
                name = None
                if self.rq.worker and w.pid == self.rq.worker.pid:
                    name = "Worker"
                elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid:
                    name = "Fakeroot"
                bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode)))
                self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            # Non-blocking read; EAGAIN just means no data is available yet.
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        # Keep extracting messages until neither framing type matches.
        while found and len(self.queue):
            found = False
            index = self.queue.find("</event>")
            while index != -1 and self.queue.startswith("<event>"):
                try:
                    # Payload sits between "<event>" (7 chars) and the
                    # closing tag found at 'index'.
                    event = pickle.loads(self.queue[7:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                found = True
                # Drop the consumed message ("</event>" is 8 chars).
                self.queue = self.queue[index+8:]
                index = self.queue.find("</event>")
            index = self.queue.find("</exitcode>")
            while index != -1 and self.queue.startswith("<exitcode>"):
                try:
                    # Payload sits after "<exitcode>" (10 chars).
                    task, status = pickle.loads(self.queue[10:index])
                except ValueError as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                self.rqexec.runqueue_process_waitpid(task, status)
                found = True
                # Drop the consumed message ("</exitcode>" is 11 chars).
                self.queue = self.queue[index+11:]
                index = self.queue.find("</exitcode>")
        return (end > start)

    def close(self):
        # Drain any remaining buffered data before shutting down.
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()