diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-07-23 22:51:15 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-08-06 11:21:31 +0100 |
commit | 7df31ff36892c2f9c65326b06b4c7093b1462f54 (patch) | |
tree | 7992a608c29625c40fc1701537bf185dd93f8aec /bitbake | |
parent | 40eb5b344b4de5310a89e36024b826fc99484747 (diff) | |
download | poky-7df31ff36892c2f9c65326b06b4c7093b1462f54.tar.gz |
bitbake: runqueue: Enable dynamic task adjustment to hash equivalency
There is a compelling use case for tasks being able to notify runqueue
that their "unihash" has changed. When this is received, the hashes of
all subsequent tasks should be recomputed and their new hashes checked
against existing setscene validity. Any newly available setscene tasks
should then be executed.
Making this work effectively needs several pieces. An event is added
which the cooker listens for. If a new hash becomes available it can
send an event to notify of this.
When such an event is seen, hash recomputations are made. A setscene
task can't be run until all the tasks it "covers" are stopped. The
notion of "holdoff" tasks is therefore added; these are removed from
the buildable list with the assumption that some setscene task will
run and cover them.
The workers need to be notified when taskhashes change to update their
own internal siggen data stores. A new worker command is added to do this
which will affect all newly spawned worker processes from that worker.
An example workflow which tests this code is:
Configuration:
BB_SIGNATURE_HANDLER = "OEEquivHash"
SSTATE_HASHEQUIV_SERVER = "http://localhost:8686"
$ bitbake-hashserv &
$ bitbake automake-native
$ bitbake autoconf-native automake-native -c clean
$ bitbake m4-native -c install -f
$ bitbake automake-native
with the test being whether automake-native is installed from sstate.
(Bitbake rev: 1f630fdf0260db08541d3ca9f25f852931c19905)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rwxr-xr-x | bitbake/bin/bitbake-worker | 6 | ||||
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 165 |
2 files changed, 161 insertions, 10 deletions
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker index f63f060c57..3e502d5ca9 100755 --- a/bitbake/bin/bitbake-worker +++ b/bitbake/bin/bitbake-worker | |||
@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha | |||
234 | the_data.setVar(varname, value) | 234 | the_data.setVar(varname, value) |
235 | 235 | ||
236 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) | 236 | bb.parse.siggen.set_taskdata(workerdata["sigdata"]) |
237 | if "newhashes" in workerdata: | ||
238 | bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) | ||
237 | ret = 0 | 239 | ret = 0 |
238 | 240 | ||
239 | the_data = bb_cache.loadDataFull(fn, appends) | 241 | the_data = bb_cache.loadDataFull(fn, appends) |
@@ -377,6 +379,7 @@ class BitbakeWorker(object): | |||
377 | self.handle_item(b"cookerconfig", self.handle_cookercfg) | 379 | self.handle_item(b"cookerconfig", self.handle_cookercfg) |
378 | self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) | 380 | self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) |
379 | self.handle_item(b"workerdata", self.handle_workerdata) | 381 | self.handle_item(b"workerdata", self.handle_workerdata) |
382 | self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) | ||
380 | self.handle_item(b"runtask", self.handle_runtask) | 383 | self.handle_item(b"runtask", self.handle_runtask) |
381 | self.handle_item(b"finishnow", self.handle_finishnow) | 384 | self.handle_item(b"finishnow", self.handle_finishnow) |
382 | self.handle_item(b"ping", self.handle_ping) | 385 | self.handle_item(b"ping", self.handle_ping) |
@@ -416,6 +419,9 @@ class BitbakeWorker(object): | |||
416 | for mc in self.databuilder.mcdata: | 419 | for mc in self.databuilder.mcdata: |
417 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) | 420 | self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) |
418 | 421 | ||
422 | def handle_newtaskhashes(self, data): | ||
423 | self.workerdata["newhashes"] = pickle.loads(data) | ||
424 | |||
419 | def handle_ping(self, _): | 425 | def handle_ping(self, _): |
420 | workerlog_write("Handling ping\n") | 426 | workerlog_write("Handling ping\n") |
421 | 427 | ||
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index 519561c231..11b98f698d 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -149,7 +149,7 @@ class RunQueueScheduler(object): | |||
149 | Return the id of the first task we find that is buildable | 149 | Return the id of the first task we find that is buildable |
150 | """ | 150 | """ |
151 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] | 151 | self.buildable = [x for x in self.buildable if x not in self.rq.runq_running] |
152 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)] | 152 | buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks] |
153 | if not buildable: | 153 | if not buildable: |
154 | return None | 154 | return None |
155 | 155 | ||
@@ -206,6 +206,9 @@ class RunQueueScheduler(object): | |||
206 | def newbuildable(self, task): | 206 | def newbuildable(self, task): |
207 | self.buildable.append(task) | 207 | self.buildable.append(task) |
208 | 208 | ||
209 | def removebuildable(self, task): | ||
210 | self.buildable.remove(task) | ||
211 | |||
209 | def describe_task(self, taskid): | 212 | def describe_task(self, taskid): |
210 | result = 'ID %s' % taskid | 213 | result = 'ID %s' % taskid |
211 | if self.rev_prio_map: | 214 | if self.rev_prio_map: |
@@ -1719,6 +1722,8 @@ class RunQueueExecute: | |||
1719 | self.sq_running = set() | 1722 | self.sq_running = set() |
1720 | self.sq_live = set() | 1723 | self.sq_live = set() |
1721 | 1724 | ||
1725 | self.changed_setscene = set() | ||
1726 | |||
1722 | self.runq_buildable = set() | 1727 | self.runq_buildable = set() |
1723 | self.runq_running = set() | 1728 | self.runq_running = set() |
1724 | self.runq_complete = set() | 1729 | self.runq_complete = set() |
@@ -1730,6 +1735,7 @@ class RunQueueExecute: | |||
1730 | 1735 | ||
1731 | self.stampcache = {} | 1736 | self.stampcache = {} |
1732 | 1737 | ||
1738 | self.holdoff_tasks = set() | ||
1733 | self.sqdone = False | 1739 | self.sqdone = False |
1734 | 1740 | ||
1735 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) | 1741 | self.stats = RunQueueStats(len(self.rqdata.runtaskentries)) |
@@ -1925,6 +1931,7 @@ class RunQueueExecute: | |||
1925 | """ | 1931 | """ |
1926 | 1932 | ||
1927 | self.rq.read_workers() | 1933 | self.rq.read_workers() |
1934 | self.process_possible_migrations() | ||
1928 | 1935 | ||
1929 | task = None | 1936 | task = None |
1930 | if not self.sqdone and self.can_start_task(): | 1937 | if not self.sqdone and self.can_start_task(): |
@@ -2007,7 +2014,7 @@ class RunQueueExecute: | |||
2007 | if self.can_start_task(): | 2014 | if self.can_start_task(): |
2008 | return True | 2015 | return True |
2009 | 2016 | ||
2010 | if not self.sq_live and not self.sqdone and not self.sq_deferred: | 2017 | if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: |
2011 | logger.info("Setscene tasks completed") | 2018 | logger.info("Setscene tasks completed") |
2012 | logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) | 2019 | logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) |
2013 | 2020 | ||
@@ -2167,6 +2174,131 @@ class RunQueueExecute: | |||
2167 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) | 2174 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) |
2168 | return taskdepdata | 2175 | return taskdepdata |
2169 | 2176 | ||
2177 | def updated_taskhash(self, tid, unihash): | ||
2178 | changed = set() | ||
2179 | if unihash != self.rqdata.runtaskentries[tid].unihash: | ||
2180 | logger.info("Task %s unihash changed to %s" % (tid, unihash)) | ||
2181 | self.rqdata.runtaskentries[tid].unihash = unihash | ||
2182 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
2183 | bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash) | ||
2184 | |||
2185 | # Work out all tasks which depend on this one | ||
2186 | total = set() | ||
2187 | next = set(self.rqdata.runtaskentries[tid].revdeps) | ||
2188 | while next: | ||
2189 | current = next.copy() | ||
2190 | total = total |next | ||
2191 | next = set() | ||
2192 | for ntid in current: | ||
2193 | next |= self.rqdata.runtaskentries[ntid].revdeps | ||
2194 | next.difference_update(total) | ||
2195 | |||
2196 | # Now iterate those tasks in dependency order to regenerate their taskhash/unihash | ||
2197 | done = set() | ||
2198 | next = set(self.rqdata.runtaskentries[tid].revdeps) | ||
2199 | while next: | ||
2200 | current = next.copy() | ||
2201 | next = set() | ||
2202 | for tid in current: | ||
2203 | if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): | ||
2204 | continue | ||
2205 | procdep = [] | ||
2206 | for dep in self.rqdata.runtaskentries[tid].depends: | ||
2207 | procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep)) | ||
2208 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
2209 | orighash = self.rqdata.runtaskentries[tid].hash | ||
2210 | self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc]) | ||
2211 | origuni = self.rqdata.runtaskentries[tid].unihash | ||
2212 | self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname) | ||
2213 | logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) | ||
2214 | next |= self.rqdata.runtaskentries[tid].revdeps | ||
2215 | changed.add(tid) | ||
2216 | total.remove(tid) | ||
2217 | next.intersection_update(total) | ||
2218 | |||
2219 | if changed: | ||
2220 | for mc in self.rq.worker: | ||
2221 | self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | ||
2222 | for mc in self.rq.fakeworker: | ||
2223 | self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | ||
2224 | |||
2225 | logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) | ||
2226 | |||
2227 | for tid in changed: | ||
2228 | if tid not in self.rqdata.runq_setscene_tids: | ||
2229 | continue | ||
2230 | valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) | ||
2231 | if not valid: | ||
2232 | continue | ||
2233 | self.changed_setscene.add(tid) | ||
2234 | |||
2235 | if changed: | ||
2236 | self.update_holdofftasks() | ||
2237 | |||
2238 | def update_holdofftasks(self): | ||
2239 | self.holdoff_tasks = set(self.changed_setscene) | ||
2240 | |||
2241 | for tid in self.rqdata.runq_setscene_tids: | ||
2242 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: | ||
2243 | self.holdoff_tasks.add(tid) | ||
2244 | |||
2245 | for tid in self.holdoff_tasks.copy(): | ||
2246 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2247 | if dep not in self.runq_complete: | ||
2248 | self.holdoff_tasks.add(dep) | ||
2249 | logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) | ||
2250 | |||
2251 | def process_possible_migrations(self): | ||
2252 | changes = False | ||
2253 | for tid in self.changed_setscene.copy(): | ||
2254 | if tid in self.runq_running: | ||
2255 | self.changed_setscene.remove(tid) | ||
2256 | continue | ||
2257 | |||
2258 | valid = True | ||
2259 | # Check no tasks this covers are running | ||
2260 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2261 | if dep in self.runq_running and dep not in self.runq_complete: | ||
2262 | logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid)) | ||
2263 | valid = False | ||
2264 | break | ||
2265 | if not valid: | ||
2266 | continue | ||
2267 | |||
2268 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2269 | if dep not in self.runq_complete: | ||
2270 | if dep in self.tasks_scenequeue_done: | ||
2271 | self.tasks_scenequeue_done.remove(dep) | ||
2272 | if dep in self.tasks_notcovered: | ||
2273 | self.tasks_notcovered.remove(dep) | ||
2274 | |||
2275 | if tid in self.sq_buildable: | ||
2276 | self.sq_buildable.remove(tid) | ||
2277 | if tid in self.sq_running: | ||
2278 | self.sq_running.remove(tid) | ||
2279 | if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): | ||
2280 | if tid not in self.sq_buildable: | ||
2281 | self.sq_buildable.add(tid) | ||
2282 | |||
2283 | if tid in self.sqdata.outrightfail: | ||
2284 | self.sqdata.outrightfail.remove(tid) | ||
2285 | if tid in self.scenequeue_notcovered: | ||
2286 | self.scenequeue_notcovered.remove(tid) | ||
2287 | |||
2288 | (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) | ||
2289 | self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True) | ||
2290 | |||
2291 | if tid in self.build_stamps: | ||
2292 | del self.build_stamps[tid] | ||
2293 | |||
2294 | logger.info("Setscene task %s now valid and being rerun" % tid) | ||
2295 | self.sqdone = False | ||
2296 | self.changed_setscene.remove(tid) | ||
2297 | changes = True | ||
2298 | |||
2299 | if changes: | ||
2300 | self.update_holdofftasks() | ||
2301 | |||
2170 | def scenequeue_process_notcovered(self, task): | 2302 | def scenequeue_process_notcovered(self, task): |
2171 | if len(self.rqdata.runtaskentries[task].depends) == 0: | 2303 | if len(self.rqdata.runtaskentries[task].depends) == 0: |
2172 | self.setbuildable(task) | 2304 | self.setbuildable(task) |
@@ -2194,7 +2326,7 @@ class RunQueueExecute: | |||
2194 | for deptask in self.rqdata.runtaskentries[t].revdeps: | 2326 | for deptask in self.rqdata.runtaskentries[t].revdeps: |
2195 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: | 2327 | if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids: |
2196 | continue | 2328 | continue |
2197 | if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done): | 2329 | if deptask in self.sqdata.unskippable: |
2198 | new.add(deptask) | 2330 | new.add(deptask) |
2199 | self.tasks_scenequeue_done.add(deptask) | 2331 | self.tasks_scenequeue_done.add(deptask) |
2200 | self.tasks_notcovered.add(deptask) | 2332 | self.tasks_notcovered.add(deptask) |
@@ -2254,8 +2386,9 @@ class RunQueueExecute: | |||
2254 | self.tasks_covered.update(covered) | 2386 | self.tasks_covered.update(covered) |
2255 | self.coveredtopocess.remove(task) | 2387 | self.coveredtopocess.remove(task) |
2256 | for tid in covered: | 2388 | for tid in covered: |
2257 | if len(self.rqdata.runtaskentries[tid].depends) == 0: | 2389 | if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): |
2258 | self.setbuildable(tid) | 2390 | self.setbuildable(tid) |
2391 | self.update_holdofftasks() | ||
2259 | 2392 | ||
2260 | def sq_task_completeoutright(self, task): | 2393 | def sq_task_completeoutright(self, task): |
2261 | """ | 2394 | """ |
@@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2454 | 2587 | ||
2455 | rqdata.init_progress_reporter.next_stage() | 2588 | rqdata.init_progress_reporter.next_stage() |
2456 | 2589 | ||
2457 | # Build a list of setscene tasks which are "unskippable" | 2590 | # Build a list of tasks which are "unskippable" |
2458 | # These are direct endpoints referenced by the build | 2591 | # These are direct endpoints referenced by the build upto and including setscene tasks |
2459 | # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon | 2592 | # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon |
2460 | new = True | 2593 | new = True |
2461 | for tid in rqdata.runtaskentries: | 2594 | for tid in rqdata.runtaskentries: |
@@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): | |||
2463 | sqdata.unskippable.add(tid) | 2596 | sqdata.unskippable.add(tid) |
2464 | while new: | 2597 | while new: |
2465 | new = False | 2598 | new = False |
2466 | for tid in sqdata.unskippable.copy(): | 2599 | orig = sqdata.unskippable.copy() |
2600 | for tid in orig: | ||
2467 | if tid in rqdata.runq_setscene_tids: | 2601 | if tid in rqdata.runq_setscene_tids: |
2468 | continue | 2602 | continue |
2469 | sqdata.unskippable.remove(tid) | ||
2470 | if len(rqdata.runtaskentries[tid].depends) == 0: | 2603 | if len(rqdata.runtaskentries[tid].depends) == 0: |
2471 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable | 2604 | # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable |
2472 | sqrq.tasks_notcovered.add(tid) | 2605 | sqrq.tasks_notcovered.add(tid) |
2473 | sqrq.tasks_scenequeue_done.add(tid) | 2606 | sqrq.tasks_scenequeue_done.add(tid) |
2474 | sqrq.setbuildable(tid) | 2607 | sqrq.setbuildable(tid) |
2475 | sqrq.scenequeue_process_unskippable(tid) | 2608 | sqrq.scenequeue_process_unskippable(tid) |
2476 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends | 2609 | sqdata.unskippable |= rqdata.runtaskentries[tid].depends |
2477 | new = True | 2610 | if sqdata.unskippable != orig: |
2611 | new = True | ||
2478 | 2612 | ||
2479 | rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) | 2613 | rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) |
2480 | 2614 | ||
@@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent): | |||
2710 | runQueueEvent.__init__(self, task, stats, rq) | 2844 | runQueueEvent.__init__(self, task, stats, rq) |
2711 | self.reason = reason | 2845 | self.reason = reason |
2712 | 2846 | ||
2847 | class taskUniHashUpdate(bb.event.Event): | ||
2848 | """ | ||
2849 | Base runQueue event class | ||
2850 | """ | ||
2851 | def __init__(self, task, unihash): | ||
2852 | self.taskid = task | ||
2853 | self.unihash = unihash | ||
2854 | bb.event.Event.__init__(self) | ||
2855 | |||
2713 | class runQueuePipe(): | 2856 | class runQueuePipe(): |
2714 | """ | 2857 | """ |
2715 | Abstraction for a pipe between a worker thread and the server | 2858 | Abstraction for a pipe between a worker thread and the server |
@@ -2752,6 +2895,8 @@ class runQueuePipe(): | |||
2752 | except ValueError as e: | 2895 | except ValueError as e: |
2753 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) | 2896 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) |
2754 | bb.event.fire_from_worker(event, self.d) | 2897 | bb.event.fire_from_worker(event, self.d) |
2898 | if isinstance(event, taskUniHashUpdate): | ||
2899 | self.rqexec.updated_taskhash(event.taskid, event.unihash) | ||
2755 | found = True | 2900 | found = True |
2756 | self.queue = self.queue[index+8:] | 2901 | self.queue = self.queue[index+8:] |
2757 | index = self.queue.find(b"</event>") | 2902 | index = self.queue.find(b"</event>") |