diff options
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 2154 |
1 files changed, 2154 insertions, 0 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py new file mode 100644 index 0000000000..6372b65fd9 --- /dev/null +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -0,0 +1,2154 @@ | |||
1 | #!/usr/bin/env python | ||
2 | # ex:ts=4:sw=4:sts=4:et | ||
3 | # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- | ||
4 | """ | ||
5 | BitBake 'RunQueue' implementation | ||
6 | |||
7 | Handles preparation and execution of a queue of tasks | ||
8 | """ | ||
9 | |||
10 | # Copyright (C) 2006-2007 Richard Purdie | ||
11 | # | ||
12 | # This program is free software; you can redistribute it and/or modify | ||
13 | # it under the terms of the GNU General Public License version 2 as | ||
14 | # published by the Free Software Foundation. | ||
15 | # | ||
16 | # This program is distributed in the hope that it will be useful, | ||
17 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
18 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
19 | # GNU General Public License for more details. | ||
20 | # | ||
21 | # You should have received a copy of the GNU General Public License along | ||
22 | # with this program; if not, write to the Free Software Foundation, Inc., | ||
23 | # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | ||
24 | |||
25 | import copy | ||
26 | import os | ||
27 | import sys | ||
28 | import signal | ||
29 | import stat | ||
30 | import fcntl | ||
31 | import errno | ||
32 | import logging | ||
33 | import re | ||
34 | import bb | ||
35 | from bb import msg, data, event | ||
36 | from bb import monitordisk | ||
37 | import subprocess | ||
38 | |||
39 | try: | ||
40 | import cPickle as pickle | ||
41 | except ImportError: | ||
42 | import pickle | ||
43 | |||
44 | bblogger = logging.getLogger("BitBake") | ||
45 | logger = logging.getLogger("BitBake.RunQueue") | ||
46 | |||
47 | __find_md5__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{32}(?![a-z0-9])' ) | ||
48 | |||
49 | class RunQueueStats: | ||
50 | """ | ||
51 | Holds statistics on the tasks handled by the associated runQueue | ||
52 | """ | ||
53 | def __init__(self, total): | ||
54 | self.completed = 0 | ||
55 | self.skipped = 0 | ||
56 | self.failed = 0 | ||
57 | self.active = 0 | ||
58 | self.total = total | ||
59 | |||
60 | def copy(self): | ||
61 | obj = self.__class__(self.total) | ||
62 | obj.__dict__.update(self.__dict__) | ||
63 | return obj | ||
64 | |||
65 | def taskFailed(self): | ||
66 | self.active = self.active - 1 | ||
67 | self.failed = self.failed + 1 | ||
68 | |||
69 | def taskCompleted(self, number = 1): | ||
70 | self.active = self.active - number | ||
71 | self.completed = self.completed + number | ||
72 | |||
73 | def taskSkipped(self, number = 1): | ||
74 | self.active = self.active + number | ||
75 | self.skipped = self.skipped + number | ||
76 | |||
77 | def taskActive(self): | ||
78 | self.active = self.active + 1 | ||
79 | |||
80 | # These values indicate the next step due to be run in the | ||
81 | # runQueue state machine | ||
82 | runQueuePrepare = 2 | ||
83 | runQueueSceneInit = 3 | ||
84 | runQueueSceneRun = 4 | ||
85 | runQueueRunInit = 5 | ||
86 | runQueueRunning = 6 | ||
87 | runQueueFailed = 7 | ||
88 | runQueueCleanUp = 8 | ||
89 | runQueueComplete = 9 | ||
90 | |||
91 | class RunQueueScheduler(object): | ||
92 | """ | ||
93 | Control the order tasks are scheduled in. | ||
94 | """ | ||
95 | name = "basic" | ||
96 | |||
97 | def __init__(self, runqueue, rqdata): | ||
98 | """ | ||
99 | The default scheduler just returns the first buildable task (the | ||
100 | priority map is sorted by task numer) | ||
101 | """ | ||
102 | self.rq = runqueue | ||
103 | self.rqdata = rqdata | ||
104 | self.numTasks = len(self.rqdata.runq_fnid) | ||
105 | |||
106 | self.prio_map = [] | ||
107 | self.prio_map.extend(range(self.numTasks)) | ||
108 | |||
109 | self.buildable = [] | ||
110 | self.stamps = {} | ||
111 | for taskid in xrange(self.numTasks): | ||
112 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]] | ||
113 | taskname = self.rqdata.runq_task[taskid] | ||
114 | self.stamps[taskid] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | ||
115 | if self.rq.runq_buildable[taskid] == 1: | ||
116 | self.buildable.append(taskid) | ||
117 | |||
118 | self.rev_prio_map = None | ||
119 | |||
120 | def next_buildable_task(self): | ||
121 | """ | ||
122 | Return the id of the first task we find that is buildable | ||
123 | """ | ||
124 | self.buildable = [x for x in self.buildable if not self.rq.runq_running[x] == 1] | ||
125 | if not self.buildable: | ||
126 | return None | ||
127 | if len(self.buildable) == 1: | ||
128 | taskid = self.buildable[0] | ||
129 | stamp = self.stamps[taskid] | ||
130 | if stamp not in self.rq.build_stamps.itervalues(): | ||
131 | return taskid | ||
132 | |||
133 | if not self.rev_prio_map: | ||
134 | self.rev_prio_map = range(self.numTasks) | ||
135 | for taskid in xrange(self.numTasks): | ||
136 | self.rev_prio_map[self.prio_map[taskid]] = taskid | ||
137 | |||
138 | best = None | ||
139 | bestprio = None | ||
140 | for taskid in self.buildable: | ||
141 | prio = self.rev_prio_map[taskid] | ||
142 | if not bestprio or bestprio > prio: | ||
143 | stamp = self.stamps[taskid] | ||
144 | if stamp in self.rq.build_stamps.itervalues(): | ||
145 | continue | ||
146 | bestprio = prio | ||
147 | best = taskid | ||
148 | |||
149 | return best | ||
150 | |||
151 | def next(self): | ||
152 | """ | ||
153 | Return the id of the task we should build next | ||
154 | """ | ||
155 | if self.rq.stats.active < self.rq.number_tasks: | ||
156 | return self.next_buildable_task() | ||
157 | |||
158 | def newbuilable(self, task): | ||
159 | self.buildable.append(task) | ||
160 | |||
161 | class RunQueueSchedulerSpeed(RunQueueScheduler): | ||
162 | """ | ||
163 | A scheduler optimised for speed. The priority map is sorted by task weight, | ||
164 | heavier weighted tasks (tasks needed by the most other tasks) are run first. | ||
165 | """ | ||
166 | name = "speed" | ||
167 | |||
168 | def __init__(self, runqueue, rqdata): | ||
169 | """ | ||
170 | The priority map is sorted by task weight. | ||
171 | """ | ||
172 | RunQueueScheduler.__init__(self, runqueue, rqdata) | ||
173 | |||
174 | sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight)) | ||
175 | copyweight = copy.deepcopy(self.rqdata.runq_weight) | ||
176 | self.prio_map = [] | ||
177 | |||
178 | for weight in sortweight: | ||
179 | idx = copyweight.index(weight) | ||
180 | self.prio_map.append(idx) | ||
181 | copyweight[idx] = -1 | ||
182 | |||
183 | self.prio_map.reverse() | ||
184 | |||
185 | class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): | ||
186 | """ | ||
187 | A scheduler optimised to complete .bb files are quickly as possible. The | ||
188 | priority map is sorted by task weight, but then reordered so once a given | ||
189 | .bb file starts to build, its completed as quickly as possible. This works | ||
190 | well where disk space is at a premium and classes like OE's rm_work are in | ||
191 | force. | ||
192 | """ | ||
193 | name = "completion" | ||
194 | |||
195 | def __init__(self, runqueue, rqdata): | ||
196 | RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata) | ||
197 | |||
198 | #FIXME - whilst this groups all fnids together it does not reorder the | ||
199 | #fnid groups optimally. | ||
200 | |||
201 | basemap = copy.deepcopy(self.prio_map) | ||
202 | self.prio_map = [] | ||
203 | while (len(basemap) > 0): | ||
204 | entry = basemap.pop(0) | ||
205 | self.prio_map.append(entry) | ||
206 | fnid = self.rqdata.runq_fnid[entry] | ||
207 | todel = [] | ||
208 | for entry in basemap: | ||
209 | entry_fnid = self.rqdata.runq_fnid[entry] | ||
210 | if entry_fnid == fnid: | ||
211 | todel.append(basemap.index(entry)) | ||
212 | self.prio_map.append(entry) | ||
213 | todel.reverse() | ||
214 | for idx in todel: | ||
215 | del basemap[idx] | ||
216 | |||
217 | class RunQueueData: | ||
218 | """ | ||
219 | BitBake Run Queue implementation | ||
220 | """ | ||
221 | def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets): | ||
222 | self.cooker = cooker | ||
223 | self.dataCache = dataCache | ||
224 | self.taskData = taskData | ||
225 | self.targets = targets | ||
226 | self.rq = rq | ||
227 | self.warn_multi_bb = False | ||
228 | |||
229 | self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or "" | ||
230 | self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split() | ||
231 | |||
232 | self.reset() | ||
233 | |||
234 | def reset(self): | ||
235 | self.runq_fnid = [] | ||
236 | self.runq_task = [] | ||
237 | self.runq_depends = [] | ||
238 | self.runq_revdeps = [] | ||
239 | self.runq_hash = [] | ||
240 | |||
241 | def runq_depends_names(self, ids): | ||
242 | import re | ||
243 | ret = [] | ||
244 | for id in self.runq_depends[ids]: | ||
245 | nam = os.path.basename(self.get_user_idstring(id)) | ||
246 | nam = re.sub("_[^,]*,", ",", nam) | ||
247 | ret.extend([nam]) | ||
248 | return ret | ||
249 | |||
250 | def get_task_name(self, task): | ||
251 | return self.runq_task[task] | ||
252 | |||
253 | def get_task_file(self, task): | ||
254 | return self.taskData.fn_index[self.runq_fnid[task]] | ||
255 | |||
256 | def get_task_hash(self, task): | ||
257 | return self.runq_hash[task] | ||
258 | |||
259 | def get_user_idstring(self, task, task_name_suffix = ""): | ||
260 | fn = self.taskData.fn_index[self.runq_fnid[task]] | ||
261 | taskname = self.runq_task[task] + task_name_suffix | ||
262 | return "%s, %s" % (fn, taskname) | ||
263 | |||
264 | def get_task_id(self, fnid, taskname): | ||
265 | for listid in xrange(len(self.runq_fnid)): | ||
266 | if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname: | ||
267 | return listid | ||
268 | return None | ||
269 | |||
270 | def circular_depchains_handler(self, tasks): | ||
271 | """ | ||
272 | Some tasks aren't buildable, likely due to circular dependency issues. | ||
273 | Identify the circular dependencies and print them in a user readable format. | ||
274 | """ | ||
275 | from copy import deepcopy | ||
276 | |||
277 | valid_chains = [] | ||
278 | explored_deps = {} | ||
279 | msgs = [] | ||
280 | |||
281 | def chain_reorder(chain): | ||
282 | """ | ||
283 | Reorder a dependency chain so the lowest task id is first | ||
284 | """ | ||
285 | lowest = 0 | ||
286 | new_chain = [] | ||
287 | for entry in xrange(len(chain)): | ||
288 | if chain[entry] < chain[lowest]: | ||
289 | lowest = entry | ||
290 | new_chain.extend(chain[lowest:]) | ||
291 | new_chain.extend(chain[:lowest]) | ||
292 | return new_chain | ||
293 | |||
294 | def chain_compare_equal(chain1, chain2): | ||
295 | """ | ||
296 | Compare two dependency chains and see if they're the same | ||
297 | """ | ||
298 | if len(chain1) != len(chain2): | ||
299 | return False | ||
300 | for index in xrange(len(chain1)): | ||
301 | if chain1[index] != chain2[index]: | ||
302 | return False | ||
303 | return True | ||
304 | |||
305 | def chain_array_contains(chain, chain_array): | ||
306 | """ | ||
307 | Return True if chain_array contains chain | ||
308 | """ | ||
309 | for ch in chain_array: | ||
310 | if chain_compare_equal(ch, chain): | ||
311 | return True | ||
312 | return False | ||
313 | |||
314 | def find_chains(taskid, prev_chain): | ||
315 | prev_chain.append(taskid) | ||
316 | total_deps = [] | ||
317 | total_deps.extend(self.runq_revdeps[taskid]) | ||
318 | for revdep in self.runq_revdeps[taskid]: | ||
319 | if revdep in prev_chain: | ||
320 | idx = prev_chain.index(revdep) | ||
321 | # To prevent duplicates, reorder the chain to start with the lowest taskid | ||
322 | # and search through an array of those we've already printed | ||
323 | chain = prev_chain[idx:] | ||
324 | new_chain = chain_reorder(chain) | ||
325 | if not chain_array_contains(new_chain, valid_chains): | ||
326 | valid_chains.append(new_chain) | ||
327 | msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) | ||
328 | for dep in new_chain: | ||
329 | msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep))) | ||
330 | msgs.append("\n") | ||
331 | if len(valid_chains) > 10: | ||
332 | msgs.append("Aborted dependency loops search after 10 matches.\n") | ||
333 | return msgs | ||
334 | continue | ||
335 | scan = False | ||
336 | if revdep not in explored_deps: | ||
337 | scan = True | ||
338 | elif revdep in explored_deps[revdep]: | ||
339 | scan = True | ||
340 | else: | ||
341 | for dep in prev_chain: | ||
342 | if dep in explored_deps[revdep]: | ||
343 | scan = True | ||
344 | if scan: | ||
345 | find_chains(revdep, copy.deepcopy(prev_chain)) | ||
346 | for dep in explored_deps[revdep]: | ||
347 | if dep not in total_deps: | ||
348 | total_deps.append(dep) | ||
349 | |||
350 | explored_deps[taskid] = total_deps | ||
351 | |||
352 | for task in tasks: | ||
353 | find_chains(task, []) | ||
354 | |||
355 | return msgs | ||
356 | |||
357 | def calculate_task_weights(self, endpoints): | ||
358 | """ | ||
359 | Calculate a number representing the "weight" of each task. Heavier weighted tasks | ||
360 | have more dependencies and hence should be executed sooner for maximum speed. | ||
361 | |||
362 | This function also sanity checks the task list finding tasks that are not | ||
363 | possible to execute due to circular dependencies. | ||
364 | """ | ||
365 | |||
366 | numTasks = len(self.runq_fnid) | ||
367 | weight = [] | ||
368 | deps_left = [] | ||
369 | task_done = [] | ||
370 | |||
371 | for listid in xrange(numTasks): | ||
372 | task_done.append(False) | ||
373 | weight.append(0) | ||
374 | deps_left.append(len(self.runq_revdeps[listid])) | ||
375 | |||
376 | for listid in endpoints: | ||
377 | weight[listid] = 1 | ||
378 | task_done[listid] = True | ||
379 | |||
380 | while True: | ||
381 | next_points = [] | ||
382 | for listid in endpoints: | ||
383 | for revdep in self.runq_depends[listid]: | ||
384 | weight[revdep] = weight[revdep] + weight[listid] | ||
385 | deps_left[revdep] = deps_left[revdep] - 1 | ||
386 | if deps_left[revdep] == 0: | ||
387 | next_points.append(revdep) | ||
388 | task_done[revdep] = True | ||
389 | endpoints = next_points | ||
390 | if len(next_points) == 0: | ||
391 | break | ||
392 | |||
393 | # Circular dependency sanity check | ||
394 | problem_tasks = [] | ||
395 | for task in xrange(numTasks): | ||
396 | if task_done[task] is False or deps_left[task] != 0: | ||
397 | problem_tasks.append(task) | ||
398 | logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task)) | ||
399 | logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task]) | ||
400 | |||
401 | if problem_tasks: | ||
402 | message = "Unbuildable tasks were found.\n" | ||
403 | message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" | ||
404 | message = message + "Identifying dependency loops (this may take a short while)...\n" | ||
405 | logger.error(message) | ||
406 | |||
407 | msgs = self.circular_depchains_handler(problem_tasks) | ||
408 | |||
409 | message = "\n" | ||
410 | for msg in msgs: | ||
411 | message = message + msg | ||
412 | bb.msg.fatal("RunQueue", message) | ||
413 | |||
414 | return weight | ||
415 | |||
416 | def prepare(self): | ||
417 | """ | ||
418 | Turn a set of taskData into a RunQueue and compute data needed | ||
419 | to optimise the execution order. | ||
420 | """ | ||
421 | |||
422 | runq_build = [] | ||
423 | recursivetasks = {} | ||
424 | recursiveitasks = {} | ||
425 | recursivetasksselfref = set() | ||
426 | |||
427 | taskData = self.taskData | ||
428 | |||
429 | if len(taskData.tasks_name) == 0: | ||
430 | # Nothing to do | ||
431 | return 0 | ||
432 | |||
433 | logger.info("Preparing runqueue") | ||
434 | |||
435 | # Step A - Work out a list of tasks to run | ||
436 | # | ||
437 | # Taskdata gives us a list of possible providers for every build and run | ||
438 | # target ordered by priority. It also gives information on each of those | ||
439 | # providers. | ||
440 | # | ||
441 | # To create the actual list of tasks to execute we fix the list of | ||
442 | # providers and then resolve the dependencies into task IDs. This | ||
443 | # process is repeated for each type of dependency (tdepends, deptask, | ||
444 | # rdeptast, recrdeptask, idepends). | ||
445 | |||
446 | def add_build_dependencies(depids, tasknames, depends): | ||
447 | for depid in depids: | ||
448 | # Won't be in build_targets if ASSUME_PROVIDED | ||
449 | if depid not in taskData.build_targets: | ||
450 | continue | ||
451 | depdata = taskData.build_targets[depid][0] | ||
452 | if depdata is None: | ||
453 | continue | ||
454 | for taskname in tasknames: | ||
455 | taskid = taskData.gettask_id_fromfnid(depdata, taskname) | ||
456 | if taskid is not None: | ||
457 | depends.add(taskid) | ||
458 | |||
459 | def add_runtime_dependencies(depids, tasknames, depends): | ||
460 | for depid in depids: | ||
461 | if depid not in taskData.run_targets: | ||
462 | continue | ||
463 | depdata = taskData.run_targets[depid][0] | ||
464 | if depdata is None: | ||
465 | continue | ||
466 | for taskname in tasknames: | ||
467 | taskid = taskData.gettask_id_fromfnid(depdata, taskname) | ||
468 | if taskid is not None: | ||
469 | depends.add(taskid) | ||
470 | |||
471 | def add_resolved_dependencies(depids, tasknames, depends): | ||
472 | for depid in depids: | ||
473 | for taskname in tasknames: | ||
474 | taskid = taskData.gettask_id_fromfnid(depid, taskname) | ||
475 | if taskid is not None: | ||
476 | depends.add(taskid) | ||
477 | |||
478 | for task in xrange(len(taskData.tasks_name)): | ||
479 | depends = set() | ||
480 | fnid = taskData.tasks_fnid[task] | ||
481 | fn = taskData.fn_index[fnid] | ||
482 | task_deps = self.dataCache.task_deps[fn] | ||
483 | |||
484 | #logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task]) | ||
485 | |||
486 | if fnid not in taskData.failed_fnids: | ||
487 | |||
488 | # Resolve task internal dependencies | ||
489 | # | ||
490 | # e.g. addtask before X after Y | ||
491 | depends = set(taskData.tasks_tdepends[task]) | ||
492 | |||
493 | # Resolve 'deptask' dependencies | ||
494 | # | ||
495 | # e.g. do_sometask[deptask] = "do_someothertask" | ||
496 | # (makes sure sometask runs after someothertask of all DEPENDS) | ||
497 | if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']: | ||
498 | tasknames = task_deps['deptask'][taskData.tasks_name[task]].split() | ||
499 | add_build_dependencies(taskData.depids[fnid], tasknames, depends) | ||
500 | |||
501 | # Resolve 'rdeptask' dependencies | ||
502 | # | ||
503 | # e.g. do_sometask[rdeptask] = "do_someothertask" | ||
504 | # (makes sure sometask runs after someothertask of all RDEPENDS) | ||
505 | if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']: | ||
506 | tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split() | ||
507 | add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends) | ||
508 | |||
509 | # Resolve inter-task dependencies | ||
510 | # | ||
511 | # e.g. do_sometask[depends] = "targetname:do_someothertask" | ||
512 | # (makes sure sometask runs after targetname's someothertask) | ||
513 | idepends = taskData.tasks_idepends[task] | ||
514 | for (depid, idependtask) in idepends: | ||
515 | if depid in taskData.build_targets and not depid in taskData.failed_deps: | ||
516 | # Won't be in build_targets if ASSUME_PROVIDED | ||
517 | depdata = taskData.build_targets[depid][0] | ||
518 | if depdata is not None: | ||
519 | taskid = taskData.gettask_id_fromfnid(depdata, idependtask) | ||
520 | if taskid is None: | ||
521 | bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata])) | ||
522 | depends.add(taskid) | ||
523 | irdepends = taskData.tasks_irdepends[task] | ||
524 | for (depid, idependtask) in irdepends: | ||
525 | if depid in taskData.run_targets: | ||
526 | # Won't be in run_targets if ASSUME_PROVIDED | ||
527 | depdata = taskData.run_targets[depid][0] | ||
528 | if depdata is not None: | ||
529 | taskid = taskData.gettask_id_fromfnid(depdata, idependtask) | ||
530 | if taskid is None: | ||
531 | bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata])) | ||
532 | depends.add(taskid) | ||
533 | |||
534 | # Resolve recursive 'recrdeptask' dependencies (Part A) | ||
535 | # | ||
536 | # e.g. do_sometask[recrdeptask] = "do_someothertask" | ||
537 | # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) | ||
538 | # We cover the recursive part of the dependencies below | ||
539 | if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']: | ||
540 | tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split() | ||
541 | recursivetasks[task] = tasknames | ||
542 | add_build_dependencies(taskData.depids[fnid], tasknames, depends) | ||
543 | add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends) | ||
544 | if taskData.tasks_name[task] in tasknames: | ||
545 | recursivetasksselfref.add(task) | ||
546 | |||
547 | if 'recideptask' in task_deps and taskData.tasks_name[task] in task_deps['recideptask']: | ||
548 | recursiveitasks[task] = [] | ||
549 | for t in task_deps['recideptask'][taskData.tasks_name[task]].split(): | ||
550 | newdep = taskData.gettask_id_fromfnid(fnid, t) | ||
551 | recursiveitasks[task].append(newdep) | ||
552 | |||
553 | self.runq_fnid.append(taskData.tasks_fnid[task]) | ||
554 | self.runq_task.append(taskData.tasks_name[task]) | ||
555 | self.runq_depends.append(depends) | ||
556 | self.runq_revdeps.append(set()) | ||
557 | self.runq_hash.append("") | ||
558 | |||
559 | runq_build.append(0) | ||
560 | |||
561 | # Resolve recursive 'recrdeptask' dependencies (Part B) | ||
562 | # | ||
563 | # e.g. do_sometask[recrdeptask] = "do_someothertask" | ||
564 | # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) | ||
565 | # We need to do this separately since we need all of self.runq_depends to be complete before this is processed | ||
566 | extradeps = {} | ||
567 | for task in recursivetasks: | ||
568 | extradeps[task] = set(self.runq_depends[task]) | ||
569 | tasknames = recursivetasks[task] | ||
570 | seendeps = set() | ||
571 | seenfnid = [] | ||
572 | |||
573 | def generate_recdeps(t): | ||
574 | newdeps = set() | ||
575 | add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps) | ||
576 | extradeps[task].update(newdeps) | ||
577 | seendeps.add(t) | ||
578 | newdeps.add(t) | ||
579 | for i in newdeps: | ||
580 | for n in self.runq_depends[i]: | ||
581 | if n not in seendeps: | ||
582 | generate_recdeps(n) | ||
583 | generate_recdeps(task) | ||
584 | |||
585 | if task in recursiveitasks: | ||
586 | for dep in recursiveitasks[task]: | ||
587 | generate_recdeps(dep) | ||
588 | |||
589 | # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work | ||
590 | for task in recursivetasks: | ||
591 | extradeps[task].difference_update(recursivetasksselfref) | ||
592 | |||
593 | for task in xrange(len(taskData.tasks_name)): | ||
594 | # Add in extra dependencies | ||
595 | if task in extradeps: | ||
596 | self.runq_depends[task] = extradeps[task] | ||
597 | # Remove all self references | ||
598 | if task in self.runq_depends[task]: | ||
599 | logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task]) | ||
600 | self.runq_depends[task].remove(task) | ||
601 | |||
602 | # Step B - Mark all active tasks | ||
603 | # | ||
604 | # Start with the tasks we were asked to run and mark all dependencies | ||
605 | # as active too. If the task is to be 'forced', clear its stamp. Once | ||
606 | # all active tasks are marked, prune the ones we don't need. | ||
607 | |||
608 | logger.verbose("Marking Active Tasks") | ||
609 | |||
610 | def mark_active(listid, depth): | ||
611 | """ | ||
612 | Mark an item as active along with its depends | ||
613 | (calls itself recursively) | ||
614 | """ | ||
615 | |||
616 | if runq_build[listid] == 1: | ||
617 | return | ||
618 | |||
619 | runq_build[listid] = 1 | ||
620 | |||
621 | depends = self.runq_depends[listid] | ||
622 | for depend in depends: | ||
623 | mark_active(depend, depth+1) | ||
624 | |||
625 | self.target_pairs = [] | ||
626 | for target in self.targets: | ||
627 | targetid = taskData.getbuild_id(target[0]) | ||
628 | |||
629 | if targetid not in taskData.build_targets: | ||
630 | continue | ||
631 | |||
632 | if targetid in taskData.failed_deps: | ||
633 | continue | ||
634 | |||
635 | fnid = taskData.build_targets[targetid][0] | ||
636 | fn = taskData.fn_index[fnid] | ||
637 | self.target_pairs.append((fn, target[1])) | ||
638 | |||
639 | if fnid in taskData.failed_fnids: | ||
640 | continue | ||
641 | |||
642 | if target[1] not in taskData.tasks_lookup[fnid]: | ||
643 | import difflib | ||
644 | close_matches = difflib.get_close_matches(target[1], taskData.tasks_lookup[fnid], cutoff=0.7) | ||
645 | if close_matches: | ||
646 | extra = ". Close matches:\n %s" % "\n ".join(close_matches) | ||
647 | else: | ||
648 | extra = "" | ||
649 | bb.msg.fatal("RunQueue", "Task %s does not exist for target %s%s" % (target[1], target[0], extra)) | ||
650 | |||
651 | listid = taskData.tasks_lookup[fnid][target[1]] | ||
652 | |||
653 | mark_active(listid, 1) | ||
654 | |||
655 | # Step C - Prune all inactive tasks | ||
656 | # | ||
657 | # Once all active tasks are marked, prune the ones we don't need. | ||
658 | |||
659 | maps = [] | ||
660 | delcount = 0 | ||
661 | for listid in xrange(len(self.runq_fnid)): | ||
662 | if runq_build[listid-delcount] == 1: | ||
663 | maps.append(listid-delcount) | ||
664 | else: | ||
665 | del self.runq_fnid[listid-delcount] | ||
666 | del self.runq_task[listid-delcount] | ||
667 | del self.runq_depends[listid-delcount] | ||
668 | del runq_build[listid-delcount] | ||
669 | del self.runq_revdeps[listid-delcount] | ||
670 | del self.runq_hash[listid-delcount] | ||
671 | delcount = delcount + 1 | ||
672 | maps.append(-1) | ||
673 | |||
674 | # | ||
675 | # Step D - Sanity checks and computation | ||
676 | # | ||
677 | |||
678 | # Check to make sure we still have tasks to run | ||
679 | if len(self.runq_fnid) == 0: | ||
680 | if not taskData.abort: | ||
681 | bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") | ||
682 | else: | ||
683 | bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") | ||
684 | |||
685 | logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid)) | ||
686 | |||
687 | # Remap the dependencies to account for the deleted tasks | ||
688 | # Check we didn't delete a task we depend on | ||
689 | for listid in xrange(len(self.runq_fnid)): | ||
690 | newdeps = [] | ||
691 | origdeps = self.runq_depends[listid] | ||
692 | for origdep in origdeps: | ||
693 | if maps[origdep] == -1: | ||
694 | bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!") | ||
695 | newdeps.append(maps[origdep]) | ||
696 | self.runq_depends[listid] = set(newdeps) | ||
697 | |||
698 | logger.verbose("Assign Weightings") | ||
699 | |||
700 | # Generate a list of reverse dependencies to ease future calculations | ||
701 | for listid in xrange(len(self.runq_fnid)): | ||
702 | for dep in self.runq_depends[listid]: | ||
703 | self.runq_revdeps[dep].add(listid) | ||
704 | |||
705 | # Identify tasks at the end of dependency chains | ||
706 | # Error on circular dependency loops (length two) | ||
707 | endpoints = [] | ||
708 | for listid in xrange(len(self.runq_fnid)): | ||
709 | revdeps = self.runq_revdeps[listid] | ||
710 | if len(revdeps) == 0: | ||
711 | endpoints.append(listid) | ||
712 | for dep in revdeps: | ||
713 | if dep in self.runq_depends[listid]: | ||
714 | #self.dump_data(taskData) | ||
715 | bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid])) | ||
716 | |||
717 | logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) | ||
718 | |||
719 | # Calculate task weights | ||
720 | # Check of higher length circular dependencies | ||
721 | self.runq_weight = self.calculate_task_weights(endpoints) | ||
722 | |||
723 | # Sanity Check - Check for multiple tasks building the same provider | ||
724 | prov_list = {} | ||
725 | seen_fn = [] | ||
726 | for task in xrange(len(self.runq_fnid)): | ||
727 | fn = taskData.fn_index[self.runq_fnid[task]] | ||
728 | if fn in seen_fn: | ||
729 | continue | ||
730 | seen_fn.append(fn) | ||
731 | for prov in self.dataCache.fn_provides[fn]: | ||
732 | if prov not in prov_list: | ||
733 | prov_list[prov] = [fn] | ||
734 | elif fn not in prov_list[prov]: | ||
735 | prov_list[prov].append(fn) | ||
736 | for prov in prov_list: | ||
737 | if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist: | ||
738 | seen_pn = [] | ||
739 | # If two versions of the same PN are being built its fatal, we don't support it. | ||
740 | for fn in prov_list[prov]: | ||
741 | pn = self.dataCache.pkg_fn[fn] | ||
742 | if pn not in seen_pn: | ||
743 | seen_pn.append(pn) | ||
744 | else: | ||
745 | bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) | ||
746 | msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov])) | ||
747 | if self.warn_multi_bb: | ||
748 | logger.warn(msg) | ||
749 | else: | ||
750 | msg += "\n This usually means one provides something the other doesn't and should." | ||
751 | logger.error(msg) | ||
752 | |||
753 | # Create a whitelist usable by the stamp checks | ||
754 | stampfnwhitelist = [] | ||
755 | for entry in self.stampwhitelist.split(): | ||
756 | entryid = self.taskData.getbuild_id(entry) | ||
757 | if entryid not in self.taskData.build_targets: | ||
758 | continue | ||
759 | fnid = self.taskData.build_targets[entryid][0] | ||
760 | fn = self.taskData.fn_index[fnid] | ||
761 | stampfnwhitelist.append(fn) | ||
762 | self.stampfnwhitelist = stampfnwhitelist | ||
763 | |||
764 | # Iterate over the task list looking for tasks with a 'setscene' function | ||
765 | self.runq_setscene = [] | ||
766 | if not self.cooker.configuration.nosetscene: | ||
767 | for task in range(len(self.runq_fnid)): | ||
768 | setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False) | ||
769 | if not setscene: | ||
770 | continue | ||
771 | self.runq_setscene.append(task) | ||
772 | |||
773 | def invalidate_task(fn, taskname, error_nostamp): | ||
774 | taskdep = self.dataCache.task_deps[fn] | ||
775 | fnid = self.taskData.getfn_id(fn) | ||
776 | if taskname not in taskData.tasks_lookup[fnid]: | ||
777 | logger.warn("Task %s does not exist, invalidating this task will have no effect" % taskname) | ||
778 | if 'nostamp' in taskdep and taskname in taskdep['nostamp']: | ||
779 | if error_nostamp: | ||
780 | bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) | ||
781 | else: | ||
782 | bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) | ||
783 | else: | ||
784 | logger.verbose("Invalidate task %s, %s", taskname, fn) | ||
785 | bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn) | ||
786 | |||
787 | # Invalidate task if force mode active | ||
788 | if self.cooker.configuration.force: | ||
789 | for (fn, target) in self.target_pairs: | ||
790 | invalidate_task(fn, target, False) | ||
791 | |||
792 | # Invalidate task if invalidate mode active | ||
793 | if self.cooker.configuration.invalidate_stamp: | ||
794 | for (fn, target) in self.target_pairs: | ||
795 | for st in self.cooker.configuration.invalidate_stamp.split(','): | ||
796 | invalidate_task(fn, "do_%s" % st, True) | ||
797 | |||
798 | # Interate over the task list and call into the siggen code | ||
799 | dealtwith = set() | ||
800 | todeal = set(range(len(self.runq_fnid))) | ||
801 | while len(todeal) > 0: | ||
802 | for task in todeal.copy(): | ||
803 | if len(self.runq_depends[task] - dealtwith) == 0: | ||
804 | dealtwith.add(task) | ||
805 | todeal.remove(task) | ||
806 | procdep = [] | ||
807 | for dep in self.runq_depends[task]: | ||
808 | procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep]) | ||
809 | self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache) | ||
810 | |||
811 | return len(self.runq_fnid) | ||
812 | |||
813 | def dump_data(self, taskQueue): | ||
814 | """ | ||
815 | Dump some debug information on the internal data structures | ||
816 | """ | ||
817 | logger.debug(3, "run_tasks:") | ||
818 | for task in xrange(len(self.rqdata.runq_task)): | ||
819 | logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task, | ||
820 | taskQueue.fn_index[self.rqdata.runq_fnid[task]], | ||
821 | self.rqdata.runq_task[task], | ||
822 | self.rqdata.runq_weight[task], | ||
823 | self.rqdata.runq_depends[task], | ||
824 | self.rqdata.runq_revdeps[task]) | ||
825 | |||
826 | logger.debug(3, "sorted_tasks:") | ||
827 | for task1 in xrange(len(self.rqdata.runq_task)): | ||
828 | if task1 in self.prio_map: | ||
829 | task = self.prio_map[task1] | ||
830 | logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task, | ||
831 | taskQueue.fn_index[self.rqdata.runq_fnid[task]], | ||
832 | self.rqdata.runq_task[task], | ||
833 | self.rqdata.runq_weight[task], | ||
834 | self.rqdata.runq_depends[task], | ||
835 | self.rqdata.runq_revdeps[task]) | ||
836 | |||
837 | class RunQueue: | ||
838 | def __init__(self, cooker, cfgData, dataCache, taskData, targets): | ||
839 | |||
840 | self.cooker = cooker | ||
841 | self.cfgData = cfgData | ||
842 | self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets) | ||
843 | |||
844 | self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile" | ||
845 | self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None | ||
846 | self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None | ||
847 | self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None | ||
848 | |||
849 | self.state = runQueuePrepare | ||
850 | |||
851 | # For disk space monitor | ||
852 | self.dm = monitordisk.diskMonitor(cfgData) | ||
853 | |||
854 | self.rqexe = None | ||
855 | self.worker = None | ||
856 | self.workerpipe = None | ||
857 | self.fakeworker = None | ||
858 | self.fakeworkerpipe = None | ||
859 | |||
860 | def _start_worker(self, fakeroot = False, rqexec = None): | ||
861 | logger.debug(1, "Starting bitbake-worker") | ||
862 | if fakeroot: | ||
863 | fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True) | ||
864 | fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split() | ||
865 | env = os.environ.copy() | ||
866 | for key, value in (var.split('=') for var in fakerootenv): | ||
867 | env[key] = value | ||
868 | worker = subprocess.Popen([fakerootcmd, "bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) | ||
869 | else: | ||
870 | worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) | ||
871 | bb.utils.nonblockingfd(worker.stdout) | ||
872 | workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec) | ||
873 | |||
874 | workerdata = { | ||
875 | "taskdeps" : self.rqdata.dataCache.task_deps, | ||
876 | "fakerootenv" : self.rqdata.dataCache.fakerootenv, | ||
877 | "fakerootdirs" : self.rqdata.dataCache.fakerootdirs, | ||
878 | "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv, | ||
879 | "hashes" : bb.parse.siggen.taskhash, | ||
880 | "hash_deps" : bb.parse.siggen.runtaskdeps, | ||
881 | "sigchecksums" : bb.parse.siggen.file_checksum_values, | ||
882 | "runq_hash" : self.rqdata.runq_hash, | ||
883 | "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel, | ||
884 | "logdefaultverbose" : bb.msg.loggerDefaultVerbose, | ||
885 | "logdefaultverboselogs" : bb.msg.loggerVerboseLogs, | ||
886 | "logdefaultdomain" : bb.msg.loggerDefaultDomains, | ||
887 | "prhost" : self.cooker.prhost, | ||
888 | "buildname" : self.cfgData.getVar("BUILDNAME", True), | ||
889 | "date" : self.cfgData.getVar("DATE", True), | ||
890 | "time" : self.cfgData.getVar("TIME", True), | ||
891 | } | ||
892 | |||
893 | worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>") | ||
894 | worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>") | ||
895 | worker.stdin.flush() | ||
896 | |||
897 | return worker, workerpipe | ||
898 | |||
899 | def _teardown_worker(self, worker, workerpipe): | ||
900 | if not worker: | ||
901 | return | ||
902 | logger.debug(1, "Teardown for bitbake-worker") | ||
903 | try: | ||
904 | worker.stdin.write("<quit></quit>") | ||
905 | worker.stdin.flush() | ||
906 | except IOError: | ||
907 | pass | ||
908 | while worker.returncode is None: | ||
909 | workerpipe.read() | ||
910 | worker.poll() | ||
911 | while workerpipe.read(): | ||
912 | continue | ||
913 | workerpipe.close() | ||
914 | |||
915 | def start_worker(self): | ||
916 | if self.worker: | ||
917 | self.teardown_workers() | ||
918 | self.teardown = False | ||
919 | self.worker, self.workerpipe = self._start_worker() | ||
920 | |||
921 | def start_fakeworker(self, rqexec): | ||
922 | if not self.fakeworker: | ||
923 | self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec) | ||
924 | |||
925 | def teardown_workers(self): | ||
926 | self.teardown = True | ||
927 | self._teardown_worker(self.worker, self.workerpipe) | ||
928 | self.worker = None | ||
929 | self.workerpipe = None | ||
930 | self._teardown_worker(self.fakeworker, self.fakeworkerpipe) | ||
931 | self.fakeworker = None | ||
932 | self.fakeworkerpipe = None | ||
933 | |||
934 | def read_workers(self): | ||
935 | self.workerpipe.read() | ||
936 | if self.fakeworkerpipe: | ||
937 | self.fakeworkerpipe.read() | ||
938 | |||
939 | def active_fds(self): | ||
940 | fds = [] | ||
941 | if self.workerpipe: | ||
942 | fds.append(self.workerpipe.input) | ||
943 | if self.fakeworkerpipe: | ||
944 | fds.append(self.fakeworkerpipe.input) | ||
945 | return fds | ||
946 | |||
947 | def check_stamp_task(self, task, taskname = None, recurse = False, cache = None): | ||
948 | def get_timestamp(f): | ||
949 | try: | ||
950 | if not os.access(f, os.F_OK): | ||
951 | return None | ||
952 | return os.stat(f)[stat.ST_MTIME] | ||
953 | except: | ||
954 | return None | ||
955 | |||
956 | if self.stamppolicy == "perfile": | ||
957 | fulldeptree = False | ||
958 | else: | ||
959 | fulldeptree = True | ||
960 | stampwhitelist = [] | ||
961 | if self.stamppolicy == "whitelist": | ||
962 | stampwhitelist = self.rqdata.stampfnwhitelist | ||
963 | |||
964 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
965 | if taskname is None: | ||
966 | taskname = self.rqdata.runq_task[task] | ||
967 | |||
968 | stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | ||
969 | |||
970 | # If the stamp is missing its not current | ||
971 | if not os.access(stampfile, os.F_OK): | ||
972 | logger.debug(2, "Stampfile %s not available", stampfile) | ||
973 | return False | ||
974 | # If its a 'nostamp' task, it's not current | ||
975 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
976 | if 'nostamp' in taskdep and taskname in taskdep['nostamp']: | ||
977 | logger.debug(2, "%s.%s is nostamp\n", fn, taskname) | ||
978 | return False | ||
979 | |||
980 | if taskname != "do_setscene" and taskname.endswith("_setscene"): | ||
981 | return True | ||
982 | |||
983 | if cache is None: | ||
984 | cache = {} | ||
985 | |||
986 | iscurrent = True | ||
987 | t1 = get_timestamp(stampfile) | ||
988 | for dep in self.rqdata.runq_depends[task]: | ||
989 | if iscurrent: | ||
990 | fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]] | ||
991 | taskname2 = self.rqdata.runq_task[dep] | ||
992 | stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2) | ||
993 | stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2) | ||
994 | t2 = get_timestamp(stampfile2) | ||
995 | t3 = get_timestamp(stampfile3) | ||
996 | if t3 and t3 > t2: | ||
997 | continue | ||
998 | if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist): | ||
999 | if not t2: | ||
1000 | logger.debug(2, 'Stampfile %s does not exist', stampfile2) | ||
1001 | iscurrent = False | ||
1002 | if t1 < t2: | ||
1003 | logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2) | ||
1004 | iscurrent = False | ||
1005 | if recurse and iscurrent: | ||
1006 | if dep in cache: | ||
1007 | iscurrent = cache[dep] | ||
1008 | if not iscurrent: | ||
1009 | logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) | ||
1010 | else: | ||
1011 | iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) | ||
1012 | cache[dep] = iscurrent | ||
1013 | if recurse: | ||
1014 | cache[task] = iscurrent | ||
1015 | return iscurrent | ||
1016 | |||
1017 | def _execute_runqueue(self): | ||
1018 | """ | ||
1019 | Run the tasks in a queue prepared by rqdata.prepare() | ||
1020 | Upon failure, optionally try to recover the build using any alternate providers | ||
1021 | (if the abort on failure configuration option isn't set) | ||
1022 | """ | ||
1023 | |||
1024 | retval = True | ||
1025 | |||
1026 | if self.state is runQueuePrepare: | ||
1027 | self.rqexe = RunQueueExecuteDummy(self) | ||
1028 | if self.rqdata.prepare() == 0: | ||
1029 | self.state = runQueueComplete | ||
1030 | else: | ||
1031 | self.state = runQueueSceneInit | ||
1032 | |||
1033 | # we are ready to run, see if any UI client needs the dependency info | ||
1034 | if bb.cooker.CookerFeatures.SEND_DEPENDS_TREE in self.cooker.featureset: | ||
1035 | depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) | ||
1036 | bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) | ||
1037 | |||
1038 | if self.state is runQueueSceneInit: | ||
1039 | dump = self.cooker.configuration.dump_signatures | ||
1040 | if dump: | ||
1041 | if 'printdiff' in dump: | ||
1042 | invalidtasks = self.print_diffscenetasks() | ||
1043 | self.dump_signatures(dump) | ||
1044 | if 'printdiff' in dump: | ||
1045 | self.write_diffscenetasks(invalidtasks) | ||
1046 | self.state = runQueueComplete | ||
1047 | else: | ||
1048 | self.start_worker() | ||
1049 | self.rqexe = RunQueueExecuteScenequeue(self) | ||
1050 | |||
1051 | if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]: | ||
1052 | self.dm.check(self) | ||
1053 | |||
1054 | if self.state is runQueueSceneRun: | ||
1055 | retval = self.rqexe.execute() | ||
1056 | |||
1057 | if self.state is runQueueRunInit: | ||
1058 | logger.info("Executing RunQueue Tasks") | ||
1059 | self.rqexe = RunQueueExecuteTasks(self) | ||
1060 | self.state = runQueueRunning | ||
1061 | |||
1062 | if self.state is runQueueRunning: | ||
1063 | retval = self.rqexe.execute() | ||
1064 | |||
1065 | if self.state is runQueueCleanUp: | ||
1066 | self.rqexe.finish() | ||
1067 | |||
1068 | if self.state is runQueueComplete or self.state is runQueueFailed: | ||
1069 | self.teardown_workers() | ||
1070 | if self.rqexe.stats.failed: | ||
1071 | logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) | ||
1072 | else: | ||
1073 | # Let's avoid the word "failed" if nothing actually did | ||
1074 | logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) | ||
1075 | |||
1076 | if self.state is runQueueFailed: | ||
1077 | if not self.rqdata.taskData.tryaltconfigs: | ||
1078 | raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids) | ||
1079 | for fnid in self.rqexe.failed_fnids: | ||
1080 | self.rqdata.taskData.fail_fnid(fnid) | ||
1081 | self.rqdata.reset() | ||
1082 | |||
1083 | if self.state is runQueueComplete: | ||
1084 | # All done | ||
1085 | return False | ||
1086 | |||
1087 | # Loop | ||
1088 | return retval | ||
1089 | |||
1090 | def execute_runqueue(self): | ||
1091 | # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. | ||
1092 | try: | ||
1093 | return self._execute_runqueue() | ||
1094 | except bb.runqueue.TaskFailure: | ||
1095 | raise | ||
1096 | except SystemExit: | ||
1097 | raise | ||
1098 | except: | ||
1099 | logger.error("An uncaught exception occured in runqueue, please see the failure below:") | ||
1100 | try: | ||
1101 | self.teardown_workers() | ||
1102 | except: | ||
1103 | pass | ||
1104 | self.state = runQueueComplete | ||
1105 | raise | ||
1106 | |||
1107 | def finish_runqueue(self, now = False): | ||
1108 | if not self.rqexe: | ||
1109 | return | ||
1110 | |||
1111 | if now: | ||
1112 | self.rqexe.finish_now() | ||
1113 | else: | ||
1114 | self.rqexe.finish() | ||
1115 | |||
1116 | def dump_signatures(self, options): | ||
1117 | done = set() | ||
1118 | bb.note("Reparsing files to collect dependency data") | ||
1119 | for task in range(len(self.rqdata.runq_fnid)): | ||
1120 | if self.rqdata.runq_fnid[task] not in done: | ||
1121 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
1122 | the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data) | ||
1123 | done.add(self.rqdata.runq_fnid[task]) | ||
1124 | |||
1125 | bb.parse.siggen.dump_sigs(self.rqdata.dataCache, options) | ||
1126 | |||
1127 | return | ||
1128 | |||
1129 | def print_diffscenetasks(self): | ||
1130 | |||
1131 | valid = [] | ||
1132 | sq_hash = [] | ||
1133 | sq_hashfn = [] | ||
1134 | sq_fn = [] | ||
1135 | sq_taskname = [] | ||
1136 | sq_task = [] | ||
1137 | noexec = [] | ||
1138 | stamppresent = [] | ||
1139 | valid_new = set() | ||
1140 | |||
1141 | for task in xrange(len(self.rqdata.runq_fnid)): | ||
1142 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
1143 | taskname = self.rqdata.runq_task[task] | ||
1144 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1145 | |||
1146 | if 'noexec' in taskdep and taskname in taskdep['noexec']: | ||
1147 | noexec.append(task) | ||
1148 | continue | ||
1149 | |||
1150 | sq_fn.append(fn) | ||
1151 | sq_hashfn.append(self.rqdata.dataCache.hashfn[fn]) | ||
1152 | sq_hash.append(self.rqdata.runq_hash[task]) | ||
1153 | sq_taskname.append(taskname) | ||
1154 | sq_task.append(task) | ||
1155 | call = self.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)" | ||
1156 | locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data } | ||
1157 | valid = bb.utils.better_eval(call, locs) | ||
1158 | for v in valid: | ||
1159 | valid_new.add(sq_task[v]) | ||
1160 | |||
1161 | # Tasks which are both setscene and noexec never care about dependencies | ||
1162 | # We therefore find tasks which are setscene and noexec and mark their | ||
1163 | # unique dependencies as valid. | ||
1164 | for task in noexec: | ||
1165 | if task not in self.rqdata.runq_setscene: | ||
1166 | continue | ||
1167 | for dep in self.rqdata.runq_depends[task]: | ||
1168 | hasnoexecparents = True | ||
1169 | for dep2 in self.rqdata.runq_revdeps[dep]: | ||
1170 | if dep2 in self.rqdata.runq_setscene and dep2 in noexec: | ||
1171 | continue | ||
1172 | hasnoexecparents = False | ||
1173 | break | ||
1174 | if hasnoexecparents: | ||
1175 | valid_new.add(dep) | ||
1176 | |||
1177 | invalidtasks = set() | ||
1178 | for task in xrange(len(self.rqdata.runq_fnid)): | ||
1179 | if task not in valid_new and task not in noexec: | ||
1180 | invalidtasks.add(task) | ||
1181 | |||
1182 | found = set() | ||
1183 | processed = set() | ||
1184 | for task in invalidtasks: | ||
1185 | toprocess = set([task]) | ||
1186 | while toprocess: | ||
1187 | next = set() | ||
1188 | for t in toprocess: | ||
1189 | for dep in self.rqdata.runq_depends[t]: | ||
1190 | if dep in invalidtasks: | ||
1191 | found.add(task) | ||
1192 | if dep not in processed: | ||
1193 | processed.add(dep) | ||
1194 | next.add(dep) | ||
1195 | toprocess = next | ||
1196 | if task in found: | ||
1197 | toprocess = set() | ||
1198 | |||
1199 | tasklist = [] | ||
1200 | for task in invalidtasks.difference(found): | ||
1201 | tasklist.append(self.rqdata.get_user_idstring(task)) | ||
1202 | |||
1203 | if tasklist: | ||
1204 | bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) | ||
1205 | |||
1206 | return invalidtasks.difference(found) | ||
1207 | |||
1208 | def write_diffscenetasks(self, invalidtasks): | ||
1209 | |||
1210 | # Define recursion callback | ||
1211 | def recursecb(key, hash1, hash2): | ||
1212 | hashes = [hash1, hash2] | ||
1213 | hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) | ||
1214 | |||
1215 | recout = [] | ||
1216 | if len(hashfiles) == 2: | ||
1217 | out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb) | ||
1218 | recout.extend(list(' ' + l for l in out2)) | ||
1219 | else: | ||
1220 | recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) | ||
1221 | |||
1222 | return recout | ||
1223 | |||
1224 | |||
1225 | for task in invalidtasks: | ||
1226 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
1227 | pn = self.rqdata.dataCache.pkg_fn[fn] | ||
1228 | taskname = self.rqdata.runq_task[task] | ||
1229 | h = self.rqdata.runq_hash[task] | ||
1230 | matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData) | ||
1231 | match = None | ||
1232 | for m in matches: | ||
1233 | if h in m: | ||
1234 | match = m | ||
1235 | if match is None: | ||
1236 | bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) | ||
1237 | matches = {k : v for k, v in matches.iteritems() if h not in k} | ||
1238 | if matches: | ||
1239 | latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] | ||
1240 | prevh = __find_md5__.search(latestmatch).group(0) | ||
1241 | output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) | ||
1242 | bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) | ||
1243 | |||
1244 | class RunQueueExecute: | ||
1245 | |||
1246 | def __init__(self, rq): | ||
1247 | self.rq = rq | ||
1248 | self.cooker = rq.cooker | ||
1249 | self.cfgData = rq.cfgData | ||
1250 | self.rqdata = rq.rqdata | ||
1251 | |||
1252 | self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1) | ||
1253 | self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed" | ||
1254 | |||
1255 | self.runq_buildable = [] | ||
1256 | self.runq_running = [] | ||
1257 | self.runq_complete = [] | ||
1258 | |||
1259 | self.build_stamps = {} | ||
1260 | self.build_stamps2 = [] | ||
1261 | self.failed_fnids = [] | ||
1262 | |||
1263 | self.stampcache = {} | ||
1264 | |||
1265 | rq.workerpipe.setrunqueueexec(self) | ||
1266 | if rq.fakeworkerpipe: | ||
1267 | rq.fakeworkerpipe.setrunqueueexec(self) | ||
1268 | |||
1269 | def runqueue_process_waitpid(self, task, status): | ||
1270 | |||
1271 | # self.build_stamps[pid] may not exist when use shared work directory. | ||
1272 | if task in self.build_stamps: | ||
1273 | self.build_stamps2.remove(self.build_stamps[task]) | ||
1274 | del self.build_stamps[task] | ||
1275 | |||
1276 | if status != 0: | ||
1277 | self.task_fail(task, status) | ||
1278 | else: | ||
1279 | self.task_complete(task) | ||
1280 | return True | ||
1281 | |||
1282 | def finish_now(self): | ||
1283 | |||
1284 | for worker in [self.rq.worker, self.rq.fakeworker]: | ||
1285 | if not worker: | ||
1286 | continue | ||
1287 | try: | ||
1288 | worker.stdin.write("<finishnow></finishnow>") | ||
1289 | worker.stdin.flush() | ||
1290 | except IOError: | ||
1291 | # worker must have died? | ||
1292 | pass | ||
1293 | |||
1294 | if len(self.failed_fnids) != 0: | ||
1295 | self.rq.state = runQueueFailed | ||
1296 | return | ||
1297 | |||
1298 | self.rq.state = runQueueComplete | ||
1299 | return | ||
1300 | |||
1301 | def finish(self): | ||
1302 | self.rq.state = runQueueCleanUp | ||
1303 | |||
1304 | if self.stats.active > 0: | ||
1305 | bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData) | ||
1306 | self.rq.read_workers() | ||
1307 | |||
1308 | return | ||
1309 | |||
1310 | if len(self.failed_fnids) != 0: | ||
1311 | self.rq.state = runQueueFailed | ||
1312 | return | ||
1313 | |||
1314 | self.rq.state = runQueueComplete | ||
1315 | return | ||
1316 | |||
1317 | def check_dependencies(self, task, taskdeps, setscene = False): | ||
1318 | if not self.rq.depvalidate: | ||
1319 | return False | ||
1320 | |||
1321 | taskdata = {} | ||
1322 | taskdeps.add(task) | ||
1323 | for dep in taskdeps: | ||
1324 | if setscene: | ||
1325 | depid = self.rqdata.runq_setscene[dep] | ||
1326 | else: | ||
1327 | depid = dep | ||
1328 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]] | ||
1329 | pn = self.rqdata.dataCache.pkg_fn[fn] | ||
1330 | taskname = self.rqdata.runq_task[depid] | ||
1331 | taskdata[dep] = [pn, taskname, fn] | ||
1332 | call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" | ||
1333 | locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } | ||
1334 | valid = bb.utils.better_eval(call, locs) | ||
1335 | return valid | ||
1336 | |||
1337 | class RunQueueExecuteDummy(RunQueueExecute): | ||
1338 | def __init__(self, rq): | ||
1339 | self.rq = rq | ||
1340 | self.stats = RunQueueStats(0) | ||
1341 | |||
1342 | def finish(self): | ||
1343 | self.rq.state = runQueueComplete | ||
1344 | return | ||
1345 | |||
1346 | class RunQueueExecuteTasks(RunQueueExecute): | ||
1347 | def __init__(self, rq): | ||
1348 | RunQueueExecute.__init__(self, rq) | ||
1349 | |||
1350 | self.stats = RunQueueStats(len(self.rqdata.runq_fnid)) | ||
1351 | |||
1352 | self.stampcache = {} | ||
1353 | |||
1354 | initial_covered = self.rq.scenequeue_covered.copy() | ||
1355 | |||
1356 | # Mark initial buildable tasks | ||
1357 | for task in xrange(self.stats.total): | ||
1358 | self.runq_running.append(0) | ||
1359 | self.runq_complete.append(0) | ||
1360 | if len(self.rqdata.runq_depends[task]) == 0: | ||
1361 | self.runq_buildable.append(1) | ||
1362 | else: | ||
1363 | self.runq_buildable.append(0) | ||
1364 | if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered: | ||
1365 | self.rq.scenequeue_covered.add(task) | ||
1366 | |||
1367 | found = True | ||
1368 | while found: | ||
1369 | found = False | ||
1370 | for task in xrange(self.stats.total): | ||
1371 | if task in self.rq.scenequeue_covered: | ||
1372 | continue | ||
1373 | logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task]))) | ||
1374 | |||
1375 | if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered: | ||
1376 | found = True | ||
1377 | self.rq.scenequeue_covered.add(task) | ||
1378 | |||
1379 | logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered)) | ||
1380 | |||
1381 | # Allow the metadata to elect for setscene tasks to run anyway | ||
1382 | covered_remove = set() | ||
1383 | if self.rq.setsceneverify: | ||
1384 | invalidtasks = [] | ||
1385 | for task in xrange(len(self.rqdata.runq_task)): | ||
1386 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
1387 | taskname = self.rqdata.runq_task[task] | ||
1388 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1389 | |||
1390 | if 'noexec' in taskdep and taskname in taskdep['noexec']: | ||
1391 | continue | ||
1392 | if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache): | ||
1393 | logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task)) | ||
1394 | continue | ||
1395 | if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache): | ||
1396 | logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task)) | ||
1397 | continue | ||
1398 | invalidtasks.append(task) | ||
1399 | |||
1400 | call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)" | ||
1401 | call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)" | ||
1402 | locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.data, "invalidtasks" : invalidtasks } | ||
1403 | # Backwards compatibility with older versions without invalidtasks | ||
1404 | try: | ||
1405 | covered_remove = bb.utils.better_eval(call, locs) | ||
1406 | except TypeError: | ||
1407 | covered_remove = bb.utils.better_eval(call2, locs) | ||
1408 | |||
1409 | def removecoveredtask(task): | ||
1410 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
1411 | taskname = self.rqdata.runq_task[task] + '_setscene' | ||
1412 | bb.build.del_stamp(taskname, self.rqdata.dataCache, fn) | ||
1413 | self.rq.scenequeue_covered.remove(task) | ||
1414 | |||
1415 | toremove = covered_remove | ||
1416 | for task in toremove: | ||
1417 | logger.debug(1, 'Not skipping task %s due to setsceneverify', task) | ||
1418 | while toremove: | ||
1419 | covered_remove = [] | ||
1420 | for task in toremove: | ||
1421 | removecoveredtask(task) | ||
1422 | for deptask in self.rqdata.runq_depends[task]: | ||
1423 | if deptask not in self.rq.scenequeue_covered: | ||
1424 | continue | ||
1425 | if deptask in toremove or deptask in covered_remove or deptask in initial_covered: | ||
1426 | continue | ||
1427 | logger.debug(1, 'Task %s depends on task %s so not skipping' % (task, deptask)) | ||
1428 | covered_remove.append(deptask) | ||
1429 | toremove = covered_remove | ||
1430 | |||
1431 | logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered) | ||
1432 | |||
1433 | event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData) | ||
1434 | |||
1435 | schedulers = self.get_schedulers() | ||
1436 | for scheduler in schedulers: | ||
1437 | if self.scheduler == scheduler.name: | ||
1438 | self.sched = scheduler(self, self.rqdata) | ||
1439 | logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name) | ||
1440 | break | ||
1441 | else: | ||
1442 | bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % | ||
1443 | (self.scheduler, ", ".join(obj.name for obj in schedulers))) | ||
1444 | |||
1445 | def get_schedulers(self): | ||
1446 | schedulers = set(obj for obj in globals().values() | ||
1447 | if type(obj) is type and | ||
1448 | issubclass(obj, RunQueueScheduler)) | ||
1449 | |||
1450 | user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True) | ||
1451 | if user_schedulers: | ||
1452 | for sched in user_schedulers.split(): | ||
1453 | if not "." in sched: | ||
1454 | bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) | ||
1455 | continue | ||
1456 | |||
1457 | modname, name = sched.rsplit(".", 1) | ||
1458 | try: | ||
1459 | module = __import__(modname, fromlist=(name,)) | ||
1460 | except ImportError as exc: | ||
1461 | logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) | ||
1462 | raise SystemExit(1) | ||
1463 | else: | ||
1464 | schedulers.add(getattr(module, name)) | ||
1465 | return schedulers | ||
1466 | |||
1467 | def setbuildable(self, task): | ||
1468 | self.runq_buildable[task] = 1 | ||
1469 | self.sched.newbuilable(task) | ||
1470 | |||
1471 | def task_completeoutright(self, task): | ||
1472 | """ | ||
1473 | Mark a task as completed | ||
1474 | Look at the reverse dependencies and mark any task with | ||
1475 | completed dependencies as buildable | ||
1476 | """ | ||
1477 | self.runq_complete[task] = 1 | ||
1478 | for revdep in self.rqdata.runq_revdeps[task]: | ||
1479 | if self.runq_running[revdep] == 1: | ||
1480 | continue | ||
1481 | if self.runq_buildable[revdep] == 1: | ||
1482 | continue | ||
1483 | alldeps = 1 | ||
1484 | for dep in self.rqdata.runq_depends[revdep]: | ||
1485 | if self.runq_complete[dep] != 1: | ||
1486 | alldeps = 0 | ||
1487 | if alldeps == 1: | ||
1488 | self.setbuildable(revdep) | ||
1489 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]] | ||
1490 | taskname = self.rqdata.runq_task[revdep] | ||
1491 | logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname) | ||
1492 | |||
1493 | def task_complete(self, task): | ||
1494 | self.stats.taskCompleted() | ||
1495 | bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) | ||
1496 | self.task_completeoutright(task) | ||
1497 | |||
1498 | def task_fail(self, task, exitcode): | ||
1499 | """ | ||
1500 | Called when a task has failed | ||
1501 | Updates the state engine with the failure | ||
1502 | """ | ||
1503 | self.stats.taskFailed() | ||
1504 | fnid = self.rqdata.runq_fnid[task] | ||
1505 | self.failed_fnids.append(fnid) | ||
1506 | bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData) | ||
1507 | if self.rqdata.taskData.abort: | ||
1508 | self.rq.state = runQueueCleanUp | ||
1509 | |||
1510 | def task_skip(self, task, reason): | ||
1511 | self.runq_running[task] = 1 | ||
1512 | self.setbuildable(task) | ||
1513 | bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) | ||
1514 | self.task_completeoutright(task) | ||
1515 | self.stats.taskCompleted() | ||
1516 | self.stats.taskSkipped() | ||
1517 | |||
1518 | def execute(self): | ||
1519 | """ | ||
1520 | Run the tasks in a queue prepared by rqdata.prepare() | ||
1521 | """ | ||
1522 | |||
1523 | self.rq.read_workers() | ||
1524 | |||
1525 | |||
1526 | if self.stats.total == 0: | ||
1527 | # nothing to do | ||
1528 | self.rq.state = runQueueCleanUp | ||
1529 | |||
1530 | task = self.sched.next() | ||
1531 | if task is not None: | ||
1532 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]] | ||
1533 | taskname = self.rqdata.runq_task[task] | ||
1534 | |||
1535 | if task in self.rq.scenequeue_covered: | ||
1536 | logger.debug(2, "Setscene covered task %s (%s)", task, | ||
1537 | self.rqdata.get_user_idstring(task)) | ||
1538 | self.task_skip(task, "covered") | ||
1539 | return True | ||
1540 | |||
1541 | if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): | ||
1542 | logger.debug(2, "Stamp current task %s (%s)", task, | ||
1543 | self.rqdata.get_user_idstring(task)) | ||
1544 | self.task_skip(task, "existing") | ||
1545 | return True | ||
1546 | |||
1547 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1548 | if 'noexec' in taskdep and taskname in taskdep['noexec']: | ||
1549 | startevent = runQueueTaskStarted(task, self.stats, self.rq, | ||
1550 | noexec=True) | ||
1551 | bb.event.fire(startevent, self.cfgData) | ||
1552 | self.runq_running[task] = 1 | ||
1553 | self.stats.taskActive() | ||
1554 | bb.build.make_stamp(taskname, self.rqdata.dataCache, fn) | ||
1555 | self.task_complete(task) | ||
1556 | return True | ||
1557 | else: | ||
1558 | startevent = runQueueTaskStarted(task, self.stats, self.rq) | ||
1559 | bb.event.fire(startevent, self.cfgData) | ||
1560 | |||
1561 | taskdepdata = self.build_taskdepdata(task) | ||
1562 | |||
1563 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1564 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: | ||
1565 | if not self.rq.fakeworker: | ||
1566 | self.rq.start_fakeworker(self) | ||
1567 | self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>") | ||
1568 | self.rq.fakeworker.stdin.flush() | ||
1569 | else: | ||
1570 | self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn), taskdepdata)) + "</runtask>") | ||
1571 | self.rq.worker.stdin.flush() | ||
1572 | |||
1573 | self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn) | ||
1574 | self.build_stamps2.append(self.build_stamps[task]) | ||
1575 | self.runq_running[task] = 1 | ||
1576 | self.stats.taskActive() | ||
1577 | if self.stats.active < self.number_tasks: | ||
1578 | return True | ||
1579 | |||
1580 | if self.stats.active > 0: | ||
1581 | self.rq.read_workers() | ||
1582 | return self.rq.active_fds() | ||
1583 | |||
1584 | if len(self.failed_fnids) != 0: | ||
1585 | self.rq.state = runQueueFailed | ||
1586 | return True | ||
1587 | |||
1588 | # Sanity Checks | ||
1589 | for task in xrange(self.stats.total): | ||
1590 | if self.runq_buildable[task] == 0: | ||
1591 | logger.error("Task %s never buildable!", task) | ||
1592 | if self.runq_running[task] == 0: | ||
1593 | logger.error("Task %s never ran!", task) | ||
1594 | if self.runq_complete[task] == 0: | ||
1595 | logger.error("Task %s never completed!", task) | ||
1596 | self.rq.state = runQueueComplete | ||
1597 | |||
1598 | return True | ||
1599 | |||
1600 | def build_taskdepdata(self, task): | ||
1601 | taskdepdata = {} | ||
1602 | next = self.rqdata.runq_depends[task] | ||
1603 | next.add(task) | ||
1604 | while next: | ||
1605 | additional = [] | ||
1606 | for revdep in next: | ||
1607 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]] | ||
1608 | pn = self.rqdata.dataCache.pkg_fn[fn] | ||
1609 | taskname = self.rqdata.runq_task[revdep] | ||
1610 | deps = self.rqdata.runq_depends[revdep] | ||
1611 | taskdepdata[revdep] = [pn, taskname, fn, deps] | ||
1612 | for revdep2 in deps: | ||
1613 | if revdep2 not in taskdepdata: | ||
1614 | additional.append(revdep2) | ||
1615 | next = additional | ||
1616 | |||
1617 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) | ||
1618 | return taskdepdata | ||
1619 | |||
1620 | class RunQueueExecuteScenequeue(RunQueueExecute): | ||
1621 | def __init__(self, rq): | ||
1622 | RunQueueExecute.__init__(self, rq) | ||
1623 | |||
1624 | self.scenequeue_covered = set() | ||
1625 | self.scenequeue_notcovered = set() | ||
1626 | self.scenequeue_notneeded = set() | ||
1627 | |||
1628 | # If we don't have any setscene functions, skip this step | ||
1629 | if len(self.rqdata.runq_setscene) == 0: | ||
1630 | rq.scenequeue_covered = set() | ||
1631 | rq.state = runQueueRunInit | ||
1632 | return | ||
1633 | |||
1634 | self.stats = RunQueueStats(len(self.rqdata.runq_setscene)) | ||
1635 | |||
1636 | sq_revdeps = [] | ||
1637 | sq_revdeps_new = [] | ||
1638 | sq_revdeps_squash = [] | ||
1639 | self.sq_harddeps = {} | ||
1640 | |||
1641 | # We need to construct a dependency graph for the setscene functions. Intermediate | ||
1642 | # dependencies between the setscene tasks only complicate the code. This code | ||
1643 | # therefore aims to collapse the huge runqueue dependency tree into a smaller one | ||
1644 | # only containing the setscene functions. | ||
1645 | |||
1646 | for task in xrange(self.stats.total): | ||
1647 | self.runq_running.append(0) | ||
1648 | self.runq_complete.append(0) | ||
1649 | self.runq_buildable.append(0) | ||
1650 | |||
1651 | # First process the chains up to the first setscene task. | ||
1652 | endpoints = {} | ||
1653 | for task in xrange(len(self.rqdata.runq_fnid)): | ||
1654 | sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task])) | ||
1655 | sq_revdeps_new.append(set()) | ||
1656 | if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene: | ||
1657 | endpoints[task] = set() | ||
1658 | |||
1659 | # Secondly process the chains between setscene tasks. | ||
1660 | for task in self.rqdata.runq_setscene: | ||
1661 | for dep in self.rqdata.runq_depends[task]: | ||
1662 | if dep not in endpoints: | ||
1663 | endpoints[dep] = set() | ||
1664 | endpoints[dep].add(task) | ||
1665 | |||
1666 | def process_endpoints(endpoints): | ||
1667 | newendpoints = {} | ||
1668 | for point, task in endpoints.items(): | ||
1669 | tasks = set() | ||
1670 | if task: | ||
1671 | tasks |= task | ||
1672 | if sq_revdeps_new[point]: | ||
1673 | tasks |= sq_revdeps_new[point] | ||
1674 | sq_revdeps_new[point] = set() | ||
1675 | if point in self.rqdata.runq_setscene: | ||
1676 | sq_revdeps_new[point] = tasks | ||
1677 | for dep in self.rqdata.runq_depends[point]: | ||
1678 | if point in sq_revdeps[dep]: | ||
1679 | sq_revdeps[dep].remove(point) | ||
1680 | if tasks: | ||
1681 | sq_revdeps_new[dep] |= tasks | ||
1682 | if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene: | ||
1683 | newendpoints[dep] = task | ||
1684 | if len(newendpoints) != 0: | ||
1685 | process_endpoints(newendpoints) | ||
1686 | |||
1687 | process_endpoints(endpoints) | ||
1688 | |||
1689 | # Build a list of setscene tasks which as "unskippable" | ||
1690 | # These are direct endpoints referenced by the build | ||
1691 | endpoints2 = {} | ||
1692 | sq_revdeps2 = [] | ||
1693 | sq_revdeps_new2 = [] | ||
1694 | def process_endpoints2(endpoints): | ||
1695 | newendpoints = {} | ||
1696 | for point, task in endpoints.items(): | ||
1697 | tasks = set([point]) | ||
1698 | if task: | ||
1699 | tasks |= task | ||
1700 | if sq_revdeps_new2[point]: | ||
1701 | tasks |= sq_revdeps_new2[point] | ||
1702 | sq_revdeps_new2[point] = set() | ||
1703 | if point in self.rqdata.runq_setscene: | ||
1704 | sq_revdeps_new2[point] = tasks | ||
1705 | for dep in self.rqdata.runq_depends[point]: | ||
1706 | if point in sq_revdeps2[dep]: | ||
1707 | sq_revdeps2[dep].remove(point) | ||
1708 | if tasks: | ||
1709 | sq_revdeps_new2[dep] |= tasks | ||
1710 | if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene: | ||
1711 | newendpoints[dep] = tasks | ||
1712 | if len(newendpoints) != 0: | ||
1713 | process_endpoints2(newendpoints) | ||
1714 | for task in xrange(len(self.rqdata.runq_fnid)): | ||
1715 | sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task])) | ||
1716 | sq_revdeps_new2.append(set()) | ||
1717 | if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene: | ||
1718 | endpoints2[task] = set() | ||
1719 | process_endpoints2(endpoints2) | ||
1720 | self.unskippable = [] | ||
1721 | for task in self.rqdata.runq_setscene: | ||
1722 | if sq_revdeps_new2[task]: | ||
1723 | self.unskippable.append(self.rqdata.runq_setscene.index(task)) | ||
1724 | |||
1725 | for task in xrange(len(self.rqdata.runq_fnid)): | ||
1726 | if task in self.rqdata.runq_setscene: | ||
1727 | deps = set() | ||
1728 | for dep in sq_revdeps_new[task]: | ||
1729 | deps.add(self.rqdata.runq_setscene.index(dep)) | ||
1730 | sq_revdeps_squash.append(deps) | ||
1731 | elif len(sq_revdeps_new[task]) != 0: | ||
1732 | bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.") | ||
1733 | |||
1734 | # Resolve setscene inter-task dependencies | ||
1735 | # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" | ||
1736 | # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies | ||
1737 | for task in self.rqdata.runq_setscene: | ||
1738 | realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False) | ||
1739 | idepends = self.rqdata.taskData.tasks_idepends[realid] | ||
1740 | for (depid, idependtask) in idepends: | ||
1741 | if depid not in self.rqdata.taskData.build_targets: | ||
1742 | continue | ||
1743 | |||
1744 | depdata = self.rqdata.taskData.build_targets[depid][0] | ||
1745 | if depdata is None: | ||
1746 | continue | ||
1747 | dep = self.rqdata.taskData.fn_index[depdata] | ||
1748 | taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", "")) | ||
1749 | if taskid is None: | ||
1750 | bb.msg.fatal("RunQueue", "Task %s_setscene depends upon non-existent task %s:%s" % (self.rqdata.get_user_idstring(task), dep, idependtask)) | ||
1751 | |||
1752 | if not self.rqdata.runq_setscene.index(taskid) in self.sq_harddeps: | ||
1753 | self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)] = set() | ||
1754 | self.sq_harddeps[self.rqdata.runq_setscene.index(taskid)].add(self.rqdata.runq_setscene.index(task)) | ||
1755 | |||
1756 | sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid)) | ||
1757 | # Have to zero this to avoid circular dependencies | ||
1758 | sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set() | ||
1759 | |||
1760 | for task in self.sq_harddeps: | ||
1761 | for dep in self.sq_harddeps[task]: | ||
1762 | sq_revdeps_squash[dep].add(task) | ||
1763 | |||
1764 | #for task in xrange(len(sq_revdeps_squash)): | ||
1765 | # realtask = self.rqdata.runq_setscene[task] | ||
1766 | # bb.warn("Task %s: %s_setscene is %s " % (task, self.rqdata.get_user_idstring(realtask) , sq_revdeps_squash[task])) | ||
1767 | |||
1768 | self.sq_deps = [] | ||
1769 | self.sq_revdeps = sq_revdeps_squash | ||
1770 | self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps) | ||
1771 | |||
1772 | for task in xrange(len(self.sq_revdeps)): | ||
1773 | self.sq_deps.append(set()) | ||
1774 | for task in xrange(len(self.sq_revdeps)): | ||
1775 | for dep in self.sq_revdeps[task]: | ||
1776 | self.sq_deps[dep].add(task) | ||
1777 | |||
1778 | for task in xrange(len(self.sq_revdeps)): | ||
1779 | if len(self.sq_revdeps[task]) == 0: | ||
1780 | self.runq_buildable[task] = 1 | ||
1781 | |||
1782 | self.outrightfail = [] | ||
1783 | if self.rq.hashvalidate: | ||
1784 | sq_hash = [] | ||
1785 | sq_hashfn = [] | ||
1786 | sq_fn = [] | ||
1787 | sq_taskname = [] | ||
1788 | sq_task = [] | ||
1789 | noexec = [] | ||
1790 | stamppresent = [] | ||
1791 | for task in xrange(len(self.sq_revdeps)): | ||
1792 | realtask = self.rqdata.runq_setscene[task] | ||
1793 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]] | ||
1794 | taskname = self.rqdata.runq_task[realtask] | ||
1795 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1796 | |||
1797 | if 'noexec' in taskdep and taskname in taskdep['noexec']: | ||
1798 | noexec.append(task) | ||
1799 | self.task_skip(task) | ||
1800 | bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn) | ||
1801 | continue | ||
1802 | |||
1803 | if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache): | ||
1804 | logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask)) | ||
1805 | stamppresent.append(task) | ||
1806 | self.task_skip(task) | ||
1807 | continue | ||
1808 | |||
1809 | if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache): | ||
1810 | logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask)) | ||
1811 | stamppresent.append(task) | ||
1812 | self.task_skip(task) | ||
1813 | continue | ||
1814 | |||
1815 | sq_fn.append(fn) | ||
1816 | sq_hashfn.append(self.rqdata.dataCache.hashfn[fn]) | ||
1817 | sq_hash.append(self.rqdata.runq_hash[realtask]) | ||
1818 | sq_taskname.append(taskname) | ||
1819 | sq_task.append(task) | ||
1820 | call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)" | ||
1821 | locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data } | ||
1822 | valid = bb.utils.better_eval(call, locs) | ||
1823 | |||
1824 | valid_new = stamppresent | ||
1825 | for v in valid: | ||
1826 | valid_new.append(sq_task[v]) | ||
1827 | |||
1828 | for task in xrange(len(self.sq_revdeps)): | ||
1829 | if task not in valid_new and task not in noexec: | ||
1830 | realtask = self.rqdata.runq_setscene[task] | ||
1831 | logger.debug(2, 'No package found, so skipping setscene task %s', | ||
1832 | self.rqdata.get_user_idstring(realtask)) | ||
1833 | self.outrightfail.append(task) | ||
1834 | |||
1835 | logger.info('Executing SetScene Tasks') | ||
1836 | |||
1837 | self.rq.state = runQueueSceneRun | ||
1838 | |||
1839 | def scenequeue_updatecounters(self, task, fail = False): | ||
1840 | for dep in self.sq_deps[task]: | ||
1841 | if fail and task in self.sq_harddeps and dep in self.sq_harddeps[task]: | ||
1842 | realtask = self.rqdata.runq_setscene[task] | ||
1843 | realdep = self.rqdata.runq_setscene[dep] | ||
1844 | logger.debug(2, "%s was unavailable and is a hard dependency of %s so skipping" % (self.rqdata.get_user_idstring(realtask), self.rqdata.get_user_idstring(realdep))) | ||
1845 | continue | ||
1846 | self.sq_revdeps2[dep].remove(task) | ||
1847 | if len(self.sq_revdeps2[dep]) == 0: | ||
1848 | self.runq_buildable[dep] = 1 | ||
1849 | |||
1850 | def task_completeoutright(self, task): | ||
1851 | """ | ||
1852 | Mark a task as completed | ||
1853 | Look at the reverse dependencies and mark any task with | ||
1854 | completed dependencies as buildable | ||
1855 | """ | ||
1856 | |||
1857 | index = self.rqdata.runq_setscene[task] | ||
1858 | logger.debug(1, 'Found task %s which could be accelerated', | ||
1859 | self.rqdata.get_user_idstring(index)) | ||
1860 | |||
1861 | self.scenequeue_covered.add(task) | ||
1862 | self.scenequeue_updatecounters(task) | ||
1863 | |||
1864 | def task_complete(self, task): | ||
1865 | self.stats.taskCompleted() | ||
1866 | bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) | ||
1867 | self.task_completeoutright(task) | ||
1868 | |||
1869 | def task_fail(self, task, result): | ||
1870 | self.stats.taskFailed() | ||
1871 | bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) | ||
1872 | self.scenequeue_notcovered.add(task) | ||
1873 | self.scenequeue_updatecounters(task, True) | ||
1874 | |||
1875 | def task_failoutright(self, task): | ||
1876 | self.runq_running[task] = 1 | ||
1877 | self.runq_buildable[task] = 1 | ||
1878 | self.stats.taskCompleted() | ||
1879 | self.stats.taskSkipped() | ||
1880 | index = self.rqdata.runq_setscene[task] | ||
1881 | self.scenequeue_notcovered.add(task) | ||
1882 | self.scenequeue_updatecounters(task, True) | ||
1883 | |||
1884 | def task_skip(self, task): | ||
1885 | self.runq_running[task] = 1 | ||
1886 | self.runq_buildable[task] = 1 | ||
1887 | self.task_completeoutright(task) | ||
1888 | self.stats.taskCompleted() | ||
1889 | self.stats.taskSkipped() | ||
1890 | |||
1891 | def execute(self): | ||
1892 | """ | ||
1893 | Run the tasks in a queue prepared by prepare_runqueue | ||
1894 | """ | ||
1895 | |||
1896 | self.rq.read_workers() | ||
1897 | |||
1898 | task = None | ||
1899 | if self.stats.active < self.number_tasks: | ||
1900 | # Find the next setscene to run | ||
1901 | for nexttask in xrange(self.stats.total): | ||
1902 | if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1: | ||
1903 | if nexttask in self.unskippable: | ||
1904 | logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask])) | ||
1905 | if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True): | ||
1906 | realtask = self.rqdata.runq_setscene[nexttask] | ||
1907 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]] | ||
1908 | foundtarget = False | ||
1909 | for target in self.rqdata.target_pairs: | ||
1910 | if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]: | ||
1911 | foundtarget = True | ||
1912 | break | ||
1913 | if not foundtarget: | ||
1914 | logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask])) | ||
1915 | self.task_skip(nexttask) | ||
1916 | self.scenequeue_notneeded.add(nexttask) | ||
1917 | return True | ||
1918 | if nexttask in self.outrightfail: | ||
1919 | self.task_failoutright(nexttask) | ||
1920 | return True | ||
1921 | task = nexttask | ||
1922 | break | ||
1923 | if task is not None: | ||
1924 | realtask = self.rqdata.runq_setscene[task] | ||
1925 | fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]] | ||
1926 | |||
1927 | taskname = self.rqdata.runq_task[realtask] + "_setscene" | ||
1928 | if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache): | ||
1929 | logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant', | ||
1930 | task, self.rqdata.get_user_idstring(realtask)) | ||
1931 | self.task_failoutright(task) | ||
1932 | return True | ||
1933 | |||
1934 | if self.cooker.configuration.force: | ||
1935 | for target in self.rqdata.target_pairs: | ||
1936 | if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]: | ||
1937 | self.task_failoutright(task) | ||
1938 | return True | ||
1939 | |||
1940 | if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache): | ||
1941 | logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies', | ||
1942 | task, self.rqdata.get_user_idstring(realtask)) | ||
1943 | self.task_skip(task) | ||
1944 | return True | ||
1945 | |||
1946 | startevent = sceneQueueTaskStarted(task, self.stats, self.rq) | ||
1947 | bb.event.fire(startevent, self.cfgData) | ||
1948 | |||
1949 | taskdep = self.rqdata.dataCache.task_deps[fn] | ||
1950 | if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']: | ||
1951 | if not self.rq.fakeworker: | ||
1952 | self.rq.start_fakeworker(self) | ||
1953 | self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>") | ||
1954 | self.rq.fakeworker.stdin.flush() | ||
1955 | else: | ||
1956 | self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn), None)) + "</runtask>") | ||
1957 | self.rq.worker.stdin.flush() | ||
1958 | |||
1959 | self.runq_running[task] = 1 | ||
1960 | self.stats.taskActive() | ||
1961 | if self.stats.active < self.number_tasks: | ||
1962 | return True | ||
1963 | |||
1964 | if self.stats.active > 0: | ||
1965 | self.rq.read_workers() | ||
1966 | return self.rq.active_fds() | ||
1967 | |||
1968 | #for task in xrange(self.stats.total): | ||
1969 | # if self.runq_running[task] != 1: | ||
1970 | # buildable = self.runq_buildable[task] | ||
1971 | # revdeps = self.sq_revdeps[task] | ||
1972 | # bb.warn("Found we didn't run %s %s %s %s" % (task, buildable, str(revdeps), self.rqdata.get_user_idstring(self.rqdata.runq_setscene[task]))) | ||
1973 | |||
1974 | # Convert scenequeue_covered task numbers into full taskgraph ids | ||
1975 | oldcovered = self.scenequeue_covered | ||
1976 | self.rq.scenequeue_covered = set() | ||
1977 | for task in oldcovered: | ||
1978 | self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task]) | ||
1979 | self.rq.scenequeue_notcovered = set() | ||
1980 | for task in self.scenequeue_notcovered: | ||
1981 | self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task]) | ||
1982 | |||
1983 | logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered)) | ||
1984 | |||
1985 | self.rq.state = runQueueRunInit | ||
1986 | return True | ||
1987 | |||
1988 | def runqueue_process_waitpid(self, task, status): | ||
1989 | task = self.rq.rqdata.runq_setscene.index(task) | ||
1990 | |||
1991 | RunQueueExecute.runqueue_process_waitpid(self, task, status) | ||
1992 | |||
1993 | class TaskFailure(Exception): | ||
1994 | """ | ||
1995 | Exception raised when a task in a runqueue fails | ||
1996 | """ | ||
1997 | def __init__(self, x): | ||
1998 | self.args = x | ||
1999 | |||
2000 | |||
2001 | class runQueueExitWait(bb.event.Event): | ||
2002 | """ | ||
2003 | Event when waiting for task processes to exit | ||
2004 | """ | ||
2005 | |||
2006 | def __init__(self, remain): | ||
2007 | self.remain = remain | ||
2008 | self.message = "Waiting for %s active tasks to finish" % remain | ||
2009 | bb.event.Event.__init__(self) | ||
2010 | |||
2011 | class runQueueEvent(bb.event.Event): | ||
2012 | """ | ||
2013 | Base runQueue event class | ||
2014 | """ | ||
2015 | def __init__(self, task, stats, rq): | ||
2016 | self.taskid = task | ||
2017 | self.taskstring = rq.rqdata.get_user_idstring(task) | ||
2018 | self.taskname = rq.rqdata.get_task_name(task) | ||
2019 | self.taskfile = rq.rqdata.get_task_file(task) | ||
2020 | self.taskhash = rq.rqdata.get_task_hash(task) | ||
2021 | self.stats = stats.copy() | ||
2022 | bb.event.Event.__init__(self) | ||
2023 | |||
2024 | class sceneQueueEvent(runQueueEvent): | ||
2025 | """ | ||
2026 | Base sceneQueue event class | ||
2027 | """ | ||
2028 | def __init__(self, task, stats, rq, noexec=False): | ||
2029 | runQueueEvent.__init__(self, task, stats, rq) | ||
2030 | realtask = rq.rqdata.runq_setscene[task] | ||
2031 | self.taskstring = rq.rqdata.get_user_idstring(realtask, "_setscene") | ||
2032 | self.taskname = rq.rqdata.get_task_name(realtask) + "_setscene" | ||
2033 | self.taskfile = rq.rqdata.get_task_file(realtask) | ||
2034 | self.taskhash = rq.rqdata.get_task_hash(realtask) | ||
2035 | |||
2036 | class runQueueTaskStarted(runQueueEvent): | ||
2037 | """ | ||
2038 | Event notifing a task was started | ||
2039 | """ | ||
2040 | def __init__(self, task, stats, rq, noexec=False): | ||
2041 | runQueueEvent.__init__(self, task, stats, rq) | ||
2042 | self.noexec = noexec | ||
2043 | |||
2044 | class sceneQueueTaskStarted(sceneQueueEvent): | ||
2045 | """ | ||
2046 | Event notifing a setscene task was started | ||
2047 | """ | ||
2048 | def __init__(self, task, stats, rq, noexec=False): | ||
2049 | sceneQueueEvent.__init__(self, task, stats, rq) | ||
2050 | self.noexec = noexec | ||
2051 | |||
2052 | class runQueueTaskFailed(runQueueEvent): | ||
2053 | """ | ||
2054 | Event notifing a task failed | ||
2055 | """ | ||
2056 | def __init__(self, task, stats, exitcode, rq): | ||
2057 | runQueueEvent.__init__(self, task, stats, rq) | ||
2058 | self.exitcode = exitcode | ||
2059 | |||
2060 | class sceneQueueTaskFailed(sceneQueueEvent): | ||
2061 | """ | ||
2062 | Event notifing a setscene task failed | ||
2063 | """ | ||
2064 | def __init__(self, task, stats, exitcode, rq): | ||
2065 | sceneQueueEvent.__init__(self, task, stats, rq) | ||
2066 | self.exitcode = exitcode | ||
2067 | |||
2068 | class runQueueTaskCompleted(runQueueEvent): | ||
2069 | """ | ||
2070 | Event notifing a task completed | ||
2071 | """ | ||
2072 | |||
2073 | class sceneQueueTaskCompleted(sceneQueueEvent): | ||
2074 | """ | ||
2075 | Event notifing a setscene task completed | ||
2076 | """ | ||
2077 | |||
2078 | class runQueueTaskSkipped(runQueueEvent): | ||
2079 | """ | ||
2080 | Event notifing a task was skipped | ||
2081 | """ | ||
2082 | def __init__(self, task, stats, rq, reason): | ||
2083 | runQueueEvent.__init__(self, task, stats, rq) | ||
2084 | self.reason = reason | ||
2085 | |||
2086 | class runQueuePipe(): | ||
2087 | """ | ||
2088 | Abstraction for a pipe between a worker thread and the server | ||
2089 | """ | ||
2090 | def __init__(self, pipein, pipeout, d, rq, rqexec): | ||
2091 | self.input = pipein | ||
2092 | if pipeout: | ||
2093 | pipeout.close() | ||
2094 | bb.utils.nonblockingfd(self.input) | ||
2095 | self.queue = "" | ||
2096 | self.d = d | ||
2097 | self.rq = rq | ||
2098 | self.rqexec = rqexec | ||
2099 | |||
2100 | def setrunqueueexec(self, rqexec): | ||
2101 | self.rqexec = rqexec | ||
2102 | |||
2103 | def read(self): | ||
2104 | for w in [self.rq.worker, self.rq.fakeworker]: | ||
2105 | if not w: | ||
2106 | continue | ||
2107 | w.poll() | ||
2108 | if w.returncode is not None and not self.rq.teardown: | ||
2109 | name = None | ||
2110 | if self.rq.worker and w.pid == self.rq.worker.pid: | ||
2111 | name = "Worker" | ||
2112 | elif self.rq.fakeworker and w.pid == self.rq.fakeworker.pid: | ||
2113 | name = "Fakeroot" | ||
2114 | bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, w.pid, str(w.returncode))) | ||
2115 | self.rq.finish_runqueue(True) | ||
2116 | |||
2117 | start = len(self.queue) | ||
2118 | try: | ||
2119 | self.queue = self.queue + self.input.read(102400) | ||
2120 | except (OSError, IOError) as e: | ||
2121 | if e.errno != errno.EAGAIN: | ||
2122 | raise | ||
2123 | end = len(self.queue) | ||
2124 | found = True | ||
2125 | while found and len(self.queue): | ||
2126 | found = False | ||
2127 | index = self.queue.find("</event>") | ||
2128 | while index != -1 and self.queue.startswith("<event>"): | ||
2129 | try: | ||
2130 | event = pickle.loads(self.queue[7:index]) | ||
2131 | except ValueError as e: | ||
2132 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) | ||
2133 | bb.event.fire_from_worker(event, self.d) | ||
2134 | found = True | ||
2135 | self.queue = self.queue[index+8:] | ||
2136 | index = self.queue.find("</event>") | ||
2137 | index = self.queue.find("</exitcode>") | ||
2138 | while index != -1 and self.queue.startswith("<exitcode>"): | ||
2139 | try: | ||
2140 | task, status = pickle.loads(self.queue[10:index]) | ||
2141 | except ValueError as e: | ||
2142 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) | ||
2143 | self.rqexec.runqueue_process_waitpid(task, status) | ||
2144 | found = True | ||
2145 | self.queue = self.queue[index+11:] | ||
2146 | index = self.queue.find("</exitcode>") | ||
2147 | return (end > start) | ||
2148 | |||
2149 | def close(self): | ||
2150 | while self.read(): | ||
2151 | continue | ||
2152 | if len(self.queue) > 0: | ||
2153 | print("Warning, worker left partial message: %s" % self.queue) | ||
2154 | self.input.close() | ||