summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py1914
1 files changed, 1914 insertions, 0 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
new file mode 100644
index 0000000000..c09cfd4b2c
--- /dev/null
+++ b/bitbake/lib/bb/runqueue.py
@@ -0,0 +1,1914 @@
1#!/usr/bin/env python
2# ex:ts=4:sw=4:sts=4:et
3# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
4"""
5BitBake 'RunQueue' implementation
6
7Handles preparation and execution of a queue of tasks
8"""
9
10# Copyright (C) 2006-2007 Richard Purdie
11#
12# This program is free software; you can redistribute it and/or modify
13# it under the terms of the GNU General Public License version 2 as
14# published by the Free Software Foundation.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License along
22# with this program; if not, write to the Free Software Foundation, Inc.,
23# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24
25import copy
26import os
27import sys
28import signal
29import stat
30import fcntl
31import errno
32import logging
33import bb
34from bb import msg, data, event
35from bb import monitordisk
36import subprocess
37
38try:
39 import cPickle as pickle
40except ImportError:
41 import pickle
42
43bblogger = logging.getLogger("BitBake")
44logger = logging.getLogger("BitBake.RunQueue")
45
46class RunQueueStats:
47 """
48 Holds statistics on the tasks handled by the associated runQueue
49 """
50 def __init__(self, total):
51 self.completed = 0
52 self.skipped = 0
53 self.failed = 0
54 self.active = 0
55 self.total = total
56
57 def copy(self):
58 obj = self.__class__(self.total)
59 obj.__dict__.update(self.__dict__)
60 return obj
61
62 def taskFailed(self):
63 self.active = self.active - 1
64 self.failed = self.failed + 1
65
66 def taskCompleted(self, number = 1):
67 self.active = self.active - number
68 self.completed = self.completed + number
69
70 def taskSkipped(self, number = 1):
71 self.active = self.active + number
72 self.skipped = self.skipped + number
73
74 def taskActive(self):
75 self.active = self.active + 1
76
77# These values indicate the next step due to be run in the
78# runQueue state machine
79runQueuePrepare = 2
80runQueueSceneInit = 3
81runQueueSceneRun = 4
82runQueueRunInit = 5
83runQueueRunning = 6
84runQueueFailed = 7
85runQueueCleanUp = 8
86runQueueComplete = 9
87
88class RunQueueScheduler(object):
89 """
90 Control the order tasks are scheduled in.
91 """
92 name = "basic"
93
94 def __init__(self, runqueue, rqdata):
95 """
96 The default scheduler just returns the first buildable task (the
97 priority map is sorted by task numer)
98 """
99 self.rq = runqueue
100 self.rqdata = rqdata
101 numTasks = len(self.rqdata.runq_fnid)
102
103 self.prio_map = []
104 self.prio_map.extend(range(numTasks))
105
106 def next_buildable_task(self):
107 """
108 Return the id of the first task we find that is buildable
109 """
110 for tasknum in xrange(len(self.rqdata.runq_fnid)):
111 taskid = self.prio_map[tasknum]
112 if self.rq.runq_running[taskid] == 1:
113 continue
114 if self.rq.runq_buildable[taskid] == 1:
115 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
116 taskname = self.rqdata.runq_task[taskid]
117 stamp = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
118 if stamp in self.rq.build_stamps.values():
119 continue
120 return taskid
121
122 def next(self):
123 """
124 Return the id of the task we should build next
125 """
126 if self.rq.stats.active < self.rq.number_tasks:
127 return self.next_buildable_task()
128
129class RunQueueSchedulerSpeed(RunQueueScheduler):
130 """
131 A scheduler optimised for speed. The priority map is sorted by task weight,
132 heavier weighted tasks (tasks needed by the most other tasks) are run first.
133 """
134 name = "speed"
135
136 def __init__(self, runqueue, rqdata):
137 """
138 The priority map is sorted by task weight.
139 """
140
141 self.rq = runqueue
142 self.rqdata = rqdata
143
144 sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
145 copyweight = copy.deepcopy(self.rqdata.runq_weight)
146 self.prio_map = []
147
148 for weight in sortweight:
149 idx = copyweight.index(weight)
150 self.prio_map.append(idx)
151 copyweight[idx] = -1
152
153 self.prio_map.reverse()
154
155class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
156 """
157 A scheduler optimised to complete .bb files are quickly as possible. The
158 priority map is sorted by task weight, but then reordered so once a given
159 .bb file starts to build, its completed as quickly as possible. This works
160 well where disk space is at a premium and classes like OE's rm_work are in
161 force.
162 """
163 name = "completion"
164
165 def __init__(self, runqueue, rqdata):
166 RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
167
168 #FIXME - whilst this groups all fnids together it does not reorder the
169 #fnid groups optimally.
170
171 basemap = copy.deepcopy(self.prio_map)
172 self.prio_map = []
173 while (len(basemap) > 0):
174 entry = basemap.pop(0)
175 self.prio_map.append(entry)
176 fnid = self.rqdata.runq_fnid[entry]
177 todel = []
178 for entry in basemap:
179 entry_fnid = self.rqdata.runq_fnid[entry]
180 if entry_fnid == fnid:
181 todel.append(basemap.index(entry))
182 self.prio_map.append(entry)
183 todel.reverse()
184 for idx in todel:
185 del basemap[idx]
186
187class RunQueueData:
188 """
189 BitBake Run Queue implementation
190 """
191 def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
192 self.cooker = cooker
193 self.dataCache = dataCache
194 self.taskData = taskData
195 self.targets = targets
196 self.rq = rq
197 self.warn_multi_bb = False
198
199 self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
200 self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()
201
202 self.reset()
203
204 def reset(self):
205 self.runq_fnid = []
206 self.runq_task = []
207 self.runq_depends = []
208 self.runq_revdeps = []
209 self.runq_hash = []
210
211 def runq_depends_names(self, ids):
212 import re
213 ret = []
214 for id in self.runq_depends[ids]:
215 nam = os.path.basename(self.get_user_idstring(id))
216 nam = re.sub("_[^,]*,", ",", nam)
217 ret.extend([nam])
218 return ret
219
220 def get_task_name(self, task):
221 return self.runq_task[task]
222
223 def get_task_file(self, task):
224 return self.taskData.fn_index[self.runq_fnid[task]]
225
226 def get_task_hash(self, task):
227 return self.runq_hash[task]
228
229 def get_user_idstring(self, task, task_name_suffix = ""):
230 fn = self.taskData.fn_index[self.runq_fnid[task]]
231 taskname = self.runq_task[task] + task_name_suffix
232 return "%s, %s" % (fn, taskname)
233
234 def get_task_id(self, fnid, taskname):
235 for listid in xrange(len(self.runq_fnid)):
236 if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
237 return listid
238 return None
239
240 def circular_depchains_handler(self, tasks):
241 """
242 Some tasks aren't buildable, likely due to circular dependency issues.
243 Identify the circular dependencies and print them in a user readable format.
244 """
245 from copy import deepcopy
246
247 valid_chains = []
248 explored_deps = {}
249 msgs = []
250
251 def chain_reorder(chain):
252 """
253 Reorder a dependency chain so the lowest task id is first
254 """
255 lowest = 0
256 new_chain = []
257 for entry in xrange(len(chain)):
258 if chain[entry] < chain[lowest]:
259 lowest = entry
260 new_chain.extend(chain[lowest:])
261 new_chain.extend(chain[:lowest])
262 return new_chain
263
264 def chain_compare_equal(chain1, chain2):
265 """
266 Compare two dependency chains and see if they're the same
267 """
268 if len(chain1) != len(chain2):
269 return False
270 for index in xrange(len(chain1)):
271 if chain1[index] != chain2[index]:
272 return False
273 return True
274
275 def chain_array_contains(chain, chain_array):
276 """
277 Return True if chain_array contains chain
278 """
279 for ch in chain_array:
280 if chain_compare_equal(ch, chain):
281 return True
282 return False
283
284 def find_chains(taskid, prev_chain):
285 prev_chain.append(taskid)
286 total_deps = []
287 total_deps.extend(self.runq_revdeps[taskid])
288 for revdep in self.runq_revdeps[taskid]:
289 if revdep in prev_chain:
290 idx = prev_chain.index(revdep)
291 # To prevent duplicates, reorder the chain to start with the lowest taskid
292 # and search through an array of those we've already printed
293 chain = prev_chain[idx:]
294 new_chain = chain_reorder(chain)
295 if not chain_array_contains(new_chain, valid_chains):
296 valid_chains.append(new_chain)
297 msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
298 for dep in new_chain:
299 msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
300 msgs.append("\n")
301 if len(valid_chains) > 10:
302 msgs.append("Aborted dependency loops search after 10 matches.\n")
303 return msgs
304 continue
305 scan = False
306 if revdep not in explored_deps:
307 scan = True
308 elif revdep in explored_deps[revdep]:
309 scan = True
310 else:
311 for dep in prev_chain:
312 if dep in explored_deps[revdep]:
313 scan = True
314 if scan:
315 find_chains(revdep, copy.deepcopy(prev_chain))
316 for dep in explored_deps[revdep]:
317 if dep not in total_deps:
318 total_deps.append(dep)
319
320 explored_deps[taskid] = total_deps
321
322 for task in tasks:
323 find_chains(task, [])
324
325 return msgs
326
327 def calculate_task_weights(self, endpoints):
328 """
329 Calculate a number representing the "weight" of each task. Heavier weighted tasks
330 have more dependencies and hence should be executed sooner for maximum speed.
331
332 This function also sanity checks the task list finding tasks that are not
333 possible to execute due to circular dependencies.
334 """
335
336 numTasks = len(self.runq_fnid)
337 weight = []
338 deps_left = []
339 task_done = []
340
341 for listid in xrange(numTasks):
342 task_done.append(False)
343 weight.append(0)
344 deps_left.append(len(self.runq_revdeps[listid]))
345
346 for listid in endpoints:
347 weight[listid] = 1
348 task_done[listid] = True
349
350 while True:
351 next_points = []
352 for listid in endpoints:
353 for revdep in self.runq_depends[listid]:
354 weight[revdep] = weight[revdep] + weight[listid]
355 deps_left[revdep] = deps_left[revdep] - 1
356 if deps_left[revdep] == 0:
357 next_points.append(revdep)
358 task_done[revdep] = True
359 endpoints = next_points
360 if len(next_points) == 0:
361 break
362
363 # Circular dependency sanity check
364 problem_tasks = []
365 for task in xrange(numTasks):
366 if task_done[task] is False or deps_left[task] != 0:
367 problem_tasks.append(task)
368 logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
369 logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])
370
371 if problem_tasks:
372 message = "Unbuildable tasks were found.\n"
373 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
374 message = message + "Identifying dependency loops (this may take a short while)...\n"
375 logger.error(message)
376
377 msgs = self.circular_depchains_handler(problem_tasks)
378
379 message = "\n"
380 for msg in msgs:
381 message = message + msg
382 bb.msg.fatal("RunQueue", message)
383
384 return weight
385
386 def prepare(self):
387 """
388 Turn a set of taskData into a RunQueue and compute data needed
389 to optimise the execution order.
390 """
391
392 runq_build = []
393 recursivetasks = {}
394 recursiveitasks = {}
395 recursivetasksselfref = set()
396
397 taskData = self.taskData
398
399 if len(taskData.tasks_name) == 0:
400 # Nothing to do
401 return 0
402
403 logger.info("Preparing runqueue")
404
405 # Step A - Work out a list of tasks to run
406 #
407 # Taskdata gives us a list of possible providers for every build and run
408 # target ordered by priority. It also gives information on each of those
409 # providers.
410 #
411 # To create the actual list of tasks to execute we fix the list of
412 # providers and then resolve the dependencies into task IDs. This
413 # process is repeated for each type of dependency (tdepends, deptask,
414 # rdeptast, recrdeptask, idepends).
415
416 def add_build_dependencies(depids, tasknames, depends):
417 for depid in depids:
418 # Won't be in build_targets if ASSUME_PROVIDED
419 if depid not in taskData.build_targets:
420 continue
421 depdata = taskData.build_targets[depid][0]
422 if depdata is None:
423 continue
424 for taskname in tasknames:
425 taskid = taskData.gettask_id_fromfnid(depdata, taskname)
426 if taskid is not None:
427 depends.add(taskid)
428
429 def add_runtime_dependencies(depids, tasknames, depends):
430 for depid in depids:
431 if depid not in taskData.run_targets:
432 continue
433 depdata = taskData.run_targets[depid][0]
434 if depdata is None:
435 continue
436 for taskname in tasknames:
437 taskid = taskData.gettask_id_fromfnid(depdata, taskname)
438 if taskid is not None:
439 depends.add(taskid)
440
441 def add_resolved_dependencies(depids, tasknames, depends):
442 for depid in depids:
443 for taskname in tasknames:
444 taskid = taskData.gettask_id_fromfnid(depid, taskname)
445 if taskid is not None:
446 depends.add(taskid)
447
448 for task in xrange(len(taskData.tasks_name)):
449 depends = set()
450 fnid = taskData.tasks_fnid[task]
451 fn = taskData.fn_index[fnid]
452 task_deps = self.dataCache.task_deps[fn]
453
454 logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])
455
456 if fnid not in taskData.failed_fnids:
457
458 # Resolve task internal dependencies
459 #
460 # e.g. addtask before X after Y
461 depends = set(taskData.tasks_tdepends[task])
462
463 # Resolve 'deptask' dependencies
464 #
465 # e.g. do_sometask[deptask] = "do_someothertask"
466 # (makes sure sometask runs after someothertask of all DEPENDS)
467 if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
468 tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
469 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
470
471 # Resolve 'rdeptask' dependencies
472 #
473 # e.g. do_sometask[rdeptask] = "do_someothertask"
474 # (makes sure sometask runs after someothertask of all RDEPENDS)
475 if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
476 tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
477 add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
478
479 # Resolve inter-task dependencies
480 #
481 # e.g. do_sometask[depends] = "targetname:do_someothertask"
482 # (makes sure sometask runs after targetname's someothertask)
483 idepends = taskData.tasks_idepends[task]
484 for (depid, idependtask) in idepends:
485 if depid in taskData.build_targets and not depid in taskData.failed_deps:
486 # Won't be in build_targets if ASSUME_PROVIDED
487 depdata = taskData.build_targets[depid][0]
488 if depdata is not None:
489 taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
490 if taskid is None:
491 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
492 depends.add(taskid)
493 irdepends = taskData.tasks_irdepends[task]
494 for (depid, idependtask) in irdepends:
495 if depid in taskData.run_targets:
496 # Won't be in run_targets if ASSUME_PROVIDED
497 depdata = taskData.run_targets[depid][0]
498 if depdata is not None:
499 taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
500 if taskid is None:
501 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
502 depends.add(taskid)
503
504 # Resolve recursive 'recrdeptask' dependencies (Part A)
505 #
506 # e.g. do_sometask[recrdeptask] = "do_someothertask"
507 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
508 # We cover the recursive part of the dependencies below
509 if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
510 tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
511 recursivetasks[task] = tasknames
512 add_build_dependencies(taskData.depids[fnid], tasknames, depends)
513 add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
514 if taskData.tasks_name[task] in tasknames:
515 recursivetasksselfref.add(task)
516
517 if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']:
518 recursiveitasks[task] = []
519 for t in task_deps['recideptask'][taskData.tasks_name[task]].split():
520 newdep = taskData.gettask_id_fromfnid(fnid, t)
521 recursiveitasks[task].append(newdep)
522
523 self.runq_fnid.append(taskData.tasks_fnid[task])
524 self.runq_task.append(taskData.tasks_name[task])
525 self.runq_depends.append(depends)
526 self.runq_revdeps.append(set())
527 self.runq_hash.append("")
528
529 runq_build.append(0)
530
531 # Resolve recursive 'recrdeptask' dependencies (Part B)
532 #
533 # e.g. do_sometask[recrdeptask] = "do_someothertask"
534 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
535 # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
536 extradeps = {}
537 for task in recursivetasks:
538 extradeps[task] = set(self.runq_depends[task])
539 tasknames = recursivetasks[task]
540 seendeps = set()
541 seenfnid = []
542
543 def generate_recdeps(t):
544 newdeps = set()
545 add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
546 extradeps[task].update(newdeps)
547 seendeps.add(t)
548 newdeps.add(t)
549 for i in newdeps:
550 for n in self.runq_depends[i]:
551 if n not in seendeps:
552 generate_recdeps(n)
553 generate_recdeps(task)
554
555 if task in recursiveitasks:
556 for dep in recursiveitasks[task]:
557 generate_recdeps(dep)
558
559 # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
560 for task in recursivetasks:
561 extradeps[task].difference_update(recursivetasksselfref)
562
563 for task in xrange(len(taskData.tasks_name)):
564 # Add in extra dependencies
565 if task in extradeps:
566 self.runq_depends[task] = extradeps[task]
567 # Remove all self references
568 if task in self.runq_depends[task]:
569 logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
570 self.runq_depends[task].remove(task)
571
572 # Step B - Mark all active tasks
573 #
574 # Start with the tasks we were asked to run and mark all dependencies
575 # as active too. If the task is to be 'forced', clear its stamp. Once
576 # all active tasks are marked, prune the ones we don't need.
577
578 logger.verbose("Marking Active Tasks")
579
580 def mark_active(listid, depth):
581 """
582 Mark an item as active along with its depends
583 (calls itself recursively)
584 """
585
586 if runq_build[listid] == 1:
587 return
588
589 runq_build[listid] = 1
590
591 depends = self.runq_depends[listid]
592 for depend in depends:
593 mark_active(depend, depth+1)
594
595 self.target_pairs = []
596 for target in self.targets:
597 targetid = taskData.getbuild_id(target[0])
598
599 if targetid not in taskData.build_targets:
600 continue
601
602 if targetid in taskData.failed_deps:
603 continue
604
605 fnid = taskData.build_targets[targetid][0]
606 fn = taskData.fn_index[fnid]
607 self.target_pairs.append((fn, target[1]))
608
609 if fnid in taskData.failed_fnids:
610 continue
611
612 if target[1] not in taskData.tasks_lookup[fnid]:
613 import difflib
614 close_matches = difflib.get_close_matches(target[1], taskData.tasks_lookup[fnid], cutoff=0.7)
615 if close_matches:
616 extra = ". Close matches:\n %s" % "\n ".join(close_matches)
617 else:
618 extra = ""
619 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (target[1], target[0], extra))
620
621 listid = taskData.tasks_lookup[fnid][target[1]]
622
623 mark_active(listid, 1)
624
625 # Step C - Prune all inactive tasks
626 #
627 # Once all active tasks are marked, prune the ones we don't need.
628
629 maps = []
630 delcount = 0
631 for listid in xrange(len(self.runq_fnid)):
632 if runq_build[listid-delcount] == 1:
633 maps.append(listid-delcount)
634 else:
635 del self.runq_fnid[listid-delcount]
636 del self.runq_task[listid-delcount]
637 del self.runq_depends[listid-delcount]
638 del runq_build[listid-delcount]
639 del self.runq_revdeps[listid-delcount]
640 del self.runq_hash[listid-delcount]
641 delcount = delcount + 1
642 maps.append(-1)
643
644 #
645 # Step D - Sanity checks and computation
646 #
647
648 # Check to make sure we still have tasks to run
649 if len(self.runq_fnid) == 0:
650 if not taskData.abort:
651 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
652 else:
653 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
654
655 logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))
656
657 # Remap the dependencies to account for the deleted tasks
658 # Check we didn't delete a task we depend on
659 for listid in xrange(len(self.runq_fnid)):
660 newdeps = []
661 origdeps = self.runq_depends[listid]
662 for origdep in origdeps:
663 if maps[origdep] == -1:
664 bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
665 newdeps.append(maps[origdep])
666 self.runq_depends[listid] = set(newdeps)
667
668 logger.verbose("Assign Weightings")
669
670 # Generate a list of reverse dependencies to ease future calculations
671 for listid in xrange(len(self.runq_fnid)):
672 for dep in self.runq_depends[listid]:
673 self.runq_revdeps[dep].add(listid)
674
675 # Identify tasks at the end of dependency chains
676 # Error on circular dependency loops (length two)
677 endpoints = []
678 for listid in xrange(len(self.runq_fnid)):
679 revdeps = self.runq_revdeps[listid]
680 if len(revdeps) == 0:
681 endpoints.append(listid)
682 for dep in revdeps:
683 if dep in self.runq_depends[listid]:
684 #self.dump_data(taskData)
685 bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))
686
687 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
688
689 # Calculate task weights
690 # Check of higher length circular dependencies
691 self.runq_weight = self.calculate_task_weights(endpoints)
692
693 # Sanity Check - Check for multiple tasks building the same provider
694 prov_list = {}
695 seen_fn = []
696 for task in xrange(len(self.runq_fnid)):
697 fn = taskData.fn_index[self.runq_fnid[task]]
698 if fn in seen_fn:
699 continue
700 seen_fn.append(fn)
701 for prov in self.dataCache.fn_provides[fn]:
702 if prov not in prov_list:
703 prov_list[prov] = [fn]
704 elif fn not in prov_list[prov]:
705 prov_list[prov].append(fn)
706 for prov in prov_list:
707 if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
708 seen_pn = []
709 # If two versions of the same PN are being built its fatal, we don't support it.
710 for fn in prov_list[prov]:
711 pn = self.dataCache.pkg_fn[fn]
712 if pn not in seen_pn:
713 seen_pn.append(pn)
714 else:
715 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
716 msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov]))
717 if self.warn_multi_bb:
718 logger.warn(msg)
719 else:
720 msg += "\n This usually means one provides something the other doesn't and should."
721 logger.error(msg)
722
723 # Create a whitelist usable by the stamp checks
724 stampfnwhitelist = []
725 for entry in self.stampwhitelist.split():
726 entryid = self.taskData.getbuild_id(entry)
727 if entryid not in self.taskData.build_targets:
728 continue
729 fnid = self.taskData.build_targets[entryid][0]
730 fn = self.taskData.fn_index[fnid]
731 stampfnwhitelist.append(fn)
732 self.stampfnwhitelist = stampfnwhitelist
733
734 # Iterate over the task list looking for tasks with a 'setscene' function
735 self.runq_setscene = []
736 if not self.cooker.configuration.nosetscene:
737 for task in range(len(self.runq_fnid)):
738 setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
739 if not setscene:
740 continue
741 self.runq_setscene.append(task)
742
743 def invalidate_task(fn, taskname, error_nostamp):
744 taskdep = self.dataCache.task_deps[fn]
745 fnid = self.taskData.getfn_id(fn)
746 if taskname not in taskData.tasks_lookup[fnid]:
747 logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname)
748 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
749 if error_nostamp:
750 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
751 else:
752 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
753 else:
754 logger.verbose("Invalidate task %s, %s", taskname, fn)
755 bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)
756
757 # Invalidate task if force mode active
758 if self.cooker.configuration.force:
759 for (fn, target) in self.target_pairs:
760 invalidate_task(fn, target, False)
761
762 # Invalidate task if invalidate mode active
763 if self.cooker.configuration.invalidate_stamp:
764 for (fn, target) in self.target_pairs:
765 for st in self.cooker.configuration.invalidate_stamp.split(','):
766 invalidate_task(fn, "do_%s" % st, True)
767
768 # Interate over the task list and call into the siggen code
769 dealtwith = set()
770 todeal = set(range(len(self.runq_fnid)))
771 while len(todeal) > 0:
772 for task in todeal.copy():
773 if len(self.runq_depends[task] - dealtwith) == 0:
774 dealtwith.add(task)
775 todeal.remove(task)
776 procdep = []
777 for dep in self.runq_depends[task]:
778 procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
779 self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)
780
781 self.hashes = {}
782 self.hash_deps = {}
783 for task in xrange(len(self.runq_fnid)):
784 identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
785 self.runq_task[task])
786 self.hashes[identifier] = self.runq_hash[task]
787 deps = []
788 for dep in self.runq_depends[task]:
789 depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
790 self.runq_task[dep])
791 deps.append(depidentifier)
792 self.hash_deps[identifier] = deps
793
794 return len(self.runq_fnid)
795
796 def dump_data(self, taskQueue):
797 """
798 Dump some debug information on the internal data structures
799 """
800 logger.debug(3, "run_tasks:")
801 for task in xrange(len(self.rqdata.runq_task)):
802 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
803 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
804 self.rqdata.runq_task[task],
805 self.rqdata.runq_weight[task],
806 self.rqdata.runq_depends[task],
807 self.rqdata.runq_revdeps[task])
808
809 logger.debug(3, "sorted_tasks:")
810 for task1 in xrange(len(self.rqdata.runq_task)):
811 if task1 in self.prio_map:
812 task = self.prio_map[task1]
813 logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
814 taskQueue.fn_index[self.rqdata.runq_fnid[task]],
815 self.rqdata.runq_task[task],
816 self.rqdata.runq_weight[task],
817 self.rqdata.runq_depends[task],
818 self.rqdata.runq_revdeps[task])
819
820class RunQueue:
821 def __init__(self, cooker, cfgData, dataCache, taskData, targets):
822
823 self.cooker = cooker
824 self.cfgData = cfgData
825 self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
826
827 self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
828 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
829 self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
830 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None
831
832 self.state = runQueuePrepare
833
834 # For disk space monitor
835 self.dm = monitordisk.diskMonitor(cfgData)
836
837 self.rqexe = None
838 self.worker = None
839 self.workerpipe = None
840 self.fakeworker = None
841 self.fakeworkerpipe = None
842
843 def _start_worker(self, fakeroot = False, rqexec = None):
844 logger.debug(1, "Starting bitbake-worker")
845 if fakeroot:
846 fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
847 fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
848 env = os.environ.copy()
849 for key, value in (var.split('=') for var in fakerootenv):
850 env[key] = value
851 worker = subprocess.Popen([fakerootcmd, "bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
852 else:
853 worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
854 bb.utils.nonblockingfd(worker.stdout)
855 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, rqexec)
856
857 workerdata = {
858 "taskdeps" : self.rqdata.dataCache.task_deps,
859 "fakerootenv" : self.rqdata.dataCache.fakerootenv,
860 "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
861 "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
862 "hashes" : self.rqdata.hashes,
863 "hash_deps" : self.rqdata.hash_deps,
864 "sigchecksums" : bb.parse.siggen.file_checksum_values,
865 "runq_hash" : self.rqdata.runq_hash,
866 "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
867 "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
868 "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
869 "logdefaultdomain" : bb.msg.loggerDefaultDomains,
870 "prhost" : self.cooker.prhost,
871 "buildname" : self.cfgData.getVar("BUILDNAME", True),
872 "date" : self.cfgData.getVar("DATE", True),
873 "time" : self.cfgData.getVar("TIME", True),
874 }
875
876 worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
877 worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
878 worker.stdin.flush()
879
880 return worker, workerpipe
881
882 def _teardown_worker(self, worker, workerpipe):
883 if not worker:
884 return
885 logger.debug(1, "Teardown for bitbake-worker")
886 worker.stdin.write("<quit></quit>")
887 worker.stdin.flush()
888 while worker.returncode is None:
889 workerpipe.read()
890 worker.poll()
891 while workerpipe.read():
892 continue
893 workerpipe.close()
894
895 def start_worker(self):
896 if self.worker:
897 self.teardown_workers()
898 self.worker, self.workerpipe = self._start_worker()
899
900 def start_fakeworker(self, rqexec):
901 if not self.fakeworker:
902 self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)
903
904 def teardown_workers(self):
905 self._teardown_worker(self.worker, self.workerpipe)
906 self.worker = None
907 self.workerpipe = None
908 self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
909 self.fakeworker = None
910 self.fakeworkerpipe = None
911
912 def read_workers(self):
913 self.workerpipe.read()
914 if self.fakeworkerpipe:
915 self.fakeworkerpipe.read()
916
917 def active_fds(self):
918 fds = []
919 if self.workerpipe:
920 fds.append(self.workerpipe.input)
921 if self.fakeworkerpipe:
922 fds.append(self.fakeworkerpipe.input)
923 return fds
924
925 def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
926 def get_timestamp(f):
927 try:
928 if not os.access(f, os.F_OK):
929 return None
930 return os.stat(f)[stat.ST_MTIME]
931 except:
932 return None
933
934 if self.stamppolicy == "perfile":
935 fulldeptree = False
936 else:
937 fulldeptree = True
938 stampwhitelist = []
939 if self.stamppolicy == "whitelist":
940 stampwhitelist = self.rqdata.stampfnwhitelist
941
942 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
943 if taskname is None:
944 taskname = self.rqdata.runq_task[task]
945
946 stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
947
948 # If the stamp is missing its not current
949 if not os.access(stampfile, os.F_OK):
950 logger.debug(2, "Stampfile %s not available", stampfile)
951 return False
952 # If its a 'nostamp' task, it's not current
953 taskdep = self.rqdata.dataCache.task_deps[fn]
954 if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
955 logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
956 return False
957
958 if taskname != "do_setscene" and taskname.endswith("_setscene"):
959 return True
960
961 if cache is None:
962 cache = {}
963
964 iscurrent = True
965 t1 = get_timestamp(stampfile)
966 for dep in self.rqdata.runq_depends[task]:
967 if iscurrent:
968 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
969 taskname2 = self.rqdata.runq_task[dep]
970 stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
971 stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
972 t2 = get_timestamp(stampfile2)
973 t3 = get_timestamp(stampfile3)
974 if t3 and t3 > t2:
975 continue
976 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
977 if not t2:
978 logger.debug(2, 'Stampfile %s does not exist', stampfile2)
979 iscurrent = False
980 if t1 < t2:
981 logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
982 iscurrent = False
983 if recurse and iscurrent:
984 if dep in cache:
985 iscurrent = cache[dep]
986 if not iscurrent:
987 logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
988 else:
989 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
990 cache[dep] = iscurrent
991 if recurse:
992 cache[task] = iscurrent
993 return iscurrent
994
995 def _execute_runqueue(self):
996 """
997 Run the tasks in a queue prepared by rqdata.prepare()
998 Upon failure, optionally try to recover the build using any alternate providers
999 (if the abort on failure configuration option isn't set)
1000 """
1001
1002 retval = True
1003
1004 if self.state is runQueuePrepare:
1005 self.rqexe = RunQueueExecuteDummy(self)
1006 if self.rqdata.prepare() == 0:
1007 self.state = runQueueComplete
1008 else:
1009 self.state = runQueueSceneInit
1010
1011 # we are ready to run, see if any UI client needs the dependency info
1012 if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset:
1013 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
1014 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)
1015
1016 if self.state is runQueueSceneInit:
1017 if self.cooker.configuration.dump_signatures:
1018 self.dump_signatures()
1019 else:
1020 self.start_worker()
1021 self.rqexe = RunQueueExecuteScenequeue(self)
1022
1023 if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
1024 self.dm.check(self)
1025
1026 if self.state is runQueueSceneRun:
1027 retval = self.rqexe.execute()
1028
1029 if self.state is runQueueRunInit:
1030 logger.info("Executing RunQueue Tasks")
1031 self.rqexe = RunQueueExecuteTasks(self)
1032 self.state = runQueueRunning
1033
1034 if self.state is runQueueRunning:
1035 retval = self.rqexe.execute()
1036
1037 if self.state is runQueueCleanUp:
1038 self.rqexe.finish()
1039
1040 if self.state is runQueueComplete or self.state is runQueueFailed:
1041 self.teardown_workers()
1042 if self.rqexe.stats.failed:
1043 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
1044 else:
1045 # Let's avoid the word "failed" if nothing actually did
1046 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)
1047
1048 if self.state is runQueueFailed:
1049 if not self.rqdata.taskData.tryaltconfigs:
1050 raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
1051 for fnid in self.rqexe.failed_fnids:
1052 self.rqdata.taskData.fail_fnid(fnid)
1053 self.rqdata.reset()
1054
1055 if self.state is runQueueComplete:
1056 # All done
1057 return False
1058
1059 # Loop
1060 return retval
1061
1062 def execute_runqueue(self):
1063 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1064 try:
1065 return self._execute_runqueue()
1066 except bb.runqueue.TaskFailure:
1067 raise
1068 except SystemExit:
1069 raise
1070 except:
1071 logger.error("An uncaught exception occured in runqueue, please see the failure below:")
1072 try:
1073 self.teardown_workers()
1074 except:
1075 pass
1076 self.state = runQueueComplete
1077 raise
1078
1079 def finish_runqueue(self, now = False):
1080 if not self.rqexe:
1081 return
1082
1083 if now:
1084 self.rqexe.finish_now()
1085 else:
1086 self.rqexe.finish()
1087
1088 def dump_signatures(self):
1089 self.state = runQueueComplete
1090 done = set()
1091 bb.note("Reparsing files to collect dependency data")
1092 for task in range(len(self.rqdata.runq_fnid)):
1093 if self.rqdata.runq_fnid[task] not in done:
1094 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1095 the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
1096 done.add(self.rqdata.runq_fnid[task])
1097
1098 bb.parse.siggen.dump_sigs(self.rqdata.dataCache)
1099
1100 return
1101
1102
1103class RunQueueExecute:
1104
1105 def __init__(self, rq):
1106 self.rq = rq
1107 self.cooker = rq.cooker
1108 self.cfgData = rq.cfgData
1109 self.rqdata = rq.rqdata
1110
1111 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
1112 self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"
1113
1114 self.runq_buildable = []
1115 self.runq_running = []
1116 self.runq_complete = []
1117
1118 self.build_stamps = {}
1119 self.failed_fnids = []
1120
1121 self.stampcache = {}
1122
1123 rq.workerpipe.setrunqueueexec(self)
1124 if rq.fakeworkerpipe:
1125 rq.fakeworkerpipe.setrunqueueexec(self)
1126
1127 def runqueue_process_waitpid(self, task, status):
1128
1129 # self.build_stamps[pid] may not exist when use shared work directory.
1130 if task in self.build_stamps:
1131 del self.build_stamps[task]
1132
1133 if status != 0:
1134 self.task_fail(task, status)
1135 else:
1136 self.task_complete(task)
1137 return True
1138
1139 def finish_now(self):
1140
1141 self.rq.worker.stdin.write("<finishnow></finishnow>")
1142 self.rq.worker.stdin.flush()
1143 if self.rq.fakeworker:
1144 self.rq.fakeworker.stdin.write("<finishnow></finishnow>")
1145 self.rq.fakeworker.stdin.flush()
1146
1147 if len(self.failed_fnids) != 0:
1148 self.rq.state = runQueueFailed
1149 return
1150
1151 self.rq.state = runQueueComplete
1152 return
1153
1154 def finish(self):
1155 self.rq.state = runQueueCleanUp
1156
1157 if self.stats.active > 0:
1158 bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
1159 self.rq.read_workers()
1160
1161 return
1162
1163 if len(self.failed_fnids) != 0:
1164 self.rq.state = runQueueFailed
1165 return
1166
1167 self.rq.state = runQueueComplete
1168 return
1169
1170 def check_dependencies(self, task, taskdeps, setscene = False):
1171 if not self.rq.depvalidate:
1172 return False
1173
1174 taskdata = {}
1175 taskdeps.add(task)
1176 for dep in taskdeps:
1177 if setscene:
1178 depid = self.rqdata.runq_setscene[dep]
1179 else:
1180 depid = dep
1181 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
1182 pn = self.rqdata.dataCache.pkg_fn[fn]
1183 taskname = self.rqdata.runq_task[depid]
1184 taskdata[dep] = [pn, taskname, fn]
1185 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1186 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
1187 valid = bb.utils.better_eval(call, locs)
1188 return valid
1189
1190class RunQueueExecuteDummy(RunQueueExecute):
1191 def __init__(self, rq):
1192 self.rq = rq
1193 self.stats = RunQueueStats(0)
1194
1195 def finish(self):
1196 self.rq.state = runQueueComplete
1197 return
1198
1199class RunQueueExecuteTasks(RunQueueExecute):
1200 def __init__(self, rq):
1201 RunQueueExecute.__init__(self, rq)
1202
1203 self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
1204
1205 self.stampcache = {}
1206
1207 # Mark initial buildable tasks
1208 for task in xrange(self.stats.total):
1209 self.runq_running.append(0)
1210 self.runq_complete.append(0)
1211 if len(self.rqdata.runq_depends[task]) == 0:
1212 self.runq_buildable.append(1)
1213 else:
1214 self.runq_buildable.append(0)
1215 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
1216 self.rq.scenequeue_covered.add(task)
1217
1218 found = True
1219 while found:
1220 found = False
1221 for task in xrange(self.stats.total):
1222 if task in self.rq.scenequeue_covered:
1223 continue
1224 logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))
1225
1226 if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
1227 found = True
1228 self.rq.scenequeue_covered.add(task)
1229
1230 logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))
1231
1232 # Allow the metadata to elect for setscene tasks to run anyway
1233 covered_remove = set()
1234 if self.rq.setsceneverify:
1235 invalidtasks = []
1236 for task in xrange(len(self.rqdata.runq_task)):
1237 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1238 taskname = self.rqdata.runq_task[task]
1239 taskdep = self.rqdata.dataCache.task_deps[fn]
1240
1241 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1242 continue
1243 if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
1244 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
1245 continue
1246 if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
1247 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
1248 continue
1249 invalidtasks.append(task)
1250
1251 call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
1252 call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
1253 locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.data, "invalidtasks" : invalidtasks }
1254 # Backwards compatibility with older versions without invalidtasks
1255 try:
1256 covered_remove = bb.utils.better_eval(call, locs)
1257 except TypeError:
1258 covered_remove = bb.utils.better_eval(call2, locs)
1259
1260 for task in covered_remove:
1261 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1262 taskname = self.rqdata.runq_task[task] + '_setscene'
1263 bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
1264 logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
1265 self.rq.scenequeue_covered.remove(task)
1266
1267 logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)
1268
1269 event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
1270
1271 schedulers = self.get_schedulers()
1272 for scheduler in schedulers:
1273 if self.scheduler == scheduler.name:
1274 self.sched = scheduler(self, self.rqdata)
1275 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
1276 break
1277 else:
1278 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
1279 (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1280
1281 def get_schedulers(self):
1282 schedulers = set(obj for obj in globals().values()
1283 if type(obj) is type and
1284 issubclass(obj, RunQueueScheduler))
1285
1286 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
1287 if user_schedulers:
1288 for sched in user_schedulers.split():
1289 if not "." in sched:
1290 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1291 continue
1292
1293 modname, name = sched.rsplit(".", 1)
1294 try:
1295 module = __import__(modname, fromlist=(name,))
1296 except ImportError as exc:
1297 logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1298 raise SystemExit(1)
1299 else:
1300 schedulers.add(getattr(module, name))
1301 return schedulers
1302
1303 def task_completeoutright(self, task):
1304 """
1305 Mark a task as completed
1306 Look at the reverse dependencies and mark any task with
1307 completed dependencies as buildable
1308 """
1309 self.runq_complete[task] = 1
1310 for revdep in self.rqdata.runq_revdeps[task]:
1311 if self.runq_running[revdep] == 1:
1312 continue
1313 if self.runq_buildable[revdep] == 1:
1314 continue
1315 alldeps = 1
1316 for dep in self.rqdata.runq_depends[revdep]:
1317 if self.runq_complete[dep] != 1:
1318 alldeps = 0
1319 if alldeps == 1:
1320 self.runq_buildable[revdep] = 1
1321 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
1322 taskname = self.rqdata.runq_task[revdep]
1323 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
1324
1325 def task_complete(self, task):
1326 self.stats.taskCompleted()
1327 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1328 self.task_completeoutright(task)
1329
1330 def task_fail(self, task, exitcode):
1331 """
1332 Called when a task has failed
1333 Updates the state engine with the failure
1334 """
1335 self.stats.taskFailed()
1336 fnid = self.rqdata.runq_fnid[task]
1337 self.failed_fnids.append(fnid)
1338 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
1339 if self.rqdata.taskData.abort:
1340 self.rq.state = runQueueCleanUp
1341
1342 def task_skip(self, task, reason):
1343 self.runq_running[task] = 1
1344 self.runq_buildable[task] = 1
1345 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
1346 self.task_completeoutright(task)
1347 self.stats.taskCompleted()
1348 self.stats.taskSkipped()
1349
1350 def execute(self):
1351 """
1352 Run the tasks in a queue prepared by rqdata.prepare()
1353 """
1354
1355 self.rq.read_workers()
1356
1357
1358 if self.stats.total == 0:
1359 # nothing to do
1360 self.rq.state = runQueueCleanUp
1361
1362 task = self.sched.next()
1363 if task is not None:
1364 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
1365 taskname = self.rqdata.runq_task[task]
1366
1367 if task in self.rq.scenequeue_covered:
1368 logger.debug(2, "Setscene covered task %s (%s)", task,
1369 self.rqdata.get_user_idstring(task))
1370 self.task_skip(task, "covered")
1371 return True
1372
1373 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
1374 logger.debug(2, "Stamp current task %s (%s)", task,
1375 self.rqdata.get_user_idstring(task))
1376 self.task_skip(task, "existing")
1377 return True
1378
1379 taskdep = self.rqdata.dataCache.task_deps[fn]
1380 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1381 startevent = runQueueTaskStarted(task, self.stats, self.rq,
1382 noexec=True)
1383 bb.event.fire(startevent, self.cfgData)
1384 self.runq_running[task] = 1
1385 self.stats.taskActive()
1386 bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
1387 self.task_complete(task)
1388 return True
1389 else:
1390 startevent = runQueueTaskStarted(task, self.stats, self.rq)
1391 bb.event.fire(startevent, self.cfgData)
1392
1393 taskdep = self.rqdata.dataCache.task_deps[fn]
1394 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1395 if not self.rq.fakeworker:
1396 self.rq.start_fakeworker(self)
1397 self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1398 self.rq.fakeworker.stdin.flush()
1399 else:
1400 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1401 self.rq.worker.stdin.flush()
1402
1403 self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
1404 self.runq_running[task] = 1
1405 self.stats.taskActive()
1406 if self.stats.active < self.number_tasks:
1407 return True
1408
1409 if self.stats.active > 0:
1410 self.rq.read_workers()
1411 return self.rq.active_fds()
1412
1413 if len(self.failed_fnids) != 0:
1414 self.rq.state = runQueueFailed
1415 return True
1416
1417 # Sanity Checks
1418 for task in xrange(self.stats.total):
1419 if self.runq_buildable[task] == 0:
1420 logger.error("Task %s never buildable!", task)
1421 if self.runq_running[task] == 0:
1422 logger.error("Task %s never ran!", task)
1423 if self.runq_complete[task] == 0:
1424 logger.error("Task %s never completed!", task)
1425 self.rq.state = runQueueComplete
1426
1427 return True
1428
1429class RunQueueExecuteScenequeue(RunQueueExecute):
1430 def __init__(self, rq):
1431 RunQueueExecute.__init__(self, rq)
1432
1433 self.scenequeue_covered = set()
1434 self.scenequeue_notcovered = set()
1435 self.scenequeue_notneeded = set()
1436
1437 # If we don't have any setscene functions, skip this step
1438 if len(self.rqdata.runq_setscene) == 0:
1439 rq.scenequeue_covered = set()
1440 rq.state = runQueueRunInit
1441 return
1442
1443 self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
1444
1445 sq_revdeps = []
1446 sq_revdeps_new = []
1447 sq_revdeps_squash = []
1448 self.sq_harddeps = []
1449
1450 # We need to construct a dependency graph for the setscene functions. Intermediate
1451 # dependencies between the setscene tasks only complicate the code. This code
1452 # therefore aims to collapse the huge runqueue dependency tree into a smaller one
1453 # only containing the setscene functions.
1454
1455 for task in xrange(self.stats.total):
1456 self.runq_running.append(0)
1457 self.runq_complete.append(0)
1458 self.runq_buildable.append(0)
1459
1460 # First process the chains up to the first setscene task.
1461 endpoints = {}
1462 for task in xrange(len(self.rqdata.runq_fnid)):
1463 sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
1464 sq_revdeps_new.append(set())
1465 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1466 endpoints[task] = set()
1467
1468 # Secondly process the chains between setscene tasks.
1469 for task in self.rqdata.runq_setscene:
1470 for dep in self.rqdata.runq_depends[task]:
1471 if dep not in endpoints:
1472 endpoints[dep] = set()
1473 endpoints[dep].add(task)
1474
1475 def process_endpoints(endpoints):
1476 newendpoints = {}
1477 for point, task in endpoints.items():
1478 tasks = set()
1479 if task:
1480 tasks |= task
1481 if sq_revdeps_new[point]:
1482 tasks |= sq_revdeps_new[point]
1483 sq_revdeps_new[point] = set()
1484 if point in self.rqdata.runq_setscene:
1485 sq_revdeps_new[point] = tasks
1486 for dep in self.rqdata.runq_depends[point]:
1487 if point in sq_revdeps[dep]:
1488 sq_revdeps[dep].remove(point)
1489 if tasks:
1490 sq_revdeps_new[dep] |= tasks
1491 if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1492 newendpoints[dep] = task
1493 if len(newendpoints) != 0:
1494 process_endpoints(newendpoints)
1495
1496 process_endpoints(endpoints)
1497
1498 # Build a list of setscene tasks which as "unskippable"
1499 # These are direct endpoints referenced by the build
1500 endpoints2 = {}
1501 sq_revdeps2 = []
1502 sq_revdeps_new2 = []
1503 def process_endpoints2(endpoints):
1504 newendpoints = {}
1505 for point, task in endpoints.items():
1506 tasks = set([point])
1507 if task:
1508 tasks |= task
1509 if sq_revdeps_new2[point]:
1510 tasks |= sq_revdeps_new2[point]
1511 sq_revdeps_new2[point] = set()
1512 if point in self.rqdata.runq_setscene:
1513 sq_revdeps_new2[point] = tasks
1514 for dep in self.rqdata.runq_depends[point]:
1515 if point in sq_revdeps2[dep]:
1516 sq_revdeps2[dep].remove(point)
1517 if tasks:
1518 sq_revdeps_new2[dep] |= tasks
1519 if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
1520 newendpoints[dep] = tasks
1521 if len(newendpoints) != 0:
1522 process_endpoints2(newendpoints)
1523 for task in xrange(len(self.rqdata.runq_fnid)):
1524 sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
1525 sq_revdeps_new2.append(set())
1526 if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
1527 endpoints2[task] = set()
1528 process_endpoints2(endpoints2)
1529 self.unskippable = []
1530 for task in self.rqdata.runq_setscene:
1531 if sq_revdeps_new2[task]:
1532 self.unskippable.append(self.rqdata.runq_setscene.index(task))
1533
1534 for task in xrange(len(self.rqdata.runq_fnid)):
1535 if task in self.rqdata.runq_setscene:
1536 deps = set()
1537 for dep in sq_revdeps_new[task]:
1538 deps.add(self.rqdata.runq_setscene.index(dep))
1539 sq_revdeps_squash.append(deps)
1540 elif len(sq_revdeps_new[task]) != 0:
1541 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
1542
1543 # Resolve setscene inter-task dependencies
1544 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
1545 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
1546 for task in self.rqdata.runq_setscene:
1547 realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
1548 idepends = self.rqdata.taskData.tasks_idepends[realid]
1549 for (depid, idependtask) in idepends:
1550 if depid not in self.rqdata.taskData.build_targets:
1551 continue
1552
1553 depdata = self.rqdata.taskData.build_targets[depid][0]
1554 if depdata is None:
1555 continue
1556 dep = self.rqdata.taskData.fn_index[depdata]
1557 taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
1558 if taskid is None:
1559 bb.msg.fatal("RunQueue", "Task %s:%s depends upon non-existent task %s:%s" % (self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realid]], self.rqdata.taskData.tasks_name[realid], dep, idependtask))
1560
1561 self.sq_harddeps.append(self.rqdata.runq_setscene.index(taskid))
1562 sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
1563 # Have to zero this to avoid circular dependencies
1564 sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()
1565
1566 #for task in xrange(len(sq_revdeps_squash)):
1567 # print "Task %s: %s.%s is %s " % (task, self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[self.rqdata.runq_setscene[task]]], self.rqdata.runq_task[self.rqdata.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
1568
1569 self.sq_deps = []
1570 self.sq_revdeps = sq_revdeps_squash
1571 self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
1572
1573 for task in xrange(len(self.sq_revdeps)):
1574 self.sq_deps.append(set())
1575 for task in xrange(len(self.sq_revdeps)):
1576 for dep in self.sq_revdeps[task]:
1577 self.sq_deps[dep].add(task)
1578
1579 for task in xrange(len(self.sq_revdeps)):
1580 if len(self.sq_revdeps[task]) == 0:
1581 self.runq_buildable[task] = 1
1582
1583 if self.rq.hashvalidate:
1584 sq_hash = []
1585 sq_hashfn = []
1586 sq_fn = []
1587 sq_taskname = []
1588 sq_task = []
1589 noexec = []
1590 stamppresent = []
1591 for task in xrange(len(self.sq_revdeps)):
1592 realtask = self.rqdata.runq_setscene[task]
1593 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1594 taskname = self.rqdata.runq_task[realtask]
1595 taskdep = self.rqdata.dataCache.task_deps[fn]
1596
1597 if 'noexec' in taskdep and taskname in taskdep['noexec']:
1598 noexec.append(task)
1599 self.task_skip(task)
1600 bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
1601 continue
1602
1603 if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
1604 logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1605 stamppresent.append(task)
1606 self.task_skip(task)
1607 continue
1608
1609 if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
1610 logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
1611 stamppresent.append(task)
1612 self.task_skip(task)
1613 continue
1614
1615 sq_fn.append(fn)
1616 sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
1617 sq_hash.append(self.rqdata.runq_hash[realtask])
1618 sq_taskname.append(taskname)
1619 sq_task.append(task)
1620 call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
1621 locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
1622 valid = bb.utils.better_eval(call, locs)
1623
1624 valid_new = stamppresent
1625 for v in valid:
1626 valid_new.append(sq_task[v])
1627
1628 for task in xrange(len(self.sq_revdeps)):
1629 if task not in valid_new and task not in noexec:
1630 realtask = self.rqdata.runq_setscene[task]
1631 logger.debug(2, 'No package found, so skipping setscene task %s',
1632 self.rqdata.get_user_idstring(realtask))
1633 self.task_failoutright(task)
1634
1635 logger.info('Executing SetScene Tasks')
1636
1637 self.rq.state = runQueueSceneRun
1638
1639 def scenequeue_updatecounters(self, task, fail = False):
1640 for dep in self.sq_deps[task]:
1641 if fail and task in self.sq_harddeps:
1642 continue
1643 self.sq_revdeps2[dep].remove(task)
1644 if len(self.sq_revdeps2[dep]) == 0:
1645 self.runq_buildable[dep] = 1
1646
1647 def task_completeoutright(self, task):
1648 """
1649 Mark a task as completed
1650 Look at the reverse dependencies and mark any task with
1651 completed dependencies as buildable
1652 """
1653
1654 index = self.rqdata.runq_setscene[task]
1655 logger.debug(1, 'Found task %s which could be accelerated',
1656 self.rqdata.get_user_idstring(index))
1657
1658 self.scenequeue_covered.add(task)
1659 self.scenequeue_updatecounters(task)
1660
1661 def task_complete(self, task):
1662 self.stats.taskCompleted()
1663 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1664 self.task_completeoutright(task)
1665
1666 def task_fail(self, task, result):
1667 self.stats.taskFailed()
1668 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
1669 self.scenequeue_notcovered.add(task)
1670 self.scenequeue_updatecounters(task, True)
1671
1672 def task_failoutright(self, task):
1673 self.runq_running[task] = 1
1674 self.runq_buildable[task] = 1
1675 self.stats.taskCompleted()
1676 self.stats.taskSkipped()
1677 index = self.rqdata.runq_setscene[task]
1678 self.scenequeue_notcovered.add(task)
1679 self.scenequeue_updatecounters(task, True)
1680
1681 def task_skip(self, task):
1682 self.runq_running[task] = 1
1683 self.runq_buildable[task] = 1
1684 self.task_completeoutright(task)
1685 self.stats.taskCompleted()
1686 self.stats.taskSkipped()
1687
1688 def execute(self):
1689 """
1690 Run the tasks in a queue prepared by prepare_runqueue
1691 """
1692
1693 self.rq.read_workers()
1694
1695 task = None
1696 if self.stats.active < self.number_tasks:
1697 # Find the next setscene to run
1698 for nexttask in xrange(self.stats.total):
1699 if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
1700 if nexttask in self.unskippable:
1701 logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
1702 if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
1703 logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
1704 self.task_skip(nexttask)
1705 self.scenequeue_notneeded.add(nexttask)
1706 return True
1707 task = nexttask
1708 break
1709 if task is not None:
1710 realtask = self.rqdata.runq_setscene[task]
1711 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
1712
1713 taskname = self.rqdata.runq_task[realtask] + "_setscene"
1714 if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
1715 logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
1716 task, self.rqdata.get_user_idstring(realtask))
1717 self.task_failoutright(task)
1718 return True
1719
1720 if self.cooker.configuration.force:
1721 for target in self.rqdata.target_pairs:
1722 if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
1723 self.task_failoutright(task)
1724 return True
1725
1726 if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
1727 logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
1728 task, self.rqdata.get_user_idstring(realtask))
1729 self.task_skip(task)
1730 return True
1731
1732 startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
1733 bb.event.fire(startevent, self.cfgData)
1734
1735 taskdep = self.rqdata.dataCache.task_deps[fn]
1736 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
1737 if not self.rq.fakeworker:
1738 self.rq.start_fakeworker(self)
1739 self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1740 self.rq.fakeworker.stdin.flush()
1741 else:
1742 self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
1743 self.rq.worker.stdin.flush()
1744
1745 self.runq_running[task] = 1
1746 self.stats.taskActive()
1747 if self.stats.active < self.number_tasks:
1748 return True
1749
1750 if self.stats.active > 0:
1751 self.rq.read_workers()
1752 return self.rq.active_fds()
1753
1754 # Convert scenequeue_covered task numbers into full taskgraph ids
1755 oldcovered = self.scenequeue_covered
1756 self.rq.scenequeue_covered = set()
1757 for task in oldcovered:
1758 self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
1759 self.rq.scenequeue_notcovered = set()
1760 for task in self.scenequeue_notcovered:
1761 self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task])
1762
1763 logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))
1764
1765 self.rq.state = runQueueRunInit
1766 return True
1767
1768 def runqueue_process_waitpid(self, task, status):
1769 task = self.rq.rqdata.runq_setscene.index(task)
1770
1771 RunQueueExecute.runqueue_process_waitpid(self, task, status)
1772
1773class TaskFailure(Exception):
1774 """
1775 Exception raised when a task in a runqueue fails
1776 """
1777 def __init__(self, x):
1778 self.args = x
1779
1780
1781class runQueueExitWait(bb.event.Event):
1782 """
1783 Event when waiting for task processes to exit
1784 """
1785
1786 def __init__(self, remain):
1787 self.remain = remain
1788 self.message = "Waiting for %s active tasks to finish" % remain
1789 bb.event.Event.__init__(self)
1790
1791class runQueueEvent(bb.event.Event):
1792 """
1793 Base runQueue event class
1794 """
1795 def __init__(self, task, stats, rq):
1796 self.taskid = task
1797 self.taskstring = rq.rqdata.get_user_idstring(task)
1798 self.taskname = rq.rqdata.get_task_name(task)
1799 self.taskfile = rq.rqdata.get_task_file(task)
1800 self.taskhash = rq.rqdata.get_task_hash(task)
1801 self.stats = stats.copy()
1802 bb.event.Event.__init__(self)
1803
1804class sceneQueueEvent(runQueueEvent):
1805 """
1806 Base sceneQueue event class
1807 """
1808 def __init__(self, task, stats, rq, noexec=False):
1809 runQueueEvent.__init__(self, task, stats, rq)
1810 realtask = rq.rqdata.runq_setscene[task]
1811 self.taskstring = rq.rqdata.get_user_idstring(realtask, "_setscene")
1812 self.taskname = rq.rqdata.get_task_name(realtask) + "_setscene"
1813 self.taskfile = rq.rqdata.get_task_file(realtask)
1814 self.taskhash = rq.rqdata.get_task_hash(task)
1815
1816class runQueueTaskStarted(runQueueEvent):
1817 """
1818 Event notifing a task was started
1819 """
1820 def __init__(self, task, stats, rq, noexec=False):
1821 runQueueEvent.__init__(self, task, stats, rq)
1822 self.noexec = noexec
1823
1824class sceneQueueTaskStarted(sceneQueueEvent):
1825 """
1826 Event notifing a setscene task was started
1827 """
1828 def __init__(self, task, stats, rq, noexec=False):
1829 sceneQueueEvent.__init__(self, task, stats, rq)
1830 self.noexec = noexec
1831
1832class runQueueTaskFailed(runQueueEvent):
1833 """
1834 Event notifing a task failed
1835 """
1836 def __init__(self, task, stats, exitcode, rq):
1837 runQueueEvent.__init__(self, task, stats, rq)
1838 self.exitcode = exitcode
1839
1840class sceneQueueTaskFailed(sceneQueueEvent):
1841 """
1842 Event notifing a setscene task failed
1843 """
1844 def __init__(self, task, stats, exitcode, rq):
1845 sceneQueueEvent.__init__(self, task, stats, rq)
1846 self.exitcode = exitcode
1847
1848class runQueueTaskCompleted(runQueueEvent):
1849 """
1850 Event notifing a task completed
1851 """
1852
1853class sceneQueueTaskCompleted(sceneQueueEvent):
1854 """
1855 Event notifing a setscene task completed
1856 """
1857
1858class runQueueTaskSkipped(runQueueEvent):
1859 """
1860 Event notifing a task was skipped
1861 """
1862 def __init__(self, task, stats, rq, reason):
1863 runQueueEvent.__init__(self, task, stats, rq)
1864 self.reason = reason
1865
1866class runQueuePipe():
1867 """
1868 Abstraction for a pipe between a worker thread and the server
1869 """
1870 def __init__(self, pipein, pipeout, d, rq):
1871 self.input = pipein
1872 if pipeout:
1873 pipeout.close()
1874 bb.utils.nonblockingfd(self.input)
1875 self.queue = ""
1876 self.d = d
1877 self.rq = rq
1878
1879 def setrunqueueexec(self, rq):
1880 self.rq = rq
1881
1882 def read(self):
1883 start = len(self.queue)
1884 try:
1885 self.queue = self.queue + self.input.read(102400)
1886 except (OSError, IOError) as e:
1887 if e.errno != errno.EAGAIN:
1888 raise
1889 end = len(self.queue)
1890 found = True
1891 while found and len(self.queue):
1892 found = False
1893 index = self.queue.find("</event>")
1894 while index != -1 and self.queue.startswith("<event>"):
1895 event = pickle.loads(self.queue[7:index])
1896 bb.event.fire_from_worker(event, self.d)
1897 found = True
1898 self.queue = self.queue[index+8:]
1899 index = self.queue.find("</event>")
1900 index = self.queue.find("</exitcode>")
1901 while index != -1 and self.queue.startswith("<exitcode>"):
1902 task, status = pickle.loads(self.queue[10:index])
1903 self.rq.runqueue_process_waitpid(task, status)
1904 found = True
1905 self.queue = self.queue[index+11:]
1906 index = self.queue.find("</exitcode>")
1907 return (end > start)
1908
1909 def close(self):
1910 while self.read():
1911 continue
1912 if len(self.queue) > 0:
1913 print("Warning, worker left partial message: %s" % self.queue)
1914 self.input.close()