diff options
| -rwxr-xr-x | bitbake/bin/bitbake-worker | 6 | ||||
| -rw-r--r-- | bitbake/lib/bb/runqueue.py | 165 |
2 files changed, 161 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index f63f060c57..3e502d5ca9 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
| @@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
| 234 | the_data.setVar(varname, value) | 234 | the_data.setVar(varname, value) |
| 235 | 235 | ||
| 236 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) | 236 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) |
| 237 | if "newhashes" in workerdata: | ||
| 238 | bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) | ||
| 237 | ret = 0 | 239 | ret = 0 |
| 238 | 240 | ||
| 239 | the_data = bb_cache.loadDataFull(fn, appends) | 241 | the_data = bb_cache.loadDataFull(fn, appends) |
| @@ -377,6 +379,7 @@ class BitbakeWorker(object): | |||
| 377 | self.handle_item(b"cookerconfig", self.handle_cookercfg) | 379 | self.handle_item(b"cookerconfig", self.handle_cookercfg) |
| 378 | self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) | 380 | self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) |
| 379 | self.handle_item(b"workerdata", self.handle_workerdata) | 381 | self.handle_item(b"workerdata", self.handle_workerdata) |
| 382 | self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) | ||
| 380 | self.handle_item(b"runtask", self.handle_runtask) | 383 | self.handle_item(b"runtask", self.handle_runtask) |
| 381 | self.handle_item(b"finishnow", self.handle_finishnow) | 384 | self.handle_item(b"finishnow", self.handle_finishnow) |
| 382 | self.handle_item(b"ping", self.handle_ping) | 385 | self.handle_item(b"ping", self.handle_ping) |
| @@ -416,6 +419,9 @@ class BitbakeWorker(object): | |||
| 416 | for mc in self.databuilder.mcdata: | 419 | for mc in self.databuilder.mcdata: |
| 417 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) | 420 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) |
| 418 | 421 | ||
| 422 | def handle_newtaskhashes(self, data): | ||
| 423 | self.workerdata["newhashes"] = pickle.loads(data) | ||
| 424 | |||
| 419 | def handle_ping(self, _): | 425 | def handle_ping(self, _): |
| 420 | workerlog_write("Handling ping\n") | 426 | workerlog_write("Handling ping\n") |
| 421 | 427 | ||
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 519561c231..11b98f698d 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
| @@ -149,7 +149,7 @@ class RunQueueScheduler(object): | |||
| 149 | Return the id of the first task we find that is buildable | 149 | Return the id of the first task we find that is buildable |
| 150 | """ | 150 | """ |
| 151 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] | 151 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] |
| 152 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)] | 152 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks] |
| 153 | if not buildable: | 153 | if not buildable: |
| 154 | return None | 154 | return None |
| 155 | 155 | ||
| @@ -206,6 +206,9 @@ class RunQueueScheduler(object): | |||
| 206 | def newbuildable(self, task): | 206 | def newbuildable(self, task): |
| 207 | self.buildable.append(task) | 207 | self.buildable.append(task) |
| 208 | 208 | ||
| 209 | def removebuildable(self, task): | ||
| 210 | self.buildable.remove(task) | ||
| 211 | |||
| 209 | def describe_task(self, taskid): | 212 | def describe_task(self, taskid): |
| 210 | result = 'ID %s' % taskid | 213 | result = 'ID %s' % taskid |
| 211 | if self.rev_prio_map: | 214 | if self.rev_prio_map: |
| @@ -1719,6 +1722,8 @@ class RunQueueExecute: | |||
| 1719 | self.sq_running = set() | 1722 | self.sq_running = set() |
| 1720 | self.sq_live = set() | 1723 | self.sq_live = set() |
| 1721 | 1724 | ||
| 1725 | self.changed_setscene = set() | ||
| 1726 | |||
| 1722 | self.runq_buildable = set() | 1727 | self.runq_buildable = set() |
| 1723 | self.runq_running = set() | 1728 | self.runq_running = set() |
| 1724 | self.runq_complete = set() | 1729 | self.runq_complete = set() |
| @@ -1730,6 +1735,7 @@ class RunQueueExecute: | |||
| 1730 | 1735 | ||
| 1731 | self.stampcache = {} | 1736 | self.stampcache = {} |
| 1732 | 1737 | ||
| 1738 | self.holdoff_tasks = set() | ||
| 1733 | self.sqdone = False | 1739 | self.sqdone = False |
| 1734 | 1740 | ||
| 1735 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) | 1741 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) |
| @@ -1925,6 +1931,7 @@ class RunQueueExecute: | |||
| 1925 | """ | 1931 | """ |
| 1926 | 1932 | ||
| 1927 | self.rq.read_workers() | 1933 | self.rq.read_workers() |
| 1934 | self.process_possible_migrations() | ||
| 1928 | 1935 | ||
| 1929 | task = None | 1936 | task = None |
| 1930 | if not self.sqdone and self.can_start_task(): | 1937 | if not self.sqdone and self.can_start_task(): |
| @@ -2007,7 +2014,7 @@ class RunQueueExecute: | |||
| 2007 | if self.can_start_task(): | 2014 | if self.can_start_task(): |
| 2008 | return True | 2015 | return True |
| 2009 | 2016 | ||
| 2010 | if not self.sq_live and not self.sqdone and not self.sq_deferred: | 2017 | if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: |
| 2011 | logger.info("Setscene tasks completed") | 2018 | logger.info("Setscene tasks completed") |
| 2012 | logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) | 2019 | logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) |
| 2013 | 2020 | ||
| @@ -2167,6 +2174,131 @@ class RunQueueExecute: | |||
| 2167 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) | 2174 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) |
| 2168 | return taskdepdata | 2175 | return taskdepdata |
| 2169 | 2176 | ||
| 2177 | def updated_taskhash(self, tid, unihash): | ||
| 2178 | changed = set() | ||
| 2179 | if unihash != self.rqdata.runtaskentries[tid].unihash: | ||
| 2180 | logger.info("Task %s unihash changed to %s" % (tid, unihash)) | ||
| 2181 | self.rqdata.runtaskentries[tid].unihash = unihash | ||
| 2182 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
| 2183 | bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash) | ||
| 2184 | |||
| 2185 | # Work out all tasks which depend on this one | ||
| 2186 | total = set() | ||
| 2187 | next = set(self.rqdata.runtaskentries[tid].revdeps) | ||
| 2188 | while next: | ||
| 2189 | current = next.copy() | ||
| 2190 | total = total |next | ||
| 2191 | next = set() | ||
| 2192 | for ntid in current: | ||
| 2193 | next |= self.rqdata.runtaskentries[ntid].revdeps | ||
| 2194 | next.difference_update(total) | ||
| 2195 | |||
| 2196 | # Now iterate those tasks in dependency order to regenerate their taskhash/unihash | ||
| 2197 | done = set() | ||
| 2198 | next = set(self.rqdata.runtaskentries[tid].revdeps) | ||
| 2199 | while next: | ||
| 2200 | current = next.copy() | ||
| 2201 | next = set() | ||
| 2202 | for tid in current: | ||
| 2203 | if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): | ||
| 2204 | continue | ||
| 2205 | procdep = [] | ||
| 2206 | for dep in self.rqdata.runtaskentries[tid].depends: | ||
| 2207 | procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep)) | ||
| 2208 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
| 2209 | orighash = self.rqdata.runtaskentries[tid].hash | ||
| 2210 | self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc]) | ||
| 2211 | origuni = self.rqdata.runtaskentries[tid].unihash | ||
| 2212 | self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname) | ||
| 2213 | logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) | ||
| 2214 | next |= self.rqdata.runtaskentries[tid].revdeps | ||
| 2215 | changed.add(tid) | ||
| 2216 | total.remove(tid) | ||
| 2217 | next.intersection_update(total) | ||
| 2218 | |||
| 2219 | if changed: | ||
| 2220 | for mc in self.rq.worker: | ||
| 2221 | self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | ||
| 2222 | for mc in self.rq.fakeworker: | ||
| 2223 | self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | ||
| 2224 | |||
| 2225 | logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) | ||
| 2226 | |||
| 2227 | for tid in changed: | ||
| 2228 | if tid not in self.rqdata.runq_setscene_tids: | ||
| 2229 | continue | ||
| 2230 | valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) | ||
| 2231 | if not valid: | ||
| 2232 | continue | ||
| 2233 | self.changed_setscene.add(tid) | ||
| 2234 | |||
| 2235 | if changed: | ||
| 2236 | self.update_holdofftasks() | ||
| 2237 | |||
| 2238 | def update_holdofftasks(self): | ||
| 2239 | self.holdoff_tasks = set(self.changed_setscene) | ||
| 2240 | |||
| 2241 | for tid in self.rqdata.runq_setscene_tids: | ||
| 2242 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: | ||
| 2243 | self.holdoff_tasks.add(tid) | ||
| 2244 | |||
| 2245 | for tid in self.holdoff_tasks.copy(): | ||
| 2246 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
| 2247 | if dep not in self.runq_complete: | ||
| 2248 | self.holdoff_tasks.add(dep) | ||
| 2249 | logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) | ||
| 2250 | |||
| 2251 | def process_possible_migrations(self): | ||
| 2252 | changes = False | ||
| 2253 | for tid in self.changed_setscene.copy(): | ||
| 2254 | if tid in self.runq_running: | ||
| 2255 | self.changed_setscene.remove(tid) | ||
| 2256 | continue | ||
| 2257 | |||
| 2258 | valid = True | ||
| 2259 | # Check no tasks this covers are running | ||
| 2260 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
| 2261 | if dep in self.runq_running and dep not in self.runq_complete: | ||
| 2262 | logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid)) | ||
| 2263 | valid = False | ||
| 2264 | break | ||
| 2265 | if not valid: | ||
| 2266 | continue | ||
| 2267 | |||
| 2268 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
| 2269 | if dep not in self.runq_complete: | ||
| 2270 | if dep in self.tasks_scenequeue_done: | ||
| 2271 | self.tasks_scenequeue_done.remove(dep) | ||
| 2272 | if dep in self.tasks_notcovered: | ||
| 2273 | self.tasks_notcovered.remove(dep) | ||
| 2274 | |||
| 2275 | if tid in self.sq_buildable: | ||
| 2276 | self.sq_buildable.remove(tid) | ||
| 2277 | if tid in self.sq_running: | ||
| 2278 | self.sq_running.remove(tid) | ||
| 2279 | if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): | ||
| 2280 | if tid not in self.sq_buildable: | ||
| 2281 | self.sq_buildable.add(tid) | ||
| 2282 | |||
| 2283 | if tid in self.sqdata.outrightfail: | ||
| 2284 | self.sqdata.outrightfail.remove(tid) | ||
| 2285 | if tid in self.scenequeue_notcovered: | ||
| 2286 | self.scenequeue_notcovered.remove(tid) | ||
| 2287 | |||
| 2288 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
| 2289 | self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) | ||
| 2290 | |||
| 2291 | if tid in self.build_stamps: | ||
| 2292 | del self.build_stamps[tid] | ||
| 2293 | |||
| 2294 | logger.info("Setscene task %s now valid and being rerun" % tid) | ||
| 2295 | self.sqdone = False | ||
| 2296 | self.changed_setscene.remove(tid) | ||
| 2297 | changes = True | ||
| 2298 | |||
| 2299 | if changes: | ||
| 2300 | self.update_holdofftasks() | ||
| 2301 | |||
| 2170 | def scenequeue_process_notcovered(self, task): | 2302 | def scenequeue_process_notcovered(self, task): |
| 2171 | if len(self.rqdata.runtaskentries[task].depends) == 0: | 2303 | if len(self.rqdata.runtaskentries[task].depends) == 0: |
| 2172 | self.setbuildable(task) | 2304 | self.setbuildable(task) |
| @@ -2194,7 +2326,7 @@ class RunQueueExecute: | |||
| 2194 | for deptask in self.rqdata.runtaskentries[t].revdeps: | 2326 | for deptask in self.rqdata.runtaskentries[t].revdeps: |
| 2195 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: | 2327 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: |
| 2196 | continue | 2328 | continue |
| 2197 | if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done): | 2329 | if deptask in self.sqdata.unskippable: |
| 2198 | new.add(deptask) | 2330 | new.add(deptask) |
| 2199 | self.tasks_scenequeue_done.add(deptask) | 2331 | self.tasks_scenequeue_done.add(deptask) |
| 2200 | self.tasks_notcovered.add(deptask) | 2332 | self.tasks_notcovered.add(deptask) |
| @@ -2254,8 +2386,9 @@ class RunQueueExecute: | |||
| 2254 | self.tasks_covered.update(covered) | 2386 | self.tasks_covered.update(covered) |
| 2255 | self.coveredtopocess.remove(task) | 2387 | self.coveredtopocess.remove(task) |
| 2256 | for tid in covered: | 2388 | for tid in covered: |
| 2257 | if len(self.rqdata.runtaskentries[tid].depends) == 0: | 2389 | if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): |
| 2258 | self.setbuildable(tid) | 2390 | self.setbuildable(tid) |
| 2391 | self.update_holdofftasks() | ||
| 2259 | 2392 | ||
| 2260 | def sq_task_completeoutright(self, task): | 2393 | def sq_task_completeoutright(self, task): |
| 2261 | """ | 2394 | """ |
| @@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
| 2454 | 2587 | ||
| 2455 | rqdata.init_progress_reporter.next_stage() | 2588 | rqdata.init_progress_reporter.next_stage() |
| 2456 | 2589 | ||
| 2457 | # Build a list of setscene tasks which are "unskippable" | 2590 | # Build a list of tasks which are "unskippable" |
| 2458 | # These are direct endpoints referenced by the build | 2591 | # These are direct endpoints referenced by the build upto and including setscene tasks |
| 2459 | # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon | 2592 | # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon |
| 2460 | new = True | 2593 | new = True |
| 2461 | for tid in rqdata.runtaskentries: | 2594 | for tid in rqdata.runtaskentries: |
| @@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
| 2463 | sqdata.unskippable.add(tid) | 2596 | sqdata.unskippable.add(tid) |
| 2464 | while new: | 2597 | while new: |
| 2465 | new = False | 2598 | new = False |
| 2466 | for tid in sqdata.unskippable.copy(): | 2599 | orig = sqdata.unskippable.copy() |
| 2600 | for tid in orig: | ||
| 2467 | if tid in rqdata.runq_setscene_tids: | 2601 | if tid in rqdata.runq_setscene_tids: |
| 2468 | continue | 2602 | continue |
| 2469 | sqdata.unskippable.remove(tid) | ||
| 2470 | if len(rqdata.runtaskentries[tid].depends) == 0: | 2603 | if len(rqdata.runtaskentries[tid].depends) == 0: |
| 2471 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable | 2604 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable |
| 2472 | sqrq.tasks_notcovered.add(tid) | 2605 | sqrq.tasks_notcovered.add(tid) |
| 2473 | sqrq.tasks_scenequeue_done.add(tid) | 2606 | sqrq.tasks_scenequeue_done.add(tid) |
| 2474 | sqrq.setbuildable(tid) | 2607 | sqrq.setbuildable(tid) |
| 2475 | sqrq.scenequeue_process_unskippable(tid) | 2608 | sqrq.scenequeue_process_unskippable(tid) |
| 2476 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends | 2609 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends |
| 2477 | new = True | 2610 | if sqdata.unskippable != orig: |
| 2611 | new = True | ||
| 2478 | 2612 | ||
| 2479 | rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) | 2613 | rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) |
| 2480 | 2614 | ||
| @@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent): | |||
| 2710 | runQueueEvent.__init__(self, task, stats, rq) | 2844 | runQueueEvent.__init__(self, task, stats, rq) |
| 2711 | self.reason = reason | 2845 | self.reason = reason |
| 2712 | 2846 | ||
| 2847 | class taskUniHashUpdate(bb.event.Event): | ||
| 2848 | """ | ||
| 2849 | Base runQueue event class | ||
| 2850 | """ | ||
| 2851 | def __init__(self, task, unihash): | ||
| 2852 | self.taskid = task | ||
| 2853 | self.unihash = unihash | ||
| 2854 | bb.event.Event.__init__(self) | ||
| 2855 | |||
| 2713 | class runQueuePipe(): | 2856 | class runQueuePipe(): |
| 2714 | """ | 2857 | """ |
| 2715 | Abstraction for a pipe between a worker thread and the server | 2858 | Abstraction for a pipe between a worker thread and the server |
| @@ -2752,6 +2895,8 @@ class runQueuePipe(): | |||
| 2752 | except ValueError as e: | 2895 | except ValueError as e: |
| 2753 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) | 2896 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) |
| 2754 | bb.event.fire_from_worker(event, self.d) | 2897 | bb.event.fire_from_worker(event, self.d) |
| 2898 | if isinstance(event, taskUniHashUpdate): | ||
| 2899 | self.rqexec.updated_taskhash(event.taskid, event.unihash) | ||
| 2755 | found = True | 2900 | found = True |
| 2756 | self.queue = self.queue[index+8:] | 2901 | self.queue = self.queue[index+8:] |
| 2757 | index = self.queue.find(b"</event>") | 2902 | index = self.queue.find(b"</event>") |
