diff options
-rwxr-xr-x | bitbake/bin/bitbake-worker | 6 | ||||
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 165 |
2 files changed, 161 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index f63f060c57..3e502d5ca9 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
234 | the_data.setVar(varname, value) | 234 | the_data.setVar(varname, value) |
235 | 235 | ||
236 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) | 236 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) |
237 | if "newhashes" in workerdata: | ||
238 | bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) | ||
237 | ret = 0 | 239 | ret = 0 |
238 | 240 | ||
239 | the_data = bb_cache.loadDataFull(fn, appends) | 241 | the_data = bb_cache.loadDataFull(fn, appends) |
@@ -377,6 +379,7 @@ class BitbakeWorker(object): | |||
377 | self.handle_item(b"cookerconfig", self.handle_cookercfg) | 379 | self.handle_item(b"cookerconfig", self.handle_cookercfg) |
378 | self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) | 380 | self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) |
379 | self.handle_item(b"workerdata", self.handle_workerdata) | 381 | self.handle_item(b"workerdata", self.handle_workerdata) |
382 | self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) | ||
380 | self.handle_item(b"runtask", self.handle_runtask) | 383 | self.handle_item(b"runtask", self.handle_runtask) |
381 | self.handle_item(b"finishnow", self.handle_finishnow) | 384 | self.handle_item(b"finishnow", self.handle_finishnow) |
382 | self.handle_item(b"ping", self.handle_ping) | 385 | self.handle_item(b"ping", self.handle_ping) |
@@ -416,6 +419,9 @@ class BitbakeWorker(object): | |||
416 | for mc in self.databuilder.mcdata: | 419 | for mc in self.databuilder.mcdata: |
417 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) | 420 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) |
418 | 421 | ||
422 | def handle_newtaskhashes(self, data): | ||
423 | self.workerdata["newhashes"] = pickle.loads(data) | ||
424 | |||
419 | def handle_ping(self, _): | 425 | def handle_ping(self, _): |
420 | workerlog_write("Handling ping\n") | 426 | workerlog_write("Handling ping\n") |
421 | 427 | ||
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 519561c231..11b98f698d 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -149,7 +149,7 @@ class RunQueueScheduler(object): | |||
149 | Return the id of the first task we find that is buildable | 149 | Return the id of the first task we find that is buildable |
150 | """ | 150 | """ |
151 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] | 151 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] |
152 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)] | 152 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks] |
153 | if not buildable: | 153 | if not buildable: |
154 | return None | 154 | return None |
155 | 155 | ||
@@ -206,6 +206,9 @@ class RunQueueScheduler(object): | |||
206 | def newbuildable(self, task): | 206 | def newbuildable(self, task): |
207 | self.buildable.append(task) | 207 | self.buildable.append(task) |
208 | 208 | ||
209 | def removebuildable(self, task): | ||
210 | self.buildable.remove(task) | ||
211 | |||
209 | def describe_task(self, taskid): | 212 | def describe_task(self, taskid): |
210 | result = 'ID %s' % taskid | 213 | result = 'ID %s' % taskid |
211 | if self.rev_prio_map: | 214 | if self.rev_prio_map: |
@@ -1719,6 +1722,8 @@ class RunQueueExecute: | |||
1719 | self.sq_running = set() | 1722 | self.sq_running = set() |
1720 | self.sq_live = set() | 1723 | self.sq_live = set() |
1721 | 1724 | ||
1725 | self.changed_setscene = set() | ||
1726 | |||
1722 | self.runq_buildable = set() | 1727 | self.runq_buildable = set() |
1723 | self.runq_running = set() | 1728 | self.runq_running = set() |
1724 | self.runq_complete = set() | 1729 | self.runq_complete = set() |
@@ -1730,6 +1735,7 @@ class RunQueueExecute: | |||
1730 | 1735 | ||
1731 | self.stampcache = {} | 1736 | self.stampcache = {} |
1732 | 1737 | ||
1738 | self.holdoff_tasks = set() | ||
1733 | self.sqdone = False | 1739 | self.sqdone = False |
1734 | 1740 | ||
1735 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) | 1741 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) |
@@ -1925,6 +1931,7 @@ class RunQueueExecute: | |||
1925 | """ | 1931 | """ |
1926 | 1932 | ||
1927 | self.rq.read_workers() | 1933 | self.rq.read_workers() |
1934 | self.process_possible_migrations() | ||
1928 | 1935 | ||
1929 | task = None | 1936 | task = None |
1930 | if not self.sqdone and self.can_start_task(): | 1937 | if not self.sqdone and self.can_start_task(): |
@@ -2007,7 +2014,7 @@ class RunQueueExecute: | |||
2007 | if self.can_start_task(): | 2014 | if self.can_start_task(): |
2008 | return True | 2015 | return True |
2009 | 2016 | ||
2010 | if not self.sq_live and not self.sqdone and not self.sq_deferred: | 2017 | if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: |
2011 | logger.info("Setscene tasks completed") | 2018 | logger.info("Setscene tasks completed") |
2012 | logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) | 2019 | logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) |
2013 | 2020 | ||
@@ -2167,6 +2174,131 @@ class RunQueueExecute: | |||
2167 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) | 2174 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) |
2168 | return taskdepdata | 2175 | return taskdepdata |
2169 | 2176 | ||
2177 | def updated_taskhash(self, tid, unihash): | ||
2178 | changed = set() | ||
2179 | if unihash != self.rqdata.runtaskentries[tid].unihash: | ||
2180 | logger.info("Task %s unihash changed to %s" % (tid, unihash)) | ||
2181 | self.rqdata.runtaskentries[tid].unihash = unihash | ||
2182 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
2183 | bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash) | ||
2184 | |||
2185 | # Work out all tasks which depend on this one | ||
2186 | total = set() | ||
2187 | next = set(self.rqdata.runtaskentries[tid].revdeps) | ||
2188 | while next: | ||
2189 | current = next.copy() | ||
2190 | total = total |next | ||
2191 | next = set() | ||
2192 | for ntid in current: | ||
2193 | next |= self.rqdata.runtaskentries[ntid].revdeps | ||
2194 | next.difference_update(total) | ||
2195 | |||
2196 | # Now iterate those tasks in dependency order to regenerate their taskhash/unihash | ||
2197 | done = set() | ||
2198 | next = set(self.rqdata.runtaskentries[tid].revdeps) | ||
2199 | while next: | ||
2200 | current = next.copy() | ||
2201 | next = set() | ||
2202 | for tid in current: | ||
2203 | if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): | ||
2204 | continue | ||
2205 | procdep = [] | ||
2206 | for dep in self.rqdata.runtaskentries[tid].depends: | ||
2207 | procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep)) | ||
2208 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
2209 | orighash = self.rqdata.runtaskentries[tid].hash | ||
2210 | self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc]) | ||
2211 | origuni = self.rqdata.runtaskentries[tid].unihash | ||
2212 | self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname) | ||
2213 | logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) | ||
2214 | next |= self.rqdata.runtaskentries[tid].revdeps | ||
2215 | changed.add(tid) | ||
2216 | total.remove(tid) | ||
2217 | next.intersection_update(total) | ||
2218 | |||
2219 | if changed: | ||
2220 | for mc in self.rq.worker: | ||
2221 | self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | ||
2222 | for mc in self.rq.fakeworker: | ||
2223 | self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | ||
2224 | |||
2225 | logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) | ||
2226 | |||
2227 | for tid in changed: | ||
2228 | if tid not in self.rqdata.runq_setscene_tids: | ||
2229 | continue | ||
2230 | valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) | ||
2231 | if not valid: | ||
2232 | continue | ||
2233 | self.changed_setscene.add(tid) | ||
2234 | |||
2235 | if changed: | ||
2236 | self.update_holdofftasks() | ||
2237 | |||
2238 | def update_holdofftasks(self): | ||
2239 | self.holdoff_tasks = set(self.changed_setscene) | ||
2240 | |||
2241 | for tid in self.rqdata.runq_setscene_tids: | ||
2242 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: | ||
2243 | self.holdoff_tasks.add(tid) | ||
2244 | |||
2245 | for tid in self.holdoff_tasks.copy(): | ||
2246 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2247 | if dep not in self.runq_complete: | ||
2248 | self.holdoff_tasks.add(dep) | ||
2249 | logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) | ||
2250 | |||
2251 | def process_possible_migrations(self): | ||
2252 | changes = False | ||
2253 | for tid in self.changed_setscene.copy(): | ||
2254 | if tid in self.runq_running: | ||
2255 | self.changed_setscene.remove(tid) | ||
2256 | continue | ||
2257 | |||
2258 | valid = True | ||
2259 | # Check no tasks this covers are running | ||
2260 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2261 | if dep in self.runq_running and dep not in self.runq_complete: | ||
2262 | logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid)) | ||
2263 | valid = False | ||
2264 | break | ||
2265 | if not valid: | ||
2266 | continue | ||
2267 | |||
2268 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2269 | if dep not in self.runq_complete: | ||
2270 | if dep in self.tasks_scenequeue_done: | ||
2271 | self.tasks_scenequeue_done.remove(dep) | ||
2272 | if dep in self.tasks_notcovered: | ||
2273 | self.tasks_notcovered.remove(dep) | ||
2274 | |||
2275 | if tid in self.sq_buildable: | ||
2276 | self.sq_buildable.remove(tid) | ||
2277 | if tid in self.sq_running: | ||
2278 | self.sq_running.remove(tid) | ||
2279 | if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): | ||
2280 | if tid not in self.sq_buildable: | ||
2281 | self.sq_buildable.add(tid) | ||
2282 | |||
2283 | if tid in self.sqdata.outrightfail: | ||
2284 | self.sqdata.outrightfail.remove(tid) | ||
2285 | if tid in self.scenequeue_notcovered: | ||
2286 | self.scenequeue_notcovered.remove(tid) | ||
2287 | |||
2288 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
2289 | self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) | ||
2290 | |||
2291 | if tid in self.build_stamps: | ||
2292 | del self.build_stamps[tid] | ||
2293 | |||
2294 | logger.info("Setscene task %s now valid and being rerun" % tid) | ||
2295 | self.sqdone = False | ||
2296 | self.changed_setscene.remove(tid) | ||
2297 | changes = True | ||
2298 | |||
2299 | if changes: | ||
2300 | self.update_holdofftasks() | ||
2301 | |||
2170 | def scenequeue_process_notcovered(self, task): | 2302 | def scenequeue_process_notcovered(self, task): |
2171 | if len(self.rqdata.runtaskentries[task].depends) == 0: | 2303 | if len(self.rqdata.runtaskentries[task].depends) == 0: |
2172 | self.setbuildable(task) | 2304 | self.setbuildable(task) |
@@ -2194,7 +2326,7 @@ class RunQueueExecute: | |||
2194 | for deptask in self.rqdata.runtaskentries[t].revdeps: | 2326 | for deptask in self.rqdata.runtaskentries[t].revdeps: |
2195 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: | 2327 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: |
2196 | continue | 2328 | continue |
2197 | if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done): | 2329 | if deptask in self.sqdata.unskippable: |
2198 | new.add(deptask) | 2330 | new.add(deptask) |
2199 | self.tasks_scenequeue_done.add(deptask) | 2331 | self.tasks_scenequeue_done.add(deptask) |
2200 | self.tasks_notcovered.add(deptask) | 2332 | self.tasks_notcovered.add(deptask) |
@@ -2254,8 +2386,9 @@ class RunQueueExecute: | |||
2254 | self.tasks_covered.update(covered) | 2386 | self.tasks_covered.update(covered) |
2255 | self.coveredtopocess.remove(task) | 2387 | self.coveredtopocess.remove(task) |
2256 | for tid in covered: | 2388 | for tid in covered: |
2257 | if len(self.rqdata.runtaskentries[tid].depends) == 0: | 2389 | if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): |
2258 | self.setbuildable(tid) | 2390 | self.setbuildable(tid) |
2391 | self.update_holdofftasks() | ||
2259 | 2392 | ||
2260 | def sq_task_completeoutright(self, task): | 2393 | def sq_task_completeoutright(self, task): |
2261 | """ | 2394 | """ |
@@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2454 | 2587 | ||
2455 | rqdata.init_progress_reporter.next_stage() | 2588 | rqdata.init_progress_reporter.next_stage() |
2456 | 2589 | ||
2457 | # Build a list of setscene tasks which are "unskippable" | 2590 | # Build a list of tasks which are "unskippable" |
2458 | # These are direct endpoints referenced by the build | 2591 | # These are direct endpoints referenced by the build upto and including setscene tasks |
2459 | # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon | 2592 | # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon |
2460 | new = True | 2593 | new = True |
2461 | for tid in rqdata.runtaskentries: | 2594 | for tid in rqdata.runtaskentries: |
@@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2463 | sqdata.unskippable.add(tid) | 2596 | sqdata.unskippable.add(tid) |
2464 | while new: | 2597 | while new: |
2465 | new = False | 2598 | new = False |
2466 | for tid in sqdata.unskippable.copy(): | 2599 | orig = sqdata.unskippable.copy() |
2600 | for tid in orig: | ||
2467 | if tid in rqdata.runq_setscene_tids: | 2601 | if tid in rqdata.runq_setscene_tids: |
2468 | continue | 2602 | continue |
2469 | sqdata.unskippable.remove(tid) | ||
2470 | if len(rqdata.runtaskentries[tid].depends) == 0: | 2603 | if len(rqdata.runtaskentries[tid].depends) == 0: |
2471 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable | 2604 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable |
2472 | sqrq.tasks_notcovered.add(tid) | 2605 | sqrq.tasks_notcovered.add(tid) |
2473 | sqrq.tasks_scenequeue_done.add(tid) | 2606 | sqrq.tasks_scenequeue_done.add(tid) |
2474 | sqrq.setbuildable(tid) | 2607 | sqrq.setbuildable(tid) |
2475 | sqrq.scenequeue_process_unskippable(tid) | 2608 | sqrq.scenequeue_process_unskippable(tid) |
2476 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends | 2609 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends |
2477 | new = True | 2610 | if sqdata.unskippable != orig: |
2611 | new = True | ||
2478 | 2612 | ||
2479 | rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) | 2613 | rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) |
2480 | 2614 | ||
@@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent): | |||
2710 | runQueueEvent.__init__(self, task, stats, rq) | 2844 | runQueueEvent.__init__(self, task, stats, rq) |
2711 | self.reason = reason | 2845 | self.reason = reason |
2712 | 2846 | ||
2847 | class taskUniHashUpdate(bb.event.Event): | ||
2848 | """ | ||
2849 | Base runQueue event class | ||
2850 | """ | ||
2851 | def __init__(self, task, unihash): | ||
2852 | self.taskid = task | ||
2853 | self.unihash = unihash | ||
2854 | bb.event.Event.__init__(self) | ||
2855 | |||
2713 | class runQueuePipe(): | 2856 | class runQueuePipe(): |
2714 | """ | 2857 | """ |
2715 | Abstraction for a pipe between a worker thread and the server | 2858 | Abstraction for a pipe between a worker thread and the server |
@@ -2752,6 +2895,8 @@ class runQueuePipe(): | |||
2752 | except ValueError as e: | 2895 | except ValueError as e: |
2753 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) | 2896 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) |
2754 | bb.event.fire_from_worker(event, self.d) | 2897 | bb.event.fire_from_worker(event, self.d) |
2898 | if isinstance(event, taskUniHashUpdate): | ||
2899 | self.rqexec.updated_taskhash(event.taskid, event.unihash) | ||
2755 | found = True | 2900 | found = True |
2756 | self.queue = self.queue[index+8:] | 2901 | self.queue = self.queue[index+8:] |
2757 | index = self.queue.find(b"</event>") | 2902 | index = self.queue.find(b"</event>") |