summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbitbake/bin/bitbake-hashclient31
-rw-r--r--bitbake/lib/hashserv/client.py22
-rw-r--r--bitbake/lib/hashserv/server.py29
-rw-r--r--bitbake/lib/hashserv/tests.py42
4 files changed, 124 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient
index a50701a88b..b8755c5797 100755
--- a/bitbake/bin/bitbake-hashclient
+++ b/bitbake/bin/bitbake-hashclient
@@ -227,6 +227,27 @@ def main():
227 print("New hashes marked: %d" % result["count"]) 227 print("New hashes marked: %d" % result["count"])
228 return 0 228 return 0
229 229
230 def handle_gc_mark_stream(args, client):
231 stdin = (l.strip() for l in sys.stdin)
232 marked_hashes = 0
233
234 try:
235 result = client.gc_mark_stream(args.mark, stdin)
236 marked_hashes = result["count"]
237 except ConnectionError:
238 logger.warning(
239 "Server doesn't seem to support `gc-mark-stream`. Sending "
240 "hashes sequentially using `gc-mark` API."
241 )
242 for line in stdin:
243 pairs = line.split()
244 condition = dict(zip(pairs[::2], pairs[1::2]))
245 result = client.gc_mark(args.mark, condition)
246 marked_hashes += result["count"]
247
248 print("New hashes marked: %d" % marked_hashes)
249 return 0
250
230 def handle_gc_sweep(args, client): 251 def handle_gc_sweep(args, client):
231 result = client.gc_sweep(args.mark) 252 result = client.gc_sweep(args.mark)
232 print("Removed %d rows" % result["count"]) 253 print("Removed %d rows" % result["count"])
@@ -366,6 +387,16 @@ def main():
366 help="Keep entries in table where KEY == VALUE") 387 help="Keep entries in table where KEY == VALUE")
367 gc_mark_parser.set_defaults(func=handle_gc_mark) 388 gc_mark_parser.set_defaults(func=handle_gc_mark)
368 389
390 gc_mark_parser_stream = subparsers.add_parser(
391 'gc-mark-stream',
392 help=(
393 "Mark multiple hashes to be retained for garbage collection. Input should be provided via stdin, "
394 "with each line formatted as key-value pairs separated by spaces, for example 'column1 foo column2 bar'."
395 )
396 )
397 gc_mark_parser_stream.add_argument("mark", help="Mark for this garbage collection operation")
398 gc_mark_parser_stream.set_defaults(func=handle_gc_mark_stream)
399
369 gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") 400 gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked")
370 gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") 401 gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation")
371 gc_sweep_parser.set_defaults(func=handle_gc_sweep) 402 gc_sweep_parser.set_defaults(func=handle_gc_sweep)
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index a510f3284f..8cb18050a6 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -78,6 +78,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
78 MODE_NORMAL = 0 78 MODE_NORMAL = 0
79 MODE_GET_STREAM = 1 79 MODE_GET_STREAM = 1
80 MODE_EXIST_STREAM = 2 80 MODE_EXIST_STREAM = 2
81 MODE_MARK_STREAM = 3
81 82
82 def __init__(self, username=None, password=None): 83 def __init__(self, username=None, password=None):
83 super().__init__("OEHASHEQUIV", "1.1", logger) 84 super().__init__("OEHASHEQUIV", "1.1", logger)
@@ -164,6 +165,8 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
164 await normal_to_stream("get-stream") 165 await normal_to_stream("get-stream")
165 elif new_mode == self.MODE_EXIST_STREAM: 166 elif new_mode == self.MODE_EXIST_STREAM:
166 await normal_to_stream("exists-stream") 167 await normal_to_stream("exists-stream")
168 elif new_mode == self.MODE_MARK_STREAM:
169 await normal_to_stream("gc-mark-stream")
167 elif new_mode != self.MODE_NORMAL: 170 elif new_mode != self.MODE_NORMAL:
168 raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") 171 raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
169 172
@@ -306,6 +309,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
306 """ 309 """
307 return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) 310 return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
308 311
312 async def gc_mark_stream(self, mark, rows):
313 """
314 Similar to `gc-mark`, but accepts a list of "where" key-value pair
315 conditions. It utilizes stream mode to mark hashes, which helps reduce
316 the impact of latency when communicating with the hash equivalence
317 server.
318 """
319 def row_to_dict(row):
320 pairs = row.split()
321 return dict(zip(pairs[::2], pairs[1::2]))
322
323 responses = await self.send_stream_batch(
324 self.MODE_MARK_STREAM,
325 (json.dumps({"mark": mark, "where": row_to_dict(row)}) for row in rows),
326 )
327
328 return {"count": sum(int(json.loads(r)["count"]) for r in responses)}
329
309 async def gc_sweep(self, mark): 330 async def gc_sweep(self, mark):
310 """ 331 """
311 Finishes garbage collection for "mark". All unihash entries that have 332 Finishes garbage collection for "mark". All unihash entries that have
@@ -351,6 +372,7 @@ class Client(bb.asyncrpc.Client):
351 "get_db_query_columns", 372 "get_db_query_columns",
352 "gc_status", 373 "gc_status",
353 "gc_mark", 374 "gc_mark",
375 "gc_mark_stream",
354 "gc_sweep", 376 "gc_sweep",
355 ) 377 )
356 378
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 68f64f983b..58f95c7bcd 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -10,6 +10,7 @@ import math
10import time 10import time
11import os 11import os
12import base64 12import base64
13import json
13import hashlib 14import hashlib
14from . import create_async_client 15from . import create_async_client
15import bb.asyncrpc 16import bb.asyncrpc
@@ -256,6 +257,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
256 "backfill-wait": self.handle_backfill_wait, 257 "backfill-wait": self.handle_backfill_wait,
257 "remove": self.handle_remove, 258 "remove": self.handle_remove,
258 "gc-mark": self.handle_gc_mark, 259 "gc-mark": self.handle_gc_mark,
260 "gc-mark-stream": self.handle_gc_mark_stream,
259 "gc-sweep": self.handle_gc_sweep, 261 "gc-sweep": self.handle_gc_sweep,
260 "gc-status": self.handle_gc_status, 262 "gc-status": self.handle_gc_status,
261 "clean-unused": self.handle_clean_unused, 263 "clean-unused": self.handle_clean_unused,
@@ -584,6 +586,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
584 return {"count": await self.db.gc_mark(mark, condition)} 586 return {"count": await self.db.gc_mark(mark, condition)}
585 587
586 @permissions(DB_ADMIN_PERM) 588 @permissions(DB_ADMIN_PERM)
589 async def handle_gc_mark_stream(self, request):
590 async def handler(line):
591 try:
592 decoded_line = json.loads(line)
593 except json.JSONDecodeError as exc:
594 raise bb.asyncrpc.InvokeError(
595 "Could not decode JSONL input '%s'" % line
596 ) from exc
597
598 try:
599 mark = decoded_line["mark"]
600 condition = decoded_line["where"]
601 if not isinstance(mark, str):
602 raise TypeError("Bad mark type %s" % type(mark))
603
604 if not isinstance(condition, dict):
605 raise TypeError("Bad condition type %s" % type(condition))
606 except KeyError as exc:
607 raise bb.asyncrpc.InvokeError(
608 "Input line is missing key '%s' " % exc
609 ) from exc
610
611 return json.dumps({"count": await self.db.gc_mark(mark, condition)})
612
613 return await self._stream_handler(handler)
614
615 @permissions(DB_ADMIN_PERM)
587 async def handle_gc_sweep(self, request): 616 async def handle_gc_sweep(self, request):
588 mark = request["mark"] 617 mark = request["mark"]
589 618
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 13ccb20ebf..da3f8e0884 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -969,6 +969,48 @@ class HashEquivalenceCommonTests(object):
969 # First hash is still present 969 # First hash is still present
970 self.assertClientGetHash(self.client, taskhash, unihash) 970 self.assertClientGetHash(self.client, taskhash, unihash)
971 971
972 def test_gc_stream(self):
973 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
974 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
975 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
976
977 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
978 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
979
980 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
981 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
982 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
983
984 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
985 self.assertClientGetHash(self.client, taskhash2, unihash2)
986
987 taskhash3 = 'a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c0'
988 outhash3 = '7289c414905303700a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c04f9a53c'
989 unihash3 = '905303700a1117c1f5a7c9ab2f5a39cc6fe5e615'
990
991 result = self.client.report_unihash(taskhash3, self.METHOD, outhash3, unihash3)
992 self.assertClientGetHash(self.client, taskhash3, unihash3)
993
994 # Mark the first unihash to be kept
995 ret = self.client.gc_mark_stream("ABC", (f"unihash {h}" for h in [unihash, unihash2]))
996 self.assertEqual(ret, {"count": 2})
997
998 ret = self.client.gc_status()
999 self.assertEqual(ret, {"mark": "ABC", "keep": 2, "remove": 1})
1000
1001 # Third hash is still there; mark doesn't delete hashes
1002 self.assertClientGetHash(self.client, taskhash3, unihash3)
1003
1004 ret = self.client.gc_sweep("ABC")
1005 self.assertEqual(ret, {"count": 1})
1006
1007 # Hash is gone. Taskhash is returned for second hash
1008 self.assertClientGetHash(self.client, taskhash3, None)
1009 # First hash is still present
1010 self.assertClientGetHash(self.client, taskhash, unihash)
1011 # Second hash is still present
1012 self.assertClientGetHash(self.client, taskhash2, unihash2)
1013
972 def test_gc_switch_mark(self): 1014 def test_gc_switch_mark(self):
973 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' 1015 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
974 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' 1016 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'