diff options
author | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-08-14 11:48:25 +0100 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2019-08-14 17:28:23 +0100 |
commit | 977a293f2fe99e3cfacb55cae9f8d39b2ed0468e (patch) | |
tree | dd7f5466864d9a0a9e66176a47b534f0af47e7f9 /bitbake/lib/bb/runqueue.py | |
parent | 3fb90d0fffffd3a615edef648716b73b3b540e9e (diff) | |
download | poky-977a293f2fe99e3cfacb55cae9f8d39b2ed0468e.tar.gz |
bitbake: runqueue: Fix event timing race
The event from the task notifying of hash equivalency should only be processed
when the task completes. This can otherwise result in a race where a dependent
task may run before the original task completes causing various failures.
To make this work reliably, the code had to be restructured quite a bit.
(Bitbake rev: 1bf5be46f92f125193638cf41ff207d68f592259)
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/runqueue.py')
-rw-r--r-- | bitbake/lib/bb/runqueue.py | 141 |
1 file changed, 72 insertions, 69 deletions
diff --git a/bitbake/lib/bb/runqueue.py b/bitbake/lib/bb/runqueue.py index eb8e342761..a04703c870 100644 --- a/bitbake/lib/bb/runqueue.py +++ b/bitbake/lib/bb/runqueue.py | |||
@@ -1696,7 +1696,8 @@ class RunQueueExecute: | |||
1696 | self.sq_running = set() | 1696 | self.sq_running = set() |
1697 | self.sq_live = set() | 1697 | self.sq_live = set() |
1698 | 1698 | ||
1699 | self.changed_setscene = set() | 1699 | self.updated_taskhash_queue = [] |
1700 | self.pending_migrations = set() | ||
1700 | 1701 | ||
1701 | self.runq_buildable = set() | 1702 | self.runq_buildable = set() |
1702 | self.runq_running = set() | 1703 | self.runq_running = set() |
@@ -1910,8 +1911,8 @@ class RunQueueExecute: | |||
1910 | if self.sq_deferred: | 1911 | if self.sq_deferred: |
1911 | logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) | 1912 | logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) |
1912 | err = True | 1913 | err = True |
1913 | if self.changed_setscene: | 1914 | if self.updated_taskhash_queue: |
1914 | logger.error("Scenequeue had unprocessed changed entries: %s" % pprint.pformat(self.changed_setscene)) | 1915 | logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) |
1915 | err = True | 1916 | err = True |
1916 | if self.holdoff_tasks: | 1917 | if self.holdoff_tasks: |
1917 | logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) | 1918 | logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) |
@@ -2023,7 +2024,7 @@ class RunQueueExecute: | |||
2023 | if self.can_start_task(): | 2024 | if self.can_start_task(): |
2024 | return True | 2025 | return True |
2025 | 2026 | ||
2026 | if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.changed_setscene and not self.holdoff_tasks: | 2027 | if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: |
2027 | logger.info("Setscene tasks completed") | 2028 | logger.info("Setscene tasks completed") |
2028 | 2029 | ||
2029 | err = self.summarise_scenequeue_errors() | 2030 | err = self.summarise_scenequeue_errors() |
@@ -2177,45 +2178,66 @@ class RunQueueExecute: | |||
2177 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) | 2178 | #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) |
2178 | return taskdepdata | 2179 | return taskdepdata |
2179 | 2180 | ||
2180 | def updated_taskhash(self, tid, unihash): | 2181 | def update_holdofftasks(self): |
2182 | self.holdoff_tasks = set() | ||
2183 | |||
2184 | for tid in self.rqdata.runq_setscene_tids: | ||
2185 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: | ||
2186 | self.holdoff_tasks.add(tid) | ||
2187 | |||
2188 | for tid in self.holdoff_tasks.copy(): | ||
2189 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2190 | if dep not in self.runq_complete: | ||
2191 | self.holdoff_tasks.add(dep) | ||
2192 | logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) | ||
2193 | |||
2194 | |||
2195 | def process_possible_migrations(self): | ||
2196 | |||
2181 | changed = set() | 2197 | changed = set() |
2182 | if unihash != self.rqdata.runtaskentries[tid].unihash: | 2198 | for tid, unihash in self.updated_taskhash_queue.copy(): |
2183 | logger.info("Task %s unihash changed to %s" % (tid, unihash)) | 2199 | if tid in self.runq_running and tid not in self.runq_complete: |
2184 | self.rqdata.runtaskentries[tid].unihash = unihash | 2200 | continue |
2185 | bb.parse.siggen.set_unihash(tid, unihash) | 2201 | |
2186 | 2202 | self.updated_taskhash_queue.remove((tid, unihash)) | |
2187 | # Work out all tasks which depend on this one | 2203 | |
2188 | total = set() | 2204 | if unihash != self.rqdata.runtaskentries[tid].unihash: |
2189 | next = set(self.rqdata.runtaskentries[tid].revdeps) | 2205 | logger.info("Task %s unihash changed to %s" % (tid, unihash)) |
2190 | while next: | 2206 | self.rqdata.runtaskentries[tid].unihash = unihash |
2191 | current = next.copy() | 2207 | bb.parse.siggen.set_unihash(tid, unihash) |
2192 | total = total |next | 2208 | |
2193 | next = set() | 2209 | # Work out all tasks which depend on this one |
2194 | for ntid in current: | 2210 | total = set() |
2195 | next |= self.rqdata.runtaskentries[ntid].revdeps | 2211 | next = set(self.rqdata.runtaskentries[tid].revdeps) |
2196 | next.difference_update(total) | 2212 | while next: |
2197 | 2213 | current = next.copy() | |
2198 | # Now iterate those tasks in dependency order to regenerate their taskhash/unihash | 2214 | total = total |next |
2199 | done = set() | 2215 | next = set() |
2200 | next = set(self.rqdata.runtaskentries[tid].revdeps) | 2216 | for ntid in current: |
2201 | while next: | 2217 | next |= self.rqdata.runtaskentries[ntid].revdeps |
2202 | current = next.copy() | 2218 | next.difference_update(total) |
2203 | next = set() | 2219 | |
2204 | for tid in current: | 2220 | # Now iterate those tasks in dependency order to regenerate their taskhash/unihash |
2205 | if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): | 2221 | done = set() |
2206 | continue | 2222 | next = set(self.rqdata.runtaskentries[tid].revdeps) |
2207 | procdep = [] | 2223 | while next: |
2208 | for dep in self.rqdata.runtaskentries[tid].depends: | 2224 | current = next.copy() |
2209 | procdep.append(dep) | 2225 | next = set() |
2210 | orighash = self.rqdata.runtaskentries[tid].hash | 2226 | for tid in current: |
2211 | self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)]) | 2227 | if not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): |
2212 | origuni = self.rqdata.runtaskentries[tid].unihash | 2228 | continue |
2213 | self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) | 2229 | procdep = [] |
2214 | logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) | 2230 | for dep in self.rqdata.runtaskentries[tid].depends: |
2215 | next |= self.rqdata.runtaskentries[tid].revdeps | 2231 | procdep.append(dep) |
2216 | changed.add(tid) | 2232 | orighash = self.rqdata.runtaskentries[tid].hash |
2217 | total.remove(tid) | 2233 | self.rqdata.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, procdep, self.rqdata.dataCaches[mc_from_tid(tid)]) |
2218 | next.intersection_update(total) | 2234 | origuni = self.rqdata.runtaskentries[tid].unihash |
2235 | self.rqdata.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid) | ||
2236 | logger.debug(1, "Task %s hash changes: %s->%s %s->%s" % (tid, orighash, self.rqdata.runtaskentries[tid].hash, origuni, self.rqdata.runtaskentries[tid].unihash)) | ||
2237 | next |= self.rqdata.runtaskentries[tid].revdeps | ||
2238 | changed.add(tid) | ||
2239 | total.remove(tid) | ||
2240 | next.intersection_update(total) | ||
2219 | 2241 | ||
2220 | if changed: | 2242 | if changed: |
2221 | for mc in self.rq.worker: | 2243 | for mc in self.rq.worker: |
@@ -2223,7 +2245,7 @@ class RunQueueExecute: | |||
2223 | for mc in self.rq.fakeworker: | 2245 | for mc in self.rq.fakeworker: |
2224 | self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") | 2246 | self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") |
2225 | 2247 | ||
2226 | logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) | 2248 | logger.debug(1, pprint.pformat("Tasks changed:\n%s" % (changed))) |
2227 | 2249 | ||
2228 | for tid in changed: | 2250 | for tid in changed: |
2229 | if tid not in self.rqdata.runq_setscene_tids: | 2251 | if tid not in self.rqdata.runq_setscene_tids: |
@@ -2231,31 +2253,12 @@ class RunQueueExecute: | |||
2231 | valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) | 2253 | valid = self.rq.validate_hashes(set([tid]), self.cooker.data, None, False) |
2232 | if not valid: | 2254 | if not valid: |
2233 | continue | 2255 | continue |
2234 | self.changed_setscene.add(tid) | ||
2235 | |||
2236 | if changed: | ||
2237 | self.update_holdofftasks() | ||
2238 | |||
2239 | def update_holdofftasks(self): | ||
2240 | self.holdoff_tasks = set() | ||
2241 | |||
2242 | for tid in self.rqdata.runq_setscene_tids: | ||
2243 | if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: | ||
2244 | self.holdoff_tasks.add(tid) | ||
2245 | |||
2246 | for tid in self.holdoff_tasks.copy(): | ||
2247 | for dep in self.sqdata.sq_covered_tasks[tid]: | ||
2248 | if dep not in self.runq_complete: | ||
2249 | self.holdoff_tasks.add(dep) | ||
2250 | logger.debug(2, "Holding off tasks %s" % pprint.pformat(self.holdoff_tasks)) | ||
2251 | |||
2252 | def process_possible_migrations(self): | ||
2253 | changes = False | ||
2254 | for tid in self.changed_setscene.copy(): | ||
2255 | if tid in self.runq_running: | 2256 | if tid in self.runq_running: |
2256 | self.changed_setscene.remove(tid) | ||
2257 | continue | 2257 | continue |
2258 | if tid not in self.pending_migrations: | ||
2259 | self.pending_migrations.add(tid) | ||
2258 | 2260 | ||
2261 | for tid in self.pending_migrations.copy(): | ||
2259 | valid = True | 2262 | valid = True |
2260 | # Check no tasks this covers are running | 2263 | # Check no tasks this covers are running |
2261 | for dep in self.sqdata.sq_covered_tasks[tid]: | 2264 | for dep in self.sqdata.sq_covered_tasks[tid]: |
@@ -2266,6 +2269,8 @@ class RunQueueExecute: | |||
2266 | if not valid: | 2269 | if not valid: |
2267 | continue | 2270 | continue |
2268 | 2271 | ||
2272 | self.pending_migrations.remove(tid) | ||
2273 | |||
2269 | if tid in self.tasks_scenequeue_done: | 2274 | if tid in self.tasks_scenequeue_done: |
2270 | self.tasks_scenequeue_done.remove(tid) | 2275 | self.tasks_scenequeue_done.remove(tid) |
2271 | for dep in self.sqdata.sq_covered_tasks[tid]: | 2276 | for dep in self.sqdata.sq_covered_tasks[tid]: |
@@ -2296,10 +2301,8 @@ class RunQueueExecute: | |||
2296 | 2301 | ||
2297 | logger.info("Setscene task %s now valid and being rerun" % tid) | 2302 | logger.info("Setscene task %s now valid and being rerun" % tid) |
2298 | self.sqdone = False | 2303 | self.sqdone = False |
2299 | self.changed_setscene.remove(tid) | ||
2300 | changes = True | ||
2301 | 2304 | ||
2302 | if changes: | 2305 | if changed: |
2303 | self.update_holdofftasks() | 2306 | self.update_holdofftasks() |
2304 | 2307 | ||
2305 | def scenequeue_updatecounters(self, task, fail=False): | 2308 | def scenequeue_updatecounters(self, task, fail=False): |
@@ -2854,7 +2857,7 @@ class runQueuePipe(): | |||
2854 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) | 2857 | bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) |
2855 | bb.event.fire_from_worker(event, self.d) | 2858 | bb.event.fire_from_worker(event, self.d) |
2856 | if isinstance(event, taskUniHashUpdate): | 2859 | if isinstance(event, taskUniHashUpdate): |
2857 | self.rqexec.updated_taskhash(event.taskid, event.unihash) | 2860 | self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) |
2858 | found = True | 2861 | found = True |
2859 | self.queue = self.queue[index+8:] | 2862 | self.queue = self.queue[index+8:] |
2860 | index = self.queue.find(b"</event>") | 2863 | index = self.queue.find(b"</event>") |