summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/runqueue.py
diff options
context:
space:
mode:
authorRichard Purdie <richard.purdie@linuxfoundation.org>2019-08-14 11:48:25 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2019-08-14 17:28:23 +0100
commit977a293f2fe99e3cfacb55cae9f8d39b2ed0468e (patch)
treedd7f5466864d9a0a9e66176a47b534f0af47e7f9 /bitbake/lib/bb/runqueue.py
parent3fb90d0fffffd3a615edef648716b73b3b540e9e (diff)
downloadpoky-977a293f2fe99e3cfacb55cae9f8d39b2ed0468e.tar.gz
bitbake: runqueue: Fix event timing race
The event from the task notifying of hash equivalency should only be processed when the task completes. This can otherwise result in a race where a dependent task may run before the original task completes, causing various failures. To make this work reliably, the code had to be restructured quite a bit. (Bitbake rev: 1bf5be46f92f125193638cf41ff207d68f592259) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r--bitbake/lib/bb/runqueue.py141
1 files changed, 72 insertions, 69 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py
index eb8e342761..a04703c870 100644
--- a/bitbake/lib/bb/runqueue.py
+++ b/bitbake/lib/bb/runqueue.py
@@ -1696,7 +1696,8 @@ class RunQueueExecute:
1696 self.sq_running = set() 1696 self.sq_running = set()
1697 self.sq_live = set() 1697 self.sq_live = set()
1698 1698
1699 self.changed_setscene = set() 1699 self.updated_taskhash_queue = []
1700 self.pending_migrations = set()
1700 1701
1701 self.runq_buildable = set() 1702 self.runq_buildable = set()
1702 self.runq_running = set() 1703 self.runq_running = set()
@@ -1910,8 +1911,8 @@ class RunQueueExecute:
1910 if self.sq_deferred: 1911 if self.sq_deferred:
1911 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 1912 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
1912 err = True 1913 err = True
1913 if self.changed_setscene: 1914 if self.updated_taskhash_queue:
1914 logger.error("Scenequeue had unprocessed changed entries: %s" % pprint.pformat(self.changed_setscene)) 1915 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
1915 err = True 1916 err = True
1916 if self.holdoff_tasks: 1917 if self.holdoff_tasks:
1917 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 1918 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
@@ -2023,7 +2024,7 @@ class RunQueueExecute:
2023 if self.can_start_task(): 2024 if self.can_start_task():
2024 return True 2025 return True
2025 2026
2026 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: 2027 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
2027 logger.info("Setscene tasks completed") 2028 logger.info("Setscene tasks completed")
2028 2029
2029 err = self.summarise_scenequeue_errors() 2030 err = self.summarise_scenequeue_errors()
@@ -2177,45 +2178,66 @@ class RunQueueExecute:
2177 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2178 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2178 return taskdepdata 2179 return taskdepdata
2179 2180
2180 def updated_taskhash(self, tid, unihash): 2181 def update_holdofftasks(self):
2182 self.holdoff_tasks = set()
2183
2184 for tid in self.rqdata.runq_setscene_tids:
2185 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2186 self.holdoff_tasks.add(tid)
2187
2188 for tid in self.holdoff_tasks.copy():
2189 for dep in self.sqdata.sq_covered_tasks[tid]:
2190 if dep not in self.runq_complete:
2191 self.holdoff_tasks.add(dep)
2192 logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
2193
2194
2195 def process_possible_migrations(self):
2196
2181 changed = set() 2197 changed = set()
2182 if unihash != self.rqdata.runtaskentries[tid].unihash: 2198 for tid, unihash in self.updated_taskhash_queue.copy():
2183 logger.info("Task %s unihash changed to %s" % (tid, unihash)) 2199 if tid in self.runq_running and tid not in self.runq_complete:
2184 self.rqdata.runtaskentries[tid].unihash = unihash 2200 continue
2185 bb.parse.siggen.set_unihash(tid, unihash) 2201
2186 2202 self.updated_taskhash_queue.remove((tid, unihash))
2187 # Work out all tasks which depend on this one 2203
2188 total = set() 2204 if unihash != self.rqdata.runtaskentries[tid].unihash:
2189 next = set(self.rqdata.runtaskentries[tid].revdeps) 2205 logger.info("Task %s unihash changed to %s" % (tid, unihash))
2190 while next: 2206 self.rqdata.runtaskentries[tid].unihash = unihash
2191 current = next.copy() 2207 bb.parse.siggen.set_unihash(tid, unihash)
2192 total = total |next 2208
2193 next = set() 2209 # Work out all tasks which depend on this one
2194 for ntid in current: 2210 total = set()
2195 next |= self.rqdata.runtaskentries[ntid].revdeps 2211 next = set(self.rqdata.runtaskentries[tid].revdeps)
2196 next.difference_update(total) 2212 while next:
2197 2213 current = next.copy()
2198 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2214 total = total |next
2199 done = set() 2215 next = set()
2200 next = set(self.rqdata.runtaskentries[tid].revdeps) 2216 for ntid in current:
2201 while next: 2217 next |= self.rqdata.runtaskentries[ntid].revdeps
2202 current = next.copy() 2218 next.difference_update(total)
2203 next = set() 2219
2204 for tid in current: 2220 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2205 if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2221 done = set()
2206 continue 2222 next = set(self.rqdata.runtaskentries[tid].revdeps)
2207 procdep = [] 2223 while next:
2208 for dep in self.rqdata.runtaskentries[tid].depends: 2224 current = next.copy()
2209 procdep.append(dep) 2225 next = set()
2210 orighash = self.rqdata.runtaskentries[tid].hash 2226 for tid in current:
2211 self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)]) 2227 if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2212 origuni = self.rqdata.runtaskentries[tid].unihash 2228 continue
2213 self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) 2229 procdep = []
2214 logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) 2230 for dep in self.rqdata.runtaskentries[tid].depends:
2215 next |= self.rqdata.runtaskentries[tid].revdeps 2231 procdep.append(dep)
2216 changed.add(tid) 2232 orighash = self.rqdata.runtaskentries[tid].hash
2217 total.remove(tid) 2233 self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)])
2218 next.intersection_update(total) 2234 origuni = self.rqdata.runtaskentries[tid].unihash
2235 self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
2236 logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash))
2237 next |= self.rqdata.runtaskentries[tid].revdeps
2238 changed.add(tid)
2239 total.remove(tid)
2240 next.intersection_update(total)
2219 2241
2220 if changed: 2242 if changed:
2221 for mc in self.rq.worker: 2243 for mc in self.rq.worker:
@@ -2223,7 +2245,7 @@ class RunQueueExecute:
2223 for mc in self.rq.fakeworker: 2245 for mc in self.rq.fakeworker:
2224 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") 2246 self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2225 2247
2226 logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) 2248 logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed)))
2227 2249
2228 for tid in changed: 2250 for tid in changed:
2229 if tid not in self.rqdata.runq_setscene_tids: 2251 if tid not in self.rqdata.runq_setscene_tids:
@@ -2231,31 +2253,12 @@ class RunQueueExecute:
2231 valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) 2253 valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False)
2232 if not valid: 2254 if not valid:
2233 continue 2255 continue
2234 self.changed_setscene.add(tid)
2235
2236 if changed:
2237 self.update_holdofftasks()
2238
2239 def update_holdofftasks(self):
2240 self.holdoff_tasks = set()
2241
2242 for tid in self.rqdata.runq_setscene_tids:
2243 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2244 self.holdoff_tasks.add(tid)
2245
2246 for tid in self.holdoff_tasks.copy():
2247 for dep in self.sqdata.sq_covered_tasks[tid]:
2248 if dep not in self.runq_complete:
2249 self.holdoff_tasks.add(dep)
2250 logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks))
2251
2252 def process_possible_migrations(self):
2253 changes = False
2254 for tid in self.changed_setscene.copy():
2255 if tid in self.runq_running: 2256 if tid in self.runq_running:
2256 self.changed_setscene.remove(tid)
2257 continue 2257 continue
2258 if tid not in self.pending_migrations:
2259 self.pending_migrations.add(tid)
2258 2260
2261 for tid in self.pending_migrations.copy():
2259 valid = True 2262 valid = True
2260 # Check no tasks this covers are running 2263 # Check no tasks this covers are running
2261 for dep in self.sqdata.sq_covered_tasks[tid]: 2264 for dep in self.sqdata.sq_covered_tasks[tid]:
@@ -2266,6 +2269,8 @@ class RunQueueExecute:
2266 if not valid: 2269 if not valid:
2267 continue 2270 continue
2268 2271
2272 self.pending_migrations.remove(tid)
2273
2269 if tid in self.tasks_scenequeue_done: 2274 if tid in self.tasks_scenequeue_done:
2270 self.tasks_scenequeue_done.remove(tid) 2275 self.tasks_scenequeue_done.remove(tid)
2271 for dep in self.sqdata.sq_covered_tasks[tid]: 2276 for dep in self.sqdata.sq_covered_tasks[tid]:
@@ -2296,10 +2301,8 @@ class RunQueueExecute:
2296 2301
2297 logger.info("Setscene task %s now valid and being rerun" % tid) 2302 logger.info("Setscene task %s now valid and being rerun" % tid)
2298 self.sqdone = False 2303 self.sqdone = False
2299 self.changed_setscene.remove(tid)
2300 changes = True
2301 2304
2302 if changes: 2305 if changed:
2303 self.update_holdofftasks() 2306 self.update_holdofftasks()
2304 2307
2305 def scenequeue_updatecounters(self, task, fail=False): 2308 def scenequeue_updatecounters(self, task, fail=False):
@@ -2854,7 +2857,7 @@ class runQueuePipe():
2854 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 2857 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
2855 bb.event.fire_from_worker(event, self.d) 2858 bb.event.fire_from_worker(event, self.d)
2856 if isinstance(event, taskUniHashUpdate): 2859 if isinstance(event, taskUniHashUpdate):
2857 self.rqexec.updated_taskhash(event.taskid, event.unihash) 2860 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
2858 found = True 2861 found = True
2859 self.queue = self.queue[index+8:] 2862 self.queue = self.queue[index+8:]
2860 index = self.queue.find(b"</event>") 2863 index = self.queue.find(b"</event>")