summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Joshua Watt <JPEWhacker@gmail.com> 2024-02-18 15:59:48 -0700
committer: Richard Purdie <richard.purdie@linuxfoundation.org> 2024-02-19 11:58:12 +0000
commit: 3bd2c69e70853584beaaa5a4fd62589fa051d911 (patch)
tree: 3dc6469a1895cc7a31a58e4fc9dc43cef9a82cfb
parent: be909636c608d5ba24a41327c53d6a4ba3b70151 (diff)
download: poky-3bd2c69e70853584beaaa5a4fd62589fa051d911.tar.gz
bitbake: hashserv: Add unihash-exists API
Adds API to check if the server is aware of the existence of a given unihash. This can be used as an optimization for sstate where a client can query the hash equivalence server to check if a unihash exists before querying the sstate cache. If the hash server isn't aware of the existence of a unihash, then there is very likely not a matching sstate object, so this should be able to significantly cut down on the number of negative hits on the sstate cache.

(Bitbake rev: cfe0ac071cfb998e4a1dd263f8860b140843361a)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rwxr-xr-x  bitbake/bin/bitbake-hashclient      13
-rw-r--r--  bitbake/lib/hashserv/client.py      44
-rw-r--r--  bitbake/lib/hashserv/server.py      61
-rw-r--r--  bitbake/lib/hashserv/sqlalchemy.py  11
-rw-r--r--  bitbake/lib/hashserv/sqlite.py      16
-rw-r--r--  bitbake/lib/hashserv/tests.py       39
6 files changed, 151 insertions, 33 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient
index f71b87404a..47dd27cd3c 100755
--- a/bitbake/bin/bitbake-hashclient
+++ b/bitbake/bin/bitbake-hashclient
@@ -217,6 +217,14 @@ def main():
217 print("Removed %d rows" % result["count"]) 217 print("Removed %d rows" % result["count"])
218 return 0 218 return 0
219 219
220 def handle_unihash_exists(args, client):
221 result = client.unihash_exists(args.unihash)
222 if args.quiet:
223 return 0 if result else 1
224
225 print("true" if result else "false")
226 return 0
227
220 parser = argparse.ArgumentParser(description='Hash Equivalence Client') 228 parser = argparse.ArgumentParser(description='Hash Equivalence Client')
221 parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') 229 parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")')
222 parser.add_argument('--log', default='WARNING', help='Set logging level') 230 parser.add_argument('--log', default='WARNING', help='Set logging level')
@@ -309,6 +317,11 @@ def main():
309 gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") 317 gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation")
310 gc_sweep_parser.set_defaults(func=handle_gc_sweep) 318 gc_sweep_parser.set_defaults(func=handle_gc_sweep)
311 319
320 unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server")
321 unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not")
322 unihash_exists_parser.add_argument("unihash", help="Unihash to check")
323 unihash_exists_parser.set_defaults(func=handle_unihash_exists)
324
312 args = parser.parse_args() 325 args = parser.parse_args()
313 326
314 logger = logging.getLogger('hashserv') 327 logger = logging.getLogger('hashserv')
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index e6dc417912..daf1e12842 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -16,6 +16,7 @@ logger = logging.getLogger("hashserv.client")
16class AsyncClient(bb.asyncrpc.AsyncClient): 16class AsyncClient(bb.asyncrpc.AsyncClient):
17 MODE_NORMAL = 0 17 MODE_NORMAL = 0
18 MODE_GET_STREAM = 1 18 MODE_GET_STREAM = 1
19 MODE_EXIST_STREAM = 2
19 20
20 def __init__(self, username=None, password=None): 21 def __init__(self, username=None, password=None):
21 super().__init__("OEHASHEQUIV", "1.1", logger) 22 super().__init__("OEHASHEQUIV", "1.1", logger)
@@ -49,19 +50,36 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
49 await self.socket.send("END") 50 await self.socket.send("END")
50 return await self.socket.recv() 51 return await self.socket.recv()
51 52
52 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: 53 async def normal_to_stream(command):
54 r = await self.invoke({command: None})
55 if r != "ok":
56 raise ConnectionError(
57 f"Unable to transition to stream mode: Bad response from server {r!r}"
58 )
59
60 self.logger.debug("Mode is now %s", command)
61
62 if new_mode == self.mode:
63 return
64
65 self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode)
66
67 # Always transition to normal mode before switching to any other mode
68 if self.mode != self.MODE_NORMAL:
53 r = await self._send_wrapper(stream_to_normal) 69 r = await self._send_wrapper(stream_to_normal)
54 if r != "ok": 70 if r != "ok":
55 self.check_invoke_error(r) 71 self.check_invoke_error(r)
56 raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) 72 raise ConnectionError(
57 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: 73 f"Unable to transition to normal mode: Bad response from server {r!r}"
58 r = await self.invoke({"get-stream": None}) 74 )
59 if r != "ok": 75 self.logger.debug("Mode is now normal")
60 raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) 76
61 elif new_mode != self.mode: 77 if new_mode == self.MODE_GET_STREAM:
62 raise Exception( 78 await normal_to_stream("get-stream")
63 "Undefined mode transition %r -> %r" % (self.mode, new_mode) 79 elif new_mode == self.MODE_EXIST_STREAM:
64 ) 80 await normal_to_stream("exists-stream")
81 elif new_mode != self.MODE_NORMAL:
82 raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
65 83
66 self.mode = new_mode 84 self.mode = new_mode
67 85
@@ -95,6 +113,11 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
95 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 113 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
96 ) 114 )
97 115
116 async def unihash_exists(self, unihash):
117 await self._set_mode(self.MODE_EXIST_STREAM)
118 r = await self.send_stream(unihash)
119 return r == "true"
120
98 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
99 await self._set_mode(self.MODE_NORMAL) 122 await self._set_mode(self.MODE_NORMAL)
100 return await self.invoke( 123 return await self.invoke(
@@ -236,6 +259,7 @@ class Client(bb.asyncrpc.Client):
236 "report_unihash", 259 "report_unihash",
237 "report_unihash_equiv", 260 "report_unihash_equiv",
238 "get_taskhash", 261 "get_taskhash",
262 "unihash_exists",
239 "get_outhash", 263 "get_outhash",
240 "get_stats", 264 "get_stats",
241 "reset_stats", 265 "reset_stats",
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 5ed852d1f3..68f64f983b 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -234,6 +234,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
234 "get": self.handle_get, 234 "get": self.handle_get,
235 "get-outhash": self.handle_get_outhash, 235 "get-outhash": self.handle_get_outhash,
236 "get-stream": self.handle_get_stream, 236 "get-stream": self.handle_get_stream,
237 "exists-stream": self.handle_exists_stream,
237 "get-stats": self.handle_get_stats, 238 "get-stats": self.handle_get_stats,
238 "get-db-usage": self.handle_get_db_usage, 239 "get-db-usage": self.handle_get_db_usage,
239 "get-db-query-columns": self.handle_get_db_query_columns, 240 "get-db-query-columns": self.handle_get_db_query_columns,
@@ -377,8 +378,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
377 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) 378 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
378 await self.db.insert_outhash(data) 379 await self.db.insert_outhash(data)
379 380
380 @permissions(READ_PERM) 381 async def _stream_handler(self, handler):
381 async def handle_get_stream(self, request):
382 await self.socket.send_message("ok") 382 await self.socket.send_message("ok")
383 383
384 while True: 384 while True:
@@ -400,35 +400,50 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
400 if l == "END": 400 if l == "END":
401 break 401 break
402 402
403 (method, taskhash) = l.split() 403 msg = await handler(l)
404 # self.logger.debug('Looking up %s %s' % (method, taskhash))
405 row = await self.db.get_equivalent(method, taskhash)
406
407 if row is not None:
408 msg = row["unihash"]
409 # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
410 elif self.upstream_client is not None:
411 upstream = await self.upstream_client.get_unihash(method, taskhash)
412 if upstream:
413 msg = upstream
414 else:
415 msg = ""
416 else:
417 msg = ""
418
419 await self.socket.send(msg) 404 await self.socket.send(msg)
420 finally: 405 finally:
421 request_measure.end() 406 request_measure.end()
422 self.request_sample.end() 407 self.request_sample.end()
423 408
424 # Post to the backfill queue after writing the result to minimize
425 # the turn around time on a request
426 if upstream is not None:
427 await self.server.backfill_queue.put((method, taskhash))
428
429 await self.socket.send("ok") 409 await self.socket.send("ok")
430 return self.NO_RESPONSE 410 return self.NO_RESPONSE
431 411
412 @permissions(READ_PERM)
413 async def handle_get_stream(self, request):
414 async def handler(l):
415 (method, taskhash) = l.split()
416 # self.logger.debug('Looking up %s %s' % (method, taskhash))
417 row = await self.db.get_equivalent(method, taskhash)
418
419 if row is not None:
420 # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
421 return row["unihash"]
422
423 if self.upstream_client is not None:
424 upstream = await self.upstream_client.get_unihash(method, taskhash)
425 if upstream:
426 await self.server.backfill_queue.put((method, taskhash))
427 return upstream
428
429 return ""
430
431 return await self._stream_handler(handler)
432
433 @permissions(READ_PERM)
434 async def handle_exists_stream(self, request):
435 async def handler(l):
436 if await self.db.unihash_exists(l):
437 return "true"
438
439 if self.upstream_client is not None:
440 if await self.upstream_client.unihash_exists(l):
441 return "true"
442
443 return "false"
444
445 return await self._stream_handler(handler)
446
432 async def report_readonly(self, data): 447 async def report_readonly(self, data):
433 method = data["method"] 448 method = data["method"]
434 outhash = data["outhash"] 449 outhash = data["outhash"]
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
index 873547809a..0e28d738f5 100644
--- a/bitbake/lib/hashserv/sqlalchemy.py
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -48,6 +48,7 @@ class UnihashesV3(Base):
48 __table_args__ = ( 48 __table_args__ = (
49 UniqueConstraint("method", "taskhash"), 49 UniqueConstraint("method", "taskhash"),
50 Index("taskhash_lookup_v4", "method", "taskhash"), 50 Index("taskhash_lookup_v4", "method", "taskhash"),
51 Index("unihash_lookup_v1", "unihash"),
51 ) 52 )
52 53
53 54
@@ -279,6 +280,16 @@ class Database(object):
279 ) 280 )
280 return map_row(result.first()) 281 return map_row(result.first())
281 282
283 async def unihash_exists(self, unihash):
284 async with self.db.begin():
285 result = await self._execute(
286 select(UnihashesV3)
287 .where(UnihashesV3.unihash == unihash)
288 .limit(1)
289 )
290
291 return result.first() is not None
292
282 async def get_outhash(self, method, outhash): 293 async def get_outhash(self, method, outhash):
283 async with self.db.begin(): 294 async with self.db.begin():
284 result = await self._execute( 295 result = await self._execute(
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
index 608490730d..da2e844a03 100644
--- a/bitbake/lib/hashserv/sqlite.py
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -145,6 +145,9 @@ class DatabaseEngine(object):
145 "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" 145 "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
146 ) 146 )
147 cursor.execute( 147 cursor.execute(
148 "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
149 )
150 cursor.execute(
148 "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" 151 "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
149 ) 152 )
150 cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") 153 cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)")
@@ -255,6 +258,19 @@ class Database(object):
255 ) 258 )
256 return cursor.fetchone() 259 return cursor.fetchone()
257 260
261 async def unihash_exists(self, unihash):
262 with closing(self.db.cursor()) as cursor:
263 cursor.execute(
264 """
265 SELECT * FROM unihashes_v3 WHERE unihash=:unihash
266 LIMIT 1
267 """,
268 {
269 "unihash": unihash,
270 },
271 )
272 return cursor.fetchone() is not None
273
258 async def get_outhash(self, method, outhash): 274 async def get_outhash(self, method, outhash):
259 with closing(self.db.cursor()) as cursor: 275 with closing(self.db.cursor()) as cursor:
260 cursor.execute( 276 cursor.execute(
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index aeedab3575..fbbe81512a 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -442,6 +442,11 @@ class HashEquivalenceCommonTests(object):
442 self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream') 442 self.assertEqual(result['taskhash'], taskhash9, 'Server failed to copy unihash from upstream')
443 self.assertEqual(result['method'], self.METHOD) 443 self.assertEqual(result['method'], self.METHOD)
444 444
445 def test_unihash_exsits(self):
446 taskhash, outhash, unihash = self.create_test_hash(self.client)
447 self.assertTrue(self.client.unihash_exists(unihash))
448 self.assertFalse(self.client.unihash_exists('6662e699d6e3d894b24408ff9a4031ef9b038ee8'))
449
445 def test_ro_server(self): 450 def test_ro_server(self):
446 rw_server = self.start_server() 451 rw_server = self.start_server()
447 rw_client = self.start_client(rw_server.address) 452 rw_client = self.start_client(rw_server.address)
@@ -1031,6 +1036,40 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
1031 def test_stress(self): 1036 def test_stress(self):
1032 self.run_hashclient(["--address", self.server_address, "stress"], check=True) 1037 self.run_hashclient(["--address", self.server_address, "stress"], check=True)
1033 1038
1039 def test_unihash_exsits(self):
1040 taskhash, outhash, unihash = self.create_test_hash(self.client)
1041
1042 p = self.run_hashclient([
1043 "--address", self.server_address,
1044 "unihash-exists", unihash,
1045 ], check=True)
1046 self.assertEqual(p.stdout.strip(), "true")
1047
1048 p = self.run_hashclient([
1049 "--address", self.server_address,
1050 "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8',
1051 ], check=True)
1052 self.assertEqual(p.stdout.strip(), "false")
1053
1054 def test_unihash_exsits_quiet(self):
1055 taskhash, outhash, unihash = self.create_test_hash(self.client)
1056
1057 p = self.run_hashclient([
1058 "--address", self.server_address,
1059 "unihash-exists", unihash,
1060 "--quiet",
1061 ])
1062 self.assertEqual(p.returncode, 0)
1063 self.assertEqual(p.stdout.strip(), "")
1064
1065 p = self.run_hashclient([
1066 "--address", self.server_address,
1067 "unihash-exists", '6662e699d6e3d894b24408ff9a4031ef9b038ee8',
1068 "--quiet",
1069 ])
1070 self.assertEqual(p.returncode, 1)
1071 self.assertEqual(p.stdout.strip(), "")
1072
1034 def test_remove_taskhash(self): 1073 def test_remove_taskhash(self):
1035 taskhash, outhash, unihash = self.create_test_hash(self.client) 1074 taskhash, outhash, unihash = self.create_test_hash(self.client)
1036 self.run_hashclient([ 1075 self.run_hashclient([