diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-02-18 15:59:48 -0700 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-02-19 11:58:12 +0000 |
commit | 3bd2c69e70853584beaaa5a4fd62589fa051d911 (patch) | |
tree | 3dc6469a1895cc7a31a58e4fc9dc43cef9a82cfb | |
parent | be909636c608d5ba24a41327c53d6a4ba3b70151 (diff) | |
download | poky-3bd2c69e70853584beaaa5a4fd62589fa051d911.tar.gz |
bitbake: hashserv: Add unihash-exists API
Adds API to check if the server is aware of the existence of a given
unihash. This can be used as an optimization for sstate where a client
can query the hash equivalence server to check if a unihash exists
before querying the sstate cache. If the hash server isn't aware of the
existence of a unihash, then there is very likely not a matching sstate
object, so this should be able to significantly cut down on the number
of negative hits on the sstate cache.
(Bitbake rev: cfe0ac071cfb998e4a1dd263f8860b140843361a)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rwxr-xr-x | bitbake/bin/bitbake-hashclient | 13 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 44 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 61 | ||||
-rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 11 | ||||
-rw-r--r-- | bitbake/lib/hashserv/sqlite.py | 16 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 39 |
6 files changed, 151 insertions, 33 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index f71b87404a..47dd27cd3c 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient | |||
@@ -217,6 +217,14 @@ def main(): | |||
217 | print("Removed %d rows" % result["count"]) | 217 | print("Removed %d rows" % result["count"]) |
218 | return 0 | 218 | return 0 |
219 | 219 | ||
220 | def handle_unihash_exists(args, client): | ||
221 | result = client.unihash_exists(args.unihash) | ||
222 | if args.quiet: | ||
223 | return 0 if result else 1 | ||
224 | |||
225 | print("true" if result else "false") | ||
226 | return 0 | ||
227 | |||
220 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') | 228 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') |
221 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') | 229 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') |
222 | parser.add_argument('--log', default='WARNING', help='Set logging level') | 230 | parser.add_argument('--log', default='WARNING', help='Set logging level') |
@@ -309,6 +317,11 @@ def main(): | |||
309 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") | 317 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") |
310 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) | 318 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) |
311 | 319 | ||
320 | unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server") | ||
321 | unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not") | ||
322 | unihash_exists_parser.add_argument("unihash", help="Unihash to check") | ||
323 | unihash_exists_parser.set_defaults(func=handle_unihash_exists) | ||
324 | |||
312 | args = parser.parse_args() | 325 | args = parser.parse_args() |
313 | 326 | ||
314 | logger = logging.getLogger('hashserv') | 327 | logger = logging.getLogger('hashserv') |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e6dc417912..daf1e12842 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") | |||
16 | class AsyncClient(bb.asyncrpc.AsyncClient): | 16 | class AsyncClient(bb.asyncrpc.AsyncClient): |
17 | MODE_NORMAL = 0 | 17 | MODE_NORMAL = 0 |
18 | MODE_GET_STREAM = 1 | 18 | MODE_GET_STREAM = 1 |
19 | MODE_EXIST_STREAM = 2 | ||
19 | 20 | ||
20 | def __init__(self, username=None, password=None): | 21 | def __init__(self, username=None, password=None): |
21 | super().__init__("OEHASHEQUIV", "1.1", logger) | 22 | super().__init__("OEHASHEQUIV", "1.1", logger) |
@@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
49 | await self.socket.send("END") | 50 | await self.socket.send("END") |
50 | return await self.socket.recv() | 51 | return await self.socket.recv() |
51 | 52 | ||
52 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 53 | async def normal_to_stream(command): |
54 | r = await self.invoke({command: None}) | ||
55 | if r != "ok": | ||
56 | raise ConnectionError( | ||
57 | f"Unable to transition to stream mode: Bad response from server {r!r}" | ||
58 | ) | ||
59 | |||
60 | self.logger.debug("Mode is now %s", command) | ||
61 | |||
62 | if new_mode == self.mode: | ||
63 | return | ||
64 | |||
65 | self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) | ||
66 | |||
67 | # Always transition to normal mode before switching to any other mode | ||
68 | if self.mode != self.MODE_NORMAL: | ||
53 | r = await self._send_wrapper(stream_to_normal) | 69 | r = await self._send_wrapper(stream_to_normal) |
54 | if r != "ok": | 70 | if r != "ok": |
55 | self.check_invoke_error(r) | 71 | self.check_invoke_error(r) |
56 | raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) | 72 | raise ConnectionError( |
57 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 73 | f"Unable to transition to normal mode: Bad response from server {r!r}" |
58 | r = await self.invoke({"get-stream": None}) | 74 | ) |
59 | if r != "ok": | 75 | self.logger.debug("Mode is now normal") |
60 | raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) | 76 | |
61 | elif new_mode != self.mode: | 77 | if new_mode == self.MODE_GET_STREAM: |
62 | raise Exception( | 78 | await normal_to_stream("get-stream") |
63 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 79 | elif new_mode == self.MODE_EXIST_STREAM: |
64 | ) | 80 | await normal_to_stream("exists-stream") |
81 | elif new_mode != self.MODE_NORMAL: | ||
82 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") | ||
65 | 83 | ||
66 | self.mode = new_mode | 84 | self.mode = new_mode |
67 | 85 | ||
@@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
95 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 113 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
96 | ) | 114 | ) |
97 | 115 | ||
116 | async def unihash_exists(self, unihash): | ||
117 | await self._set_mode(self.MODE_EXIST_STREAM) | ||
118 | r = await self.send_stream(unihash) | ||
119 | return r == "true" | ||
120 | |||
98 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
99 | await self._set_mode(self.MODE_NORMAL) | 122 | await self._set_mode(self.MODE_NORMAL) |
100 | return await self.invoke( | 123 | return await self.invoke( |
@@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): | |||
236 | "report_unihash", | 259 | "report_unihash", |
237 | "report_unihash_equiv", | 260 | "report_unihash_equiv", |
238 | "get_taskhash", | 261 | "get_taskhash", |
262 | "unihash_exists", | ||
239 | "get_outhash", | 263 | "get_outhash", |
240 | "get_stats", | 264 | "get_stats", |
241 | "reset_stats", | 265 | "reset_stats", |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 5ed852d1f3..68f64f983b 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
234 | "get": self.handle_get, | 234 | "get": self.handle_get, |
235 | "get-outhash": self.handle_get_outhash, | 235 | "get-outhash": self.handle_get_outhash, |
236 | "get-stream": self.handle_get_stream, | 236 | "get-stream": self.handle_get_stream, |
237 | "exists-stream": self.handle_exists_stream, | ||
237 | "get-stats": self.handle_get_stats, | 238 | "get-stats": self.handle_get_stats, |
238 | "get-db-usage": self.handle_get_db_usage, | 239 | "get-db-usage": self.handle_get_db_usage, |
239 | "get-db-query-columns": self.handle_get_db_query_columns, | 240 | "get-db-query-columns": self.handle_get_db_query_columns, |
@@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
377 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) | 378 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
378 | await self.db.insert_outhash(data) | 379 | await self.db.insert_outhash(data) |
379 | 380 | ||
380 | @permissions(READ_PERM) | 381 | async def _stream_handler(self, handler): |
381 | async def handle_get_stream(self, request): | ||
382 | await self.socket.send_message("ok") | 382 | await self.socket.send_message("ok") |
383 | 383 | ||
384 | while True: | 384 | while True: |
@@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
400 | if l == "END": | 400 | if l == "END": |
401 | break | 401 | break |
402 | 402 | ||
403 | (method, taskhash) = l.split() | 403 | msg = await handler(l) |
404 | # self.logger.debug('Looking up %s %s' % (method, taskhash)) | ||
405 | row = await self.db.get_equivalent(method, taskhash) | ||
406 | |||
407 | if row is not None: | ||
408 | msg = row["unihash"] | ||
409 | # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | ||
410 | elif self.upstream_client is not None: | ||
411 | upstream = await self.upstream_client.get_unihash(method, taskhash) | ||
412 | if upstream: | ||
413 | msg = upstream | ||
414 | else: | ||
415 | msg = "" | ||
416 | else: | ||
417 | msg = "" | ||
418 | |||
419 | await self.socket.send(msg) | 404 | await self.socket.send(msg) |
420 | finally: | 405 | finally: |
421 | request_measure.end() | 406 | request_measure.end() |
422 | self.request_sample.end() | 407 | self.request_sample.end() |
423 | 408 | ||
424 | # Post to the backfill queue after writing the result to minimize | ||
425 | # the turn around time on a request | ||
426 | if upstream is not None: | ||
427 | await self.server.backfill_queue.put((method, taskhash)) | ||
428 | |||
429 | await self.socket.send("ok") | 409 | await self.socket.send("ok") |
430 | return self.NO_RESPONSE | 410 | return self.NO_RESPONSE |
431 | 411 | ||
412 | @permissions(READ_PERM) | ||
413 | async def handle_get_stream(self, request): | ||
414 | async def handler(l): | ||
415 | (method, taskhash) = l.split() | ||
416 | # self.logger.debug('Looking up %s %s' % (method, taskhash)) | ||
417 | row = await self.db.get_equivalent(method, taskhash) | ||
418 | |||
419 | if row is not None: | ||
420 | # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | ||
421 | return row["unihash"] | ||
422 | |||
423 | if self.upstream_client is not None: | ||
424 | upstream = await self.upstream_client.get_unihash(method, taskhash) | ||
425 | if upstream: | ||
426 | await self.server.backfill_queue.put((method, taskhash)) | ||
427 | return upstream | ||
428 | |||
429 | return "" | ||
430 | |||
431 | return await self._stream_handler(handler) | ||
432 | |||
433 | @permissions(READ_PERM) | ||
434 | async def handle_exists_stream(self, request): | ||
435 | async def handler(l): | ||
436 | if await self.db.unihash_exists(l): | ||
437 | return "true" | ||
438 | |||
439 | if self.upstream_client is not None: | ||
440 | if await self.upstream_client.unihash_exists(l): | ||
441 | return "true" | ||
442 | |||
443 | return "false" | ||
444 | |||
445 | return await self._stream_handler(handler) | ||
446 | |||
432 | async def report_readonly(self, data): | 447 | async def report_readonly(self, data): |
433 | method = data["method"] | 448 | method = data["method"] |
434 | outhash = data["outhash"] | 449 | outhash = data["outhash"] |
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 873547809a..0e28d738f5 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py | |||
@@ -48,6 +48,7 @@ class UnihashesV3(Base): | |||
48 | __table_args__ = ( | 48 | __table_args__ = ( |
49 | UniqueConstraint("method", "taskhash"), | 49 | UniqueConstraint("method", "taskhash"), |
50 | Index("taskhash_lookup_v4", "method", "taskhash"), | 50 | Index("taskhash_lookup_v4", "method", "taskhash"), |
51 | Index("unihash_lookup_v1", "unihash"), | ||
51 | ) | 52 | ) |
52 | 53 | ||
53 | 54 | ||
@@ -279,6 +280,16 @@ class Database(object): | |||
279 | ) | 280 | ) |
280 | return map_row(result.first()) | 281 | return map_row(result.first()) |
281 | 282 | ||
283 | async def unihash_exists(self, unihash): | ||
284 | async with self.db.begin(): | ||
285 | result = await self._execute( | ||
286 | select(UnihashesV3) | ||
287 | .where(UnihashesV3.unihash == unihash) | ||
288 | .limit(1) | ||
289 | ) | ||
290 | |||
291 | return result.first() is not None | ||
292 | |||
282 | async def get_outhash(self, method, outhash): | 293 | async def get_outhash(self, method, outhash): |
283 | async with self.db.begin(): | 294 | async with self.db.begin(): |
284 | result = await self._execute( | 295 | result = await self._execute( |
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 608490730d..da2e844a03 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py | |||
@@ -145,6 +145,9 @@ class DatabaseEngine(object): | |||
145 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" | 145 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" |
146 | ) | 146 | ) |
147 | cursor.execute( | 147 | cursor.execute( |
148 | "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" | ||
149 | ) | ||
150 | cursor.execute( | ||
148 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" | 151 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" |
149 | ) | 152 | ) |
150 | cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") | 153 | cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") |
@@ -255,6 +258,19 @@ class Database(object): | |||
255 | ) | 258 | ) |
256 | return cursor.fetchone() | 259 | return cursor.fetchone() |
257 | 260 | ||
261 | async def unihash_exists(self, unihash): | ||
262 | with closing(self.db.cursor()) as cursor: | ||
263 | cursor.execute( | ||
264 | """ | ||
265 | SELECT * FROM unihashes_v3 WHERE unihash=:unihash | ||
266 | LIMIT 1 | ||
267 | """, | ||
268 | { | ||
269 | "unihash": unihash, | ||
270 | }, | ||
271 | ) | ||
272 | return cursor.fetchone() is not None | ||
273 | |||
258 | async def get_outhash(self, method, outhash): | 274 | async def get_outhash(self, method, outhash): |
259 | with closing(self.db.cursor()) as cursor: | 275 | with closing(self.db.cursor()) as cursor: |
260 | cursor.execute( | 276 | cursor.execute( |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index aeedab3575..fbbe81512a 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): | |||
442 | self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') | 442 | self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') |
443 | self.assertEqual(result['method'], self.METHOD) | 443 | self.assertEqual(result['method'], self.METHOD) |
444 | 444 | ||
445 | def test_unihash_exsits(self): | ||
446 | taskhash, outhash, unihash = self.create_test_hash(self.client) | ||
447 | self.assertTrue(self.client.unihash_exists(unihash)) | ||
448 | self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) | ||
449 | |||
445 | def test_ro_server(self): | 450 | def test_ro_server(self): |
446 | rw_server = self.start_server() | 451 | rw_server = self.start_server() |
447 | rw_client = self.start_client(rw_server.address) | 452 | rw_client = self.start_client(rw_server.address) |
@@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): | |||
1031 | def test_stress(self): | 1036 | def test_stress(self): |
1032 | self.run_hashclient(["--address", self.server_address, "stress"], check=True) | 1037 | self.run_hashclient(["--address", self.server_address, "stress"], check=True) |
1033 | 1038 | ||
1039 | def test_unihash_exsits(self): | ||
1040 | taskhash, outhash, unihash = self.create_test_hash(self.client) | ||
1041 | |||
1042 | p = self.run_hashclient([ | ||
1043 | "--address", self.server_address, | ||
1044 | "unihash-exists", unihash, | ||
1045 | ], check=True) | ||
1046 | self.assertEqual(p.stdout.strip(), "true") | ||
1047 | |||
1048 | p = self.run_hashclient([ | ||
1049 | "--address", self.server_address, | ||
1050 | "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', | ||
1051 | ], check=True) | ||
1052 | self.assertEqual(p.stdout.strip(), "false") | ||
1053 | |||
1054 | def test_unihash_exsits_quiet(self): | ||
1055 | taskhash, outhash, unihash = self.create_test_hash(self.client) | ||
1056 | |||
1057 | p = self.run_hashclient([ | ||
1058 | "--address", self.server_address, | ||
1059 | "unihash-exists", unihash, | ||
1060 | "--quiet", | ||
1061 | ]) | ||
1062 | self.assertEqual(p.returncode, 0) | ||
1063 | self.assertEqual(p.stdout.strip(), "") | ||
1064 | |||
1065 | p = self.run_hashclient([ | ||
1066 | "--address", self.server_address, | ||
1067 | "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', | ||
1068 | "--quiet", | ||
1069 | ]) | ||
1070 | self.assertEqual(p.returncode, 1) | ||
1071 | self.assertEqual(p.stdout.strip(), "") | ||
1072 | |||
1034 | def test_remove_taskhash(self): | 1073 | def test_remove_taskhash(self): |
1035 | taskhash, outhash, unihash = self.create_test_hash(self.client) | 1074 | taskhash, outhash, unihash = self.create_test_hash(self.client) |
1036 | self.run_hashclient([ | 1075 | self.run_hashclient([ |