diff options
| -rwxr-xr-x | bitbake/bin/bitbake-hashclient | 13 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 44 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 61 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 11 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/sqlite.py | 16 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 39 |
6 files changed, 151 insertions, 33 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index f71b87404a..47dd27cd3c 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient | |||
| @@ -217,6 +217,14 @@ def main(): | |||
| 217 | print("Removed %d rows" % result["count"]) | 217 | print("Removed %d rows" % result["count"]) |
| 218 | return 0 | 218 | return 0 |
| 219 | 219 | ||
| 220 | def handle_unihash_exists(args, client): | ||
| 221 | result = client.unihash_exists(args.unihash) | ||
| 222 | if args.quiet: | ||
| 223 | return 0 if result else 1 | ||
| 224 | |||
| 225 | print("true" if result else "false") | ||
| 226 | return 0 | ||
| 227 | |||
| 220 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') | 228 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') |
| 221 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') | 229 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') |
| 222 | parser.add_argument('--log', default='WARNING', help='Set logging level') | 230 | parser.add_argument('--log', default='WARNING', help='Set logging level') |
| @@ -309,6 +317,11 @@ def main(): | |||
| 309 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") | 317 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") |
| 310 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) | 318 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) |
| 311 | 319 | ||
| 320 | unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server") | ||
| 321 | unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not") | ||
| 322 | unihash_exists_parser.add_argument("unihash", help="Unihash to check") | ||
| 323 | unihash_exists_parser.set_defaults(func=handle_unihash_exists) | ||
| 324 | |||
| 312 | args = parser.parse_args() | 325 | args = parser.parse_args() |
| 313 | 326 | ||
| 314 | logger = logging.getLogger('hashserv') | 327 | logger = logging.getLogger('hashserv') |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e6dc417912..daf1e12842 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client") | |||
| 16 | class AsyncClient(bb.asyncrpc.AsyncClient): | 16 | class AsyncClient(bb.asyncrpc.AsyncClient): |
| 17 | MODE_NORMAL = 0 | 17 | MODE_NORMAL = 0 |
| 18 | MODE_GET_STREAM = 1 | 18 | MODE_GET_STREAM = 1 |
| 19 | MODE_EXIST_STREAM = 2 | ||
| 19 | 20 | ||
| 20 | def __init__(self, username=None, password=None): | 21 | def __init__(self, username=None, password=None): |
| 21 | super().__init__("OEHASHEQUIV", "1.1", logger) | 22 | super().__init__("OEHASHEQUIV", "1.1", logger) |
| @@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 49 | await self.socket.send("END") | 50 | await self.socket.send("END") |
| 50 | return await self.socket.recv() | 51 | return await self.socket.recv() |
| 51 | 52 | ||
| 52 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 53 | async def normal_to_stream(command): |
| 54 | r = await self.invoke({command: None}) | ||
| 55 | if r != "ok": | ||
| 56 | raise ConnectionError( | ||
| 57 | f"Unable to transition to stream mode: Bad response from server {r!r}" | ||
| 58 | ) | ||
| 59 | |||
| 60 | self.logger.debug("Mode is now %s", command) | ||
| 61 | |||
| 62 | if new_mode == self.mode: | ||
| 63 | return | ||
| 64 | |||
| 65 | self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) | ||
| 66 | |||
| 67 | # Always transition to normal mode before switching to any other mode | ||
| 68 | if self.mode != self.MODE_NORMAL: | ||
| 53 | r = await self._send_wrapper(stream_to_normal) | 69 | r = await self._send_wrapper(stream_to_normal) |
| 54 | if r != "ok": | 70 | if r != "ok": |
| 55 | self.check_invoke_error(r) | 71 | self.check_invoke_error(r) |
| 56 | raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) | 72 | raise ConnectionError( |
| 57 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 73 | f"Unable to transition to normal mode: Bad response from server {r!r}" |
| 58 | r = await self.invoke({"get-stream": None}) | 74 | ) |
| 59 | if r != "ok": | 75 | self.logger.debug("Mode is now normal") |
| 60 | raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) | 76 | |
| 61 | elif new_mode != self.mode: | 77 | if new_mode == self.MODE_GET_STREAM: |
| 62 | raise Exception( | 78 | await normal_to_stream("get-stream") |
| 63 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 79 | elif new_mode == self.MODE_EXIST_STREAM: |
| 64 | ) | 80 | await normal_to_stream("exists-stream") |
| 81 | elif new_mode != self.MODE_NORMAL: | ||
| 82 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") | ||
| 65 | 83 | ||
| 66 | self.mode = new_mode | 84 | self.mode = new_mode |
| 67 | 85 | ||
| @@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 95 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 113 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
| 96 | ) | 114 | ) |
| 97 | 115 | ||
| 116 | async def unihash_exists(self, unihash): | ||
| 117 | await self._set_mode(self.MODE_EXIST_STREAM) | ||
| 118 | r = await self.send_stream(unihash) | ||
| 119 | return r == "true" | ||
| 120 | |||
| 98 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
| 99 | await self._set_mode(self.MODE_NORMAL) | 122 | await self._set_mode(self.MODE_NORMAL) |
| 100 | return await self.invoke( | 123 | return await self.invoke( |
| @@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client): | |||
| 236 | "report_unihash", | 259 | "report_unihash", |
| 237 | "report_unihash_equiv", | 260 | "report_unihash_equiv", |
| 238 | "get_taskhash", | 261 | "get_taskhash", |
| 262 | "unihash_exists", | ||
| 239 | "get_outhash", | 263 | "get_outhash", |
| 240 | "get_stats", | 264 | "get_stats", |
| 241 | "reset_stats", | 265 | "reset_stats", |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 5ed852d1f3..68f64f983b 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
| @@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 234 | "get": self.handle_get, | 234 | "get": self.handle_get, |
| 235 | "get-outhash": self.handle_get_outhash, | 235 | "get-outhash": self.handle_get_outhash, |
| 236 | "get-stream": self.handle_get_stream, | 236 | "get-stream": self.handle_get_stream, |
| 237 | "exists-stream": self.handle_exists_stream, | ||
| 237 | "get-stats": self.handle_get_stats, | 238 | "get-stats": self.handle_get_stats, |
| 238 | "get-db-usage": self.handle_get_db_usage, | 239 | "get-db-usage": self.handle_get_db_usage, |
| 239 | "get-db-query-columns": self.handle_get_db_query_columns, | 240 | "get-db-query-columns": self.handle_get_db_query_columns, |
| @@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 377 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) | 378 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
| 378 | await self.db.insert_outhash(data) | 379 | await self.db.insert_outhash(data) |
| 379 | 380 | ||
| 380 | @permissions(READ_PERM) | 381 | async def _stream_handler(self, handler): |
| 381 | async def handle_get_stream(self, request): | ||
| 382 | await self.socket.send_message("ok") | 382 | await self.socket.send_message("ok") |
| 383 | 383 | ||
| 384 | while True: | 384 | while True: |
| @@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 400 | if l == "END": | 400 | if l == "END": |
| 401 | break | 401 | break |
| 402 | 402 | ||
| 403 | (method, taskhash) = l.split() | 403 | msg = await handler(l) |
| 404 | # self.logger.debug('Looking up %s %s' % (method, taskhash)) | ||
| 405 | row = await self.db.get_equivalent(method, taskhash) | ||
| 406 | |||
| 407 | if row is not None: | ||
| 408 | msg = row["unihash"] | ||
| 409 | # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | ||
| 410 | elif self.upstream_client is not None: | ||
| 411 | upstream = await self.upstream_client.get_unihash(method, taskhash) | ||
| 412 | if upstream: | ||
| 413 | msg = upstream | ||
| 414 | else: | ||
| 415 | msg = "" | ||
| 416 | else: | ||
| 417 | msg = "" | ||
| 418 | |||
| 419 | await self.socket.send(msg) | 404 | await self.socket.send(msg) |
| 420 | finally: | 405 | finally: |
| 421 | request_measure.end() | 406 | request_measure.end() |
| 422 | self.request_sample.end() | 407 | self.request_sample.end() |
| 423 | 408 | ||
| 424 | # Post to the backfill queue after writing the result to minimize | ||
| 425 | # the turn around time on a request | ||
| 426 | if upstream is not None: | ||
| 427 | await self.server.backfill_queue.put((method, taskhash)) | ||
| 428 | |||
| 429 | await self.socket.send("ok") | 409 | await self.socket.send("ok") |
| 430 | return self.NO_RESPONSE | 410 | return self.NO_RESPONSE |
| 431 | 411 | ||
| 412 | @permissions(READ_PERM) | ||
| 413 | async def handle_get_stream(self, request): | ||
| 414 | async def handler(l): | ||
| 415 | (method, taskhash) = l.split() | ||
| 416 | # self.logger.debug('Looking up %s %s' % (method, taskhash)) | ||
| 417 | row = await self.db.get_equivalent(method, taskhash) | ||
| 418 | |||
| 419 | if row is not None: | ||
| 420 | # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | ||
| 421 | return row["unihash"] | ||
| 422 | |||
| 423 | if self.upstream_client is not None: | ||
| 424 | upstream = await self.upstream_client.get_unihash(method, taskhash) | ||
| 425 | if upstream: | ||
| 426 | await self.server.backfill_queue.put((method, taskhash)) | ||
| 427 | return upstream | ||
| 428 | |||
| 429 | return "" | ||
| 430 | |||
| 431 | return await self._stream_handler(handler) | ||
| 432 | |||
| 433 | @permissions(READ_PERM) | ||
| 434 | async def handle_exists_stream(self, request): | ||
| 435 | async def handler(l): | ||
| 436 | if await self.db.unihash_exists(l): | ||
| 437 | return "true" | ||
| 438 | |||
| 439 | if self.upstream_client is not None: | ||
| 440 | if await self.upstream_client.unihash_exists(l): | ||
| 441 | return "true" | ||
| 442 | |||
| 443 | return "false" | ||
| 444 | |||
| 445 | return await self._stream_handler(handler) | ||
| 446 | |||
| 432 | async def report_readonly(self, data): | 447 | async def report_readonly(self, data): |
| 433 | method = data["method"] | 448 | method = data["method"] |
| 434 | outhash = data["outhash"] | 449 | outhash = data["outhash"] |
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index 873547809a..0e28d738f5 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py | |||
| @@ -48,6 +48,7 @@ class UnihashesV3(Base): | |||
| 48 | __table_args__ = ( | 48 | __table_args__ = ( |
| 49 | UniqueConstraint("method", "taskhash"), | 49 | UniqueConstraint("method", "taskhash"), |
| 50 | Index("taskhash_lookup_v4", "method", "taskhash"), | 50 | Index("taskhash_lookup_v4", "method", "taskhash"), |
| 51 | Index("unihash_lookup_v1", "unihash"), | ||
| 51 | ) | 52 | ) |
| 52 | 53 | ||
| 53 | 54 | ||
| @@ -279,6 +280,16 @@ class Database(object): | |||
| 279 | ) | 280 | ) |
| 280 | return map_row(result.first()) | 281 | return map_row(result.first()) |
| 281 | 282 | ||
| 283 | async def unihash_exists(self, unihash): | ||
| 284 | async with self.db.begin(): | ||
| 285 | result = await self._execute( | ||
| 286 | select(UnihashesV3) | ||
| 287 | .where(UnihashesV3.unihash == unihash) | ||
| 288 | .limit(1) | ||
| 289 | ) | ||
| 290 | |||
| 291 | return result.first() is not None | ||
| 292 | |||
| 282 | async def get_outhash(self, method, outhash): | 293 | async def get_outhash(self, method, outhash): |
| 283 | async with self.db.begin(): | 294 | async with self.db.begin(): |
| 284 | result = await self._execute( | 295 | result = await self._execute( |
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index 608490730d..da2e844a03 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py | |||
| @@ -145,6 +145,9 @@ class DatabaseEngine(object): | |||
| 145 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" | 145 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" |
| 146 | ) | 146 | ) |
| 147 | cursor.execute( | 147 | cursor.execute( |
| 148 | "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)" | ||
| 149 | ) | ||
| 150 | cursor.execute( | ||
| 148 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" | 151 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" |
| 149 | ) | 152 | ) |
| 150 | cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") | 153 | cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") |
| @@ -255,6 +258,19 @@ class Database(object): | |||
| 255 | ) | 258 | ) |
| 256 | return cursor.fetchone() | 259 | return cursor.fetchone() |
| 257 | 260 | ||
| 261 | async def unihash_exists(self, unihash): | ||
| 262 | with closing(self.db.cursor()) as cursor: | ||
| 263 | cursor.execute( | ||
| 264 | """ | ||
| 265 | SELECT * FROM unihashes_v3 WHERE unihash=:unihash | ||
| 266 | LIMIT 1 | ||
| 267 | """, | ||
| 268 | { | ||
| 269 | "unihash": unihash, | ||
| 270 | }, | ||
| 271 | ) | ||
| 272 | return cursor.fetchone() is not None | ||
| 273 | |||
| 258 | async def get_outhash(self, method, outhash): | 274 | async def get_outhash(self, method, outhash): |
| 259 | with closing(self.db.cursor()) as cursor: | 275 | with closing(self.db.cursor()) as cursor: |
| 260 | cursor.execute( | 276 | cursor.execute( |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index aeedab3575..fbbe81512a 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
| @@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object): | |||
| 442 | self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') | 442 | self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') |
| 443 | self.assertEqual(result['method'], self.METHOD) | 443 | self.assertEqual(result['method'], self.METHOD) |
| 444 | 444 | ||
| 445 | def test_unihash_exsits(self): | ||
| 446 | taskhash, outhash, unihash = self.create_test_hash(self.client) | ||
| 447 | self.assertTrue(self.client.unihash_exists(unihash)) | ||
| 448 | self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8')) | ||
| 449 | |||
| 445 | def test_ro_server(self): | 450 | def test_ro_server(self): |
| 446 | rw_server = self.start_server() | 451 | rw_server = self.start_server() |
| 447 | rw_client = self.start_client(rw_server.address) | 452 | rw_client = self.start_client(rw_server.address) |
| @@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): | |||
| 1031 | def test_stress(self): | 1036 | def test_stress(self): |
| 1032 | self.run_hashclient(["--address", self.server_address, "stress"], check=True) | 1037 | self.run_hashclient(["--address", self.server_address, "stress"], check=True) |
| 1033 | 1038 | ||
| 1039 | def test_unihash_exsits(self): | ||
| 1040 | taskhash, outhash, unihash = self.create_test_hash(self.client) | ||
| 1041 | |||
| 1042 | p = self.run_hashclient([ | ||
| 1043 | "--address", self.server_address, | ||
| 1044 | "unihash-exists", unihash, | ||
| 1045 | ], check=True) | ||
| 1046 | self.assertEqual(p.stdout.strip(), "true") | ||
| 1047 | |||
| 1048 | p = self.run_hashclient([ | ||
| 1049 | "--address", self.server_address, | ||
| 1050 | "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', | ||
| 1051 | ], check=True) | ||
| 1052 | self.assertEqual(p.stdout.strip(), "false") | ||
| 1053 | |||
| 1054 | def test_unihash_exsits_quiet(self): | ||
| 1055 | taskhash, outhash, unihash = self.create_test_hash(self.client) | ||
| 1056 | |||
| 1057 | p = self.run_hashclient([ | ||
| 1058 | "--address", self.server_address, | ||
| 1059 | "unihash-exists", unihash, | ||
| 1060 | "--quiet", | ||
| 1061 | ]) | ||
| 1062 | self.assertEqual(p.returncode, 0) | ||
| 1063 | self.assertEqual(p.stdout.strip(), "") | ||
| 1064 | |||
| 1065 | p = self.run_hashclient([ | ||
| 1066 | "--address", self.server_address, | ||
| 1067 | "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8', | ||
| 1068 | "--quiet", | ||
| 1069 | ]) | ||
| 1070 | self.assertEqual(p.returncode, 1) | ||
| 1071 | self.assertEqual(p.stdout.strip(), "") | ||
| 1072 | |||
| 1034 | def test_remove_taskhash(self): | 1073 | def test_remove_taskhash(self): |
| 1035 | taskhash, outhash, unihash = self.create_test_hash(self.client) | 1074 | taskhash, outhash, unihash = self.create_test_hash(self.client) |
| 1036 | self.run_hashclient([ | 1075 | self.run_hashclient([ |
