diff options
-rwxr-xr-x | bitbake/bin/bitbake-hashclient | 31 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 22 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 29 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 42 |
4 files changed, 124 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index a50701a88b..b8755c5797 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient | |||
@@ -227,6 +227,27 @@ def main(): | |||
227 | print("New hashes marked: %d" % result["count"]) | 227 | print("New hashes marked: %d" % result["count"]) |
228 | return 0 | 228 | return 0 |
229 | 229 | ||
def handle_gc_mark_stream(args, client):
    """
    Mark many hashes for retention in a single garbage collection pass.

    Each non-blank line read from stdin is a whitespace separated list of
    alternating column names and values (for example
    "column1 foo column2 bar") describing one "where" condition.

    The conditions are first sent with `client.gc_mark_stream`. If that
    raises ConnectionError, every condition is re-sent one at a time with
    `client.gc_mark`.

    Prints the number of newly marked hashes and returns 0.
    """
    # Buffer the input before talking to the server. A lazy generator over
    # stdin would be partially (or fully) drained by a failed streaming
    # attempt, and the fallback loop below would then silently skip those
    # conditions, leaving hashes unmarked ahead of a later sweep.
    # Blank lines carry no condition, so they are dropped instead of being
    # sent as an empty "where" clause.
    rows = [row for row in (line.strip() for line in sys.stdin) if row]

    try:
        marked_hashes = client.gc_mark_stream(args.mark, rows)["count"]
    except ConnectionError:
        logger.warning(
            "Server doesn't seem to support `gc-mark-stream`. Sending "
            "hashes sequentially using `gc-mark` API."
        )
        # Start over from the first row: it is unknown how much of the
        # stream was processed before the connection failed.
        marked_hashes = 0
        for row in rows:
            pairs = row.split()
            condition = dict(zip(pairs[::2], pairs[1::2]))
            marked_hashes += client.gc_mark(args.mark, condition)["count"]

    print("New hashes marked: %d" % marked_hashes)
    return 0
250 | |||
230 | def handle_gc_sweep(args, client): | 251 | def handle_gc_sweep(args, client): |
231 | result = client.gc_sweep(args.mark) | 252 | result = client.gc_sweep(args.mark) |
232 | print("Removed %d rows" % result["count"]) | 253 | print("Removed %d rows" % result["count"]) |
@@ -366,6 +387,16 @@ def main(): | |||
366 | help="Keep entries in table where KEY == VALUE") | 387 | help="Keep entries in table where KEY == VALUE") |
367 | gc_mark_parser.set_defaults(func=handle_gc_mark) | 388 | gc_mark_parser.set_defaults(func=handle_gc_mark) |
368 | 389 | ||
390 | gc_mark_parser_stream = subparsers.add_parser( | ||
391 | 'gc-mark-stream', | ||
392 | help=( | ||
393 | "Mark multiple hashes to be retained for garbage collection. Input should be provided via stdin, " | ||
394 | "with each line formatted as key-value pairs separated by spaces, for example 'column1 foo column2 bar'." | ||
395 | ) | ||
396 | ) | ||
397 | gc_mark_parser_stream.add_argument("mark", help="Mark for this garbage collection operation") | ||
398 | gc_mark_parser_stream.set_defaults(func=handle_gc_mark_stream) | ||
399 | |||
369 | gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") | 400 | gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") |
370 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") | 401 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") |
371 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) | 402 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index a510f3284f..8cb18050a6 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -78,6 +78,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
78 | MODE_NORMAL = 0 | 78 | MODE_NORMAL = 0 |
79 | MODE_GET_STREAM = 1 | 79 | MODE_GET_STREAM = 1 |
80 | MODE_EXIST_STREAM = 2 | 80 | MODE_EXIST_STREAM = 2 |
81 | MODE_MARK_STREAM = 3 | ||
81 | 82 | ||
82 | def __init__(self, username=None, password=None): | 83 | def __init__(self, username=None, password=None): |
83 | super().__init__("OEHASHEQUIV", "1.1", logger) | 84 | super().__init__("OEHASHEQUIV", "1.1", logger) |
@@ -164,6 +165,8 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
164 | await normal_to_stream("get-stream") | 165 | await normal_to_stream("get-stream") |
165 | elif new_mode == self.MODE_EXIST_STREAM: | 166 | elif new_mode == self.MODE_EXIST_STREAM: |
166 | await normal_to_stream("exists-stream") | 167 | await normal_to_stream("exists-stream") |
168 | elif new_mode == self.MODE_MARK_STREAM: | ||
169 | await normal_to_stream("gc-mark-stream") | ||
167 | elif new_mode != self.MODE_NORMAL: | 170 | elif new_mode != self.MODE_NORMAL: |
168 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") | 171 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") |
169 | 172 | ||
@@ -306,6 +309,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
306 | """ | 309 | """ |
307 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | 310 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) |
308 | 311 | ||
async def gc_mark_stream(self, mark, rows):
    """
    Streaming counterpart of `gc-mark`.

    `rows` is an iterable of strings. Each string is a whitespace
    separated sequence of alternating keys and values and is turned into
    one "where" condition. One JSON request per row is sent in
    MODE_MARK_STREAM through `send_stream_batch`, which helps reduce the
    impact of latency when communicating with the hash equivalence
    server.

    Returns a dict {"count": N} where N is the sum of the "count" fields
    of all responses.
    """
    def encode(row):
        # Even positions are keys, odd positions are their values
        tokens = row.split()
        where = dict(zip(tokens[0::2], tokens[1::2]))
        return json.dumps({"mark": mark, "where": where})

    requests = (encode(row) for row in rows)
    replies = await self.send_stream_batch(self.MODE_MARK_STREAM, requests)

    total = 0
    for reply in replies:
        total += int(json.loads(reply)["count"])

    return {"count": total}
329 | |||
309 | async def gc_sweep(self, mark): | 330 | async def gc_sweep(self, mark): |
310 | """ | 331 | """ |
311 | Finishes garbage collection for "mark". All unihash entries that have | 332 | Finishes garbage collection for "mark". All unihash entries that have |
@@ -351,6 +372,7 @@ class Client(bb.asyncrpc.Client): | |||
351 | "get_db_query_columns", | 372 | "get_db_query_columns", |
352 | "gc_status", | 373 | "gc_status", |
353 | "gc_mark", | 374 | "gc_mark", |
375 | "gc_mark_stream", | ||
354 | "gc_sweep", | 376 | "gc_sweep", |
355 | ) | 377 | ) |
356 | 378 | ||
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 68f64f983b..58f95c7bcd 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -10,6 +10,7 @@ import math | |||
10 | import time | 10 | import time |
11 | import os | 11 | import os |
12 | import base64 | 12 | import base64 |
13 | import json | ||
13 | import hashlib | 14 | import hashlib |
14 | from . import create_async_client | 15 | from . import create_async_client |
15 | import bb.asyncrpc | 16 | import bb.asyncrpc |
@@ -256,6 +257,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
256 | "backfill-wait": self.handle_backfill_wait, | 257 | "backfill-wait": self.handle_backfill_wait, |
257 | "remove": self.handle_remove, | 258 | "remove": self.handle_remove, |
258 | "gc-mark": self.handle_gc_mark, | 259 | "gc-mark": self.handle_gc_mark, |
260 | "gc-mark-stream": self.handle_gc_mark_stream, | ||
259 | "gc-sweep": self.handle_gc_sweep, | 261 | "gc-sweep": self.handle_gc_sweep, |
260 | "gc-status": self.handle_gc_status, | 262 | "gc-status": self.handle_gc_status, |
261 | "clean-unused": self.handle_clean_unused, | 263 | "clean-unused": self.handle_clean_unused, |
@@ -584,6 +586,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
584 | return {"count": await self.db.gc_mark(mark, condition)} | 586 | return {"count": await self.db.gc_mark(mark, condition)} |
585 | 587 | ||
586 | @permissions(DB_ADMIN_PERM) | 588 | @permissions(DB_ADMIN_PERM) |
async def handle_gc_mark_stream(self, request):
    """
    Stream-mode variant of `gc-mark`.

    A per-line handler is handed to `self._stream_handler` (presumably
    invoked once for every line received while in stream mode - confirm
    against `_stream_handler`). Each line must be a JSON object with a
    string "mark" and a dict "where". The handler answers with the JSON
    text of {"count": N}, N being the value returned by `db.gc_mark`.

    `request` is not used by this handler.

    The per-line handler raises:
        bb.asyncrpc.InvokeError: the line is not valid JSON, or one of
            the "mark" / "where" keys is missing.
        TypeError: the decoded line, "mark" or "where" has the wrong
            type.
    """
    async def handler(line):
        try:
            decoded_line = json.loads(line)
        except json.JSONDecodeError as exc:
            raise bb.asyncrpc.InvokeError(
                "Could not decode JSONL input '%s'" % line
            ) from exc

        # Valid JSON that is not an object (list, string, number, null)
        # would otherwise fail below with an unrelated indexing TypeError.
        if not isinstance(decoded_line, dict):
            raise TypeError("Bad request type %s" % type(decoded_line))

        try:
            mark = decoded_line["mark"]
            condition = decoded_line["where"]
        except KeyError as exc:
            # Format the key itself: str(KeyError) is the repr of the key,
            # which would render as ''mark'' inside the quotes below.
            raise bb.asyncrpc.InvokeError(
                "Input line is missing key '%s'" % exc.args[0]
            ) from exc

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        return json.dumps({"count": await self.db.gc_mark(mark, condition)})

    return await self._stream_handler(handler)
614 | |||
615 | @permissions(DB_ADMIN_PERM) | ||
587 | async def handle_gc_sweep(self, request): | 616 | async def handle_gc_sweep(self, request): |
588 | mark = request["mark"] | 617 | mark = request["mark"] |
589 | 618 | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 13ccb20ebf..da3f8e0884 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -969,6 +969,48 @@ class HashEquivalenceCommonTests(object): | |||
969 | # First hash is still present | 969 | # First hash is still present |
970 | self.assertClientGetHash(self.client, taskhash, unihash) | 970 | self.assertClientGetHash(self.client, taskhash, unihash) |
971 | 971 | ||
def test_gc_stream(self):
    """
    Report three unihashes, mark the first two through the streaming
    gc-mark API, then sweep and check that only the unmarked third entry
    was removed.
    """
    gc_mark = "ABC"

    # First entry: reported and checked through the report result
    kept_taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
    kept_outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
    kept_unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'

    report = self.client.report_unihash(kept_taskhash, self.METHOD, kept_outhash, kept_unihash)
    self.assertEqual(report['unihash'], kept_unihash, 'Server returned bad unihash')

    # Second entry: will be marked as well
    kept_taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
    kept_outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
    kept_unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'

    self.client.report_unihash(kept_taskhash2, self.METHOD, kept_outhash2, kept_unihash2)
    self.assertClientGetHash(self.client, kept_taskhash2, kept_unihash2)

    # Third entry: never marked, so the sweep is expected to remove it
    swept_taskhash = 'a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c0'
    swept_outhash = '7289c414905303700a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c04f9a53c'
    swept_unihash = '905303700a1117c1f5a7c9ab2f5a39cc6fe5e615'

    self.client.report_unihash(swept_taskhash, self.METHOD, swept_outhash, swept_unihash)
    self.assertClientGetHash(self.client, swept_taskhash, swept_unihash)

    # Mark the first two unihashes to be kept, one stream row per unihash
    stream_rows = (f"unihash {h}" for h in [kept_unihash, kept_unihash2])
    marked = self.client.gc_mark_stream(gc_mark, stream_rows)
    self.assertEqual(marked, {"count": 2})

    status = self.client.gc_status()
    self.assertEqual(status, {"mark": gc_mark, "keep": 2, "remove": 1})

    # Marking alone deletes nothing: the third hash is still served
    self.assertClientGetHash(self.client, swept_taskhash, swept_unihash)

    swept = self.client.gc_sweep(gc_mark)
    self.assertEqual(swept, {"count": 1})

    # The unmarked third hash is gone after the sweep
    self.assertClientGetHash(self.client, swept_taskhash, None)
    # First hash is still present
    self.assertClientGetHash(self.client, kept_taskhash, kept_unihash)
    # Second hash is still present
    self.assertClientGetHash(self.client, kept_taskhash2, kept_unihash2)
1013 | |||
972 | def test_gc_switch_mark(self): | 1014 | def test_gc_switch_mark(self): |
973 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | 1015 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' |
974 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | 1016 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' |