author     Richard Purdie <richard.purdie@linuxfoundation.org>    2019-07-23 22:51:15 +0100
committer  Richard Purdie <richard.purdie@linuxfoundation.org>    2019-08-06 11:21:31 +0100
commit     7df31ff36892c2f9c65326b06b4c7093b1462f54 (patch)
tree       7992a608c29625c40fc1701537bf185dd93f8aec /bitbake
parent     40eb5b344b4de5310a89e36024b826fc99484747 (diff)
download   poky-7df31ff36892c2f9c65326b06b4c7093b1462f54.tar.gz
bitbake: runqueue: Enable dynamic task adjustment to hash equivalency
There is a compelling use case for tasks being able to notify runqueue that their "unihash" has changed. When such a notification is received, the hashes of all subsequent tasks should be recomputed and their new hashes checked against existing setscene validity. Any newly available setscene tasks should then be executed.

Making this work effectively needs several pieces:

An event is added which the cooker listens for. If a new hash becomes available, a task can send this event to notify the cooker. When such an event is seen, hash recomputations are made.

A setscene task can't be run until all the tasks it "covers" have stopped. The notion of "holdoff" tasks is therefore added; these are removed from the buildable list on the assumption that some setscene task will run and cover them.

The workers need to be notified when taskhashes change so that they can update their own internal siggen data stores. A new worker command is added to do this; it affects all worker processes subsequently spawned by that worker.

An example workflow which tests this code is:

Configuration:

BB_SIGNATURE_HANDLER = "OEEquivHash"
SSTATE_HASHEQUIV_SERVER = "http://localhost:8686"

$ bitbake-hashserv &
$ bitbake automake-native
$ bitbake autoconf-native automake-native -c clean
$ bitbake m4-native -c install -f
$ bitbake automake-native

with the test being whether automake-native is installed from sstate.

(Bitbake rev: 1f630fdf0260db08541d3ca9f25f852931c19905)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
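For illustration only (not part of this patch): a minimal sketch of how a task-side consumer could fire the new notification, assuming `d` is the task datastore and `tid`/`new_unihash` come from a hash equivalence lookup. The event class and its field names match the taskUniHashUpdate added to runqueue.py below.

import bb.event
import bb.runqueue

def report_new_unihash(d, tid, new_unihash):
    # Fired from the task/worker context; runQueuePipe.read() on the server
    # side spots the event and hands it to RunQueueExecute.updated_taskhash().
    bb.event.fire(bb.runqueue.taskUniHashUpdate(tid, new_unihash), d)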
Diffstat (limited to 'bitbake')
-rwxr-xr-x  bitbake/bin/bitbake-worker    6
-rw-r--r--  bitbake/lib/bb/runqueue.py  165
2 files changed, 161 insertions, 10 deletions
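As a reading aid before the diff (illustrative, not part of the patch): the scheduler change in next_buildable_task() amounts to filtering the buildable list through an extra holdoff set, which update_holdofftasks() keeps populated with tasks that some setscene task is expected to cover.

# Hypothetical standalone filter mirroring the next_buildable_task() change:
# tasks in 'holdoff' are withheld even though their dependencies are met.
def filter_buildable(buildable, covered, notcovered, holdoff):
    return [t for t in buildable
            if (t in covered or t in notcovered) and t not in holdoff]

print(filter_buildable(["a", "b", "c"], {"a", "b"}, {"c"}, {"b"}))  # ['a', 'c']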
diff --git a/bitbake/bin/bitbake-worker b/bitbake/bin/bitbake-worker
index f63f060c57..3e502d5ca9 100755
--- a/bitbake/bin/bitbake-worker
+++ b/bitbake/bin/bitbake-worker
@@ -234,6 +234,8 @@ def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskha
                     the_data.setVar(varname, value)
 
                 bb.parse.siggen.set_taskdata(workerdata["sigdata"])
+                if "newhashes" in workerdata:
+                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                 ret = 0
 
                 the_data = bb_cache.loadDataFull(fn, appends)
@@ -377,6 +379,7 @@ class BitbakeWorker(object):
                 self.handle_item(b"cookerconfig", self.handle_cookercfg)
                 self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                 self.handle_item(b"workerdata", self.handle_workerdata)
+                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                 self.handle_item(b"runtask", self.handle_runtask)
                 self.handle_item(b"finishnow", self.handle_finishnow)
                 self.handle_item(b"ping", self.handle_ping)
@@ -416,6 +419,9 @@ class BitbakeWorker(object):
         for mc in self.databuilder.mcdata:
             self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
 
+    def handle_newtaskhashes(self, data):
+        self.workerdata["newhashes"] = pickle.loads(data)
+
     def handle_ping(self, _):
         workerlog_write("Handling ping\n")
 
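The new worker command above can be pictured with a small standalone sketch (illustrative names, not part of the patch): the server side pickles the current taskhash map between <newtaskhashes> markers on the worker's stdin, and handle_newtaskhashes() unpickles it into workerdata so that newly forked task processes can apply it via set_taskhashes().

import pickle

def encode_newtaskhashes(taskhashes):
    # What the runqueue writes to each worker's stdin when hashes change.
    return b"<newtaskhashes>" + pickle.dumps(taskhashes) + b"</newtaskhashes>"

def decode_newtaskhashes(payload):
    # What handle_newtaskhashes() does with the bytes between the markers;
    # the result lands in workerdata["newhashes"] for future task processes.
    return pickle.loads(payload)

# Round-trip check with an illustrative "taskfn.taskname" keyed map.
taskhashes = {"/path/to/recipe.bb.do_compile": "0123abcd"}
assert decode_newtaskhashes(pickle.dumps(taskhashes)) == taskhashes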
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index 519561c231..11b98f698d 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -149,7 +149,7 @@ class RunQueueScheduler(object):
         Return the id of the first task we find that is buildable
         """
         self.buildable = [x for x in self.buildable if x not in self.rq.runq_running]
-        buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered)]
+        buildable = [x for x in self.buildable if (x in self.rq.tasks_covered or x in self.rq.tasks_notcovered) and x not in self.rq.holdoff_tasks]
         if not buildable:
             return None
 
@@ -206,6 +206,9 @@ class RunQueueScheduler(object):
     def newbuildable(self, task):
         self.buildable.append(task)
 
+    def removebuildable(self, task):
+        self.buildable.remove(task)
+
     def describe_task(self, taskid):
         result = 'ID %s' % taskid
         if self.rev_prio_map:
@@ -1719,6 +1722,8 @@ class RunQueueExecute:
         self.sq_running = set()
         self.sq_live = set()
 
+        self.changed_setscene = set()
+
         self.runq_buildable = set()
         self.runq_running = set()
         self.runq_complete = set()
@@ -1730,6 +1735,7 @@ class RunQueueExecute:
 
         self.stampcache = {}
 
+        self.holdoff_tasks = set()
         self.sqdone = False
 
         self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
@@ -1925,6 +1931,7 @@ class RunQueueExecute:
         """
 
         self.rq.read_workers()
+        self.process_possible_migrations()
 
         task = None
         if not self.sqdone and self.can_start_task():
@@ -2007,7 +2014,7 @@ class RunQueueExecute:
         if self.can_start_task():
             return True
 
-        if not self.sq_live and not self.sqdone and not self.sq_deferred:
+        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks:
             logger.info("Setscene tasks completed")
             logger.debug(1, 'We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
 
@@ -2167,6 +2174,131 @@ class RunQueueExecute:
         #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
         return taskdepdata
 
+    def updated_taskhash(self, tid, unihash):
+        changed = set()
+        if unihash != self.rqdata.runtaskentries[tid].unihash:
+            logger.info("Task %s unihash changed to %s" % (tid, unihash))
+            self.rqdata.runtaskentries[tid].unihash = unihash
+            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+            bb.parse.siggen.set_unihash(taskfn + "." + taskname, unihash)
+
+            # Work out all tasks which depend on this one
+            total = set()
+            next = set(self.rqdata.runtaskentries[tid].revdeps)
+            while next:
+                current = next.copy()
+                total = total |next
+                next = set()
+                for ntid in current:
+                    next |= self.rqdata.runtaskentries[ntid].revdeps
+                    next.difference_update(total)
+
+            # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
+            done = set()
+            next = set(self.rqdata.runtaskentries[tid].revdeps)
+            while next:
+                current = next.copy()
+                next = set()
+                for tid in current:
+                    if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
+                        continue
+                    procdep = []
+                    for dep in self.rqdata.runtaskentries[tid].depends:
+                        procdep.append(fn_from_tid(dep) + "." + taskname_from_tid(dep))
+                    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+                    orighash = self.rqdata.runtaskentries[tid].hash
+                    self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(taskfn, taskname, procdep, self.rqdata.dataCaches[mc])
+                    origuni = self.rqdata.runtaskentries[tid].unihash
+                    self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(taskfn + "." + taskname)
+                    logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
+                    next |= self.rqdata.runtaskentries[tid].revdeps
+                    changed.add(tid)
+                    total.remove(tid)
+                    next.intersection_update(total)
+
+        if changed:
+            for mc in self.rq.worker:
+                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+            for mc in self.rq.fakeworker:
+                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
+
+            logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
+
+        for tid in changed:
+            if tid not in self.rqdata.runq_setscene_tids:
+                continue
+            valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
+            if not valid:
+                continue
+            self.changed_setscene.add(tid)
+
+        if changed:
+            self.update_holdofftasks()
+
+    def update_holdofftasks(self):
+        self.holdoff_tasks = set(self.changed_setscene)
+
+        for tid in self.rqdata.runq_setscene_tids:
+            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
+                self.holdoff_tasks.add(tid)
+
+        for tid in self.holdoff_tasks.copy():
+            for dep in self.sqdata.sq_covered_tasks[tid]:
+                if dep not in self.runq_complete:
+                    self.holdoff_tasks.add(dep)
+        logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
+
+    def process_possible_migrations(self):
+        changes = False
+        for tid in self.changed_setscene.copy():
+            if tid in self.runq_running:
+                self.changed_setscene.remove(tid)
+                continue
+
+            valid = True
+            # Check no tasks this covers are running
+            for dep in self.sqdata.sq_covered_tasks[tid]:
+                if dep in self.runq_running and dep not in self.runq_complete:
+                    logger.debug(2, "Task %s is running which blocks setscene for %s from running" % (dep, tid))
+                    valid = False
+                    break
+            if not valid:
+                continue
+
+            for dep in self.sqdata.sq_covered_tasks[tid]:
+                if dep not in self.runq_complete:
+                    if dep in self.tasks_scenequeue_done:
+                        self.tasks_scenequeue_done.remove(dep)
+                    if dep in self.tasks_notcovered:
+                        self.tasks_notcovered.remove(dep)
+
+            if tid in self.sq_buildable:
+                self.sq_buildable.remove(tid)
+            if tid in self.sq_running:
+                self.sq_running.remove(tid)
+            if self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
+                if tid not in self.sq_buildable:
+                    self.sq_buildable.add(tid)
+
+            if tid in self.sqdata.outrightfail:
+                self.sqdata.outrightfail.remove(tid)
+            if tid in self.scenequeue_notcovered:
+                self.scenequeue_notcovered.remove(tid)
+
+            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
+            self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
+
+            if tid in self.build_stamps:
+                del self.build_stamps[tid]
+
+            logger.info("Setscene task %s now valid and being rerun" % tid)
+            self.sqdone = False
+            self.changed_setscene.remove(tid)
+            changes = True
+
+        if changes:
+            self.update_holdofftasks()
+
     def scenequeue_process_notcovered(self, task):
         if len(self.rqdata.runtaskentries[task].depends) == 0:
             self.setbuildable(task)
@@ -2194,7 +2326,7 @@ class RunQueueExecute:
             for deptask in self.rqdata.runtaskentries[t].revdeps:
                 if deptask in ready or deptask in new or deptask in self.tasks_scenequeue_done or deptask in self.rqdata.runq_setscene_tids:
                     continue
-                if self.rqdata.runtaskentries[deptask].depends.issubset(self.tasks_scenequeue_done):
+                if deptask in self.sqdata.unskippable:
                     new.add(deptask)
                     self.tasks_scenequeue_done.add(deptask)
                     self.tasks_notcovered.add(deptask)
@@ -2254,8 +2386,9 @@ class RunQueueExecute:
             self.tasks_covered.update(covered)
             self.coveredtopocess.remove(task)
             for tid in covered:
-                if len(self.rqdata.runtaskentries[tid].depends) == 0:
+                if self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
                     self.setbuildable(tid)
+            self.update_holdofftasks()
 
     def sq_task_completeoutright(self, task):
         """
@@ -2454,8 +2587,8 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
2454 2587
2455 rqdata.init_progress_reporter.next_stage() 2588 rqdata.init_progress_reporter.next_stage()
2456 2589
2457 # Build a list of setscene tasks which are "unskippable" 2590 # Build a list of tasks which are "unskippable"
2458 # These are direct endpoints referenced by the build 2591 # These are direct endpoints referenced by the build upto and including setscene tasks
2459 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 2592 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
2460 new = True 2593 new = True
2461 for tid in rqdata.runtaskentries: 2594 for tid in rqdata.runtaskentries:
@@ -2463,18 +2596,19 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
             sqdata.unskippable.add(tid)
     while new:
         new = False
-        for tid in sqdata.unskippable.copy():
+        orig = sqdata.unskippable.copy()
+        for tid in orig:
             if tid in rqdata.runq_setscene_tids:
                 continue
-            sqdata.unskippable.remove(tid)
             if len(rqdata.runtaskentries[tid].depends) == 0:
                 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
                 sqrq.tasks_notcovered.add(tid)
                 sqrq.tasks_scenequeue_done.add(tid)
                 sqrq.setbuildable(tid)
                 sqrq.scenequeue_process_unskippable(tid)
             sqdata.unskippable |= rqdata.runtaskentries[tid].depends
-            new = True
+        if sqdata.unskippable != orig:
+            new = True
 
     rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))
 
@@ -2710,6 +2844,15 @@ class runQueueTaskSkipped(runQueueEvent):
         runQueueEvent.__init__(self, task, stats, rq)
         self.reason = reason
 
+class taskUniHashUpdate(bb.event.Event):
+    """
+    Base runQueue event class
+    """
+    def __init__(self, task, unihash):
+        self.taskid = task
+        self.unihash = unihash
+        bb.event.Event.__init__(self)
+
 class runQueuePipe():
     """
     Abstraction for a pipe between a worker thread and the server
@@ -2752,6 +2895,8 @@ class runQueuePipe():
             except ValueError as e:
                 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
             bb.event.fire_from_worker(event, self.d)
+            if isinstance(event, taskUniHashUpdate):
+                self.rqexec.updated_taskhash(event.taskid, event.unihash)
             found = True
             self.queue = self.queue[index+8:]
             index = self.queue.find(b"</event>")
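The heart of updated_taskhash() above is a two-phase breadth-first walk over reverse dependencies: first collect every task downstream of the changed one, then rehash tasks only once none of their own dependencies are still pending. A simplified standalone rendering of that idea, with plain dict-of-set graphs standing in for rqdata.runtaskentries and a rehash callback standing in for the siggen calls (illustrative, not part of the patch):

def walk_and_rehash(start, revdeps, depends, rehash):
    # Phase 1: collect every task that transitively depends on 'start'.
    total = set()
    frontier = set(revdeps[start])
    while frontier:
        total |= frontier
        frontier = {r for t in frontier for r in revdeps[t]} - total

    # Phase 2: rehash a task only when none of its dependencies are still
    # pending in 'total', so tasks are recomputed in dependency order.
    changed = []
    frontier = set(revdeps[start])
    while frontier:
        current, frontier = frontier, set()
        for tid in current:
            if depends[tid] & total:
                continue            # a dependency has not been rehashed yet
            rehash(tid)
            changed.append(tid)
            total.remove(tid)
            frontier |= revdeps[tid]
        frontier &= total
    return changed

Each affected task is rehashed only after its rehashed dependencies, which is what lets the new unihash values propagate consistently into the setscene validity checks that follow.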