diff options
Diffstat (limited to 'bitbake')
| -rwxr-xr-x | bitbake/bin/bitbake-hashclient | 31 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 22 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 29 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 42 |
4 files changed, 124 insertions, 0 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index a50701a88b..b8755c5797 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient | |||
| @@ -227,6 +227,27 @@ def main(): | |||
| 227 | print("New hashes marked: %d" % result["count"]) | 227 | print("New hashes marked: %d" % result["count"]) |
| 228 | return 0 | 228 | return 0 |
| 229 | 229 | ||
| 230 | def handle_gc_mark_stream(args, client): | ||
| 231 | stdin = (l.strip() for l in sys.stdin) | ||
| 232 | marked_hashes = 0 | ||
| 233 | |||
| 234 | try: | ||
| 235 | result = client.gc_mark_stream(args.mark, stdin) | ||
| 236 | marked_hashes = result["count"] | ||
| 237 | except ConnectionError: | ||
| 238 | logger.warning( | ||
| 239 | "Server doesn't seem to support `gc-mark-stream`. Sending " | ||
| 240 | "hashes sequentially using `gc-mark` API." | ||
| 241 | ) | ||
| 242 | for line in stdin: | ||
| 243 | pairs = line.split() | ||
| 244 | condition = dict(zip(pairs[::2], pairs[1::2])) | ||
| 245 | result = client.gc_mark(args.mark, condition) | ||
| 246 | marked_hashes += result["count"] | ||
| 247 | |||
| 248 | print("New hashes marked: %d" % marked_hashes) | ||
| 249 | return 0 | ||
| 250 | |||
| 230 | def handle_gc_sweep(args, client): | 251 | def handle_gc_sweep(args, client): |
| 231 | result = client.gc_sweep(args.mark) | 252 | result = client.gc_sweep(args.mark) |
| 232 | print("Removed %d rows" % result["count"]) | 253 | print("Removed %d rows" % result["count"]) |
| @@ -366,6 +387,16 @@ def main(): | |||
| 366 | help="Keep entries in table where KEY == VALUE") | 387 | help="Keep entries in table where KEY == VALUE") |
| 367 | gc_mark_parser.set_defaults(func=handle_gc_mark) | 388 | gc_mark_parser.set_defaults(func=handle_gc_mark) |
| 368 | 389 | ||
| 390 | gc_mark_parser_stream = subparsers.add_parser( | ||
| 391 | 'gc-mark-stream', | ||
| 392 | help=( | ||
| 393 | "Mark multiple hashes to be retained for garbage collection. Input should be provided via stdin, " | ||
| 394 | "with each line formatted as key-value pairs separated by spaces, for example 'column1 foo column2 bar'." | ||
| 395 | ) | ||
| 396 | ) | ||
| 397 | gc_mark_parser_stream.add_argument("mark", help="Mark for this garbage collection operation") | ||
| 398 | gc_mark_parser_stream.set_defaults(func=handle_gc_mark_stream) | ||
| 399 | |||
| 369 | gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") | 400 | gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") |
| 370 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") | 401 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") |
| 371 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) | 402 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index a510f3284f..8cb18050a6 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -78,6 +78,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 78 | MODE_NORMAL = 0 | 78 | MODE_NORMAL = 0 |
| 79 | MODE_GET_STREAM = 1 | 79 | MODE_GET_STREAM = 1 |
| 80 | MODE_EXIST_STREAM = 2 | 80 | MODE_EXIST_STREAM = 2 |
| 81 | MODE_MARK_STREAM = 3 | ||
| 81 | 82 | ||
| 82 | def __init__(self, username=None, password=None): | 83 | def __init__(self, username=None, password=None): |
| 83 | super().__init__("OEHASHEQUIV", "1.1", logger) | 84 | super().__init__("OEHASHEQUIV", "1.1", logger) |
| @@ -164,6 +165,8 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 164 | await normal_to_stream("get-stream") | 165 | await normal_to_stream("get-stream") |
| 165 | elif new_mode == self.MODE_EXIST_STREAM: | 166 | elif new_mode == self.MODE_EXIST_STREAM: |
| 166 | await normal_to_stream("exists-stream") | 167 | await normal_to_stream("exists-stream") |
| 168 | elif new_mode == self.MODE_MARK_STREAM: | ||
| 169 | await normal_to_stream("gc-mark-stream") | ||
| 167 | elif new_mode != self.MODE_NORMAL: | 170 | elif new_mode != self.MODE_NORMAL: |
| 168 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") | 171 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") |
| 169 | 172 | ||
| @@ -306,6 +309,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 306 | """ | 309 | """ |
| 307 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | 310 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) |
| 308 | 311 | ||
| 312 | async def gc_mark_stream(self, mark, rows): | ||
| 313 | """ | ||
| 314 | Similar to `gc-mark`, but accepts a list of "where" key-value pair | ||
| 315 | conditions. It utilizes stream mode to mark hashes, which helps reduce | ||
| 316 | the impact of latency when communicating with the hash equivalence | ||
| 317 | server. | ||
| 318 | """ | ||
| 319 | def row_to_dict(row): | ||
| 320 | pairs = row.split() | ||
| 321 | return dict(zip(pairs[::2], pairs[1::2])) | ||
| 322 | |||
| 323 | responses = await self.send_stream_batch( | ||
| 324 | self.MODE_MARK_STREAM, | ||
| 325 | (json.dumps({"mark": mark, "where": row_to_dict(row)}) for row in rows), | ||
| 326 | ) | ||
| 327 | |||
| 328 | return {"count": sum(int(json.loads(r)["count"]) for r in responses)} | ||
| 329 | |||
| 309 | async def gc_sweep(self, mark): | 330 | async def gc_sweep(self, mark): |
| 310 | """ | 331 | """ |
| 311 | Finishes garbage collection for "mark". All unihash entries that have | 332 | Finishes garbage collection for "mark". All unihash entries that have |
| @@ -351,6 +372,7 @@ class Client(bb.asyncrpc.Client): | |||
| 351 | "get_db_query_columns", | 372 | "get_db_query_columns", |
| 352 | "gc_status", | 373 | "gc_status", |
| 353 | "gc_mark", | 374 | "gc_mark", |
| 375 | "gc_mark_stream", | ||
| 354 | "gc_sweep", | 376 | "gc_sweep", |
| 355 | ) | 377 | ) |
| 356 | 378 | ||
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 68f64f983b..58f95c7bcd 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
| @@ -10,6 +10,7 @@ import math | |||
| 10 | import time | 10 | import time |
| 11 | import os | 11 | import os |
| 12 | import base64 | 12 | import base64 |
| 13 | import json | ||
| 13 | import hashlib | 14 | import hashlib |
| 14 | from . import create_async_client | 15 | from . import create_async_client |
| 15 | import bb.asyncrpc | 16 | import bb.asyncrpc |
| @@ -256,6 +257,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 256 | "backfill-wait": self.handle_backfill_wait, | 257 | "backfill-wait": self.handle_backfill_wait, |
| 257 | "remove": self.handle_remove, | 258 | "remove": self.handle_remove, |
| 258 | "gc-mark": self.handle_gc_mark, | 259 | "gc-mark": self.handle_gc_mark, |
| 260 | "gc-mark-stream": self.handle_gc_mark_stream, | ||
| 259 | "gc-sweep": self.handle_gc_sweep, | 261 | "gc-sweep": self.handle_gc_sweep, |
| 260 | "gc-status": self.handle_gc_status, | 262 | "gc-status": self.handle_gc_status, |
| 261 | "clean-unused": self.handle_clean_unused, | 263 | "clean-unused": self.handle_clean_unused, |
| @@ -584,6 +586,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 584 | return {"count": await self.db.gc_mark(mark, condition)} | 586 | return {"count": await self.db.gc_mark(mark, condition)} |
| 585 | 587 | ||
| 586 | @permissions(DB_ADMIN_PERM) | 588 | @permissions(DB_ADMIN_PERM) |
| 589 | async def handle_gc_mark_stream(self, request): | ||
| 590 | async def handler(line): | ||
| 591 | try: | ||
| 592 | decoded_line = json.loads(line) | ||
| 593 | except json.JSONDecodeError as exc: | ||
| 594 | raise bb.asyncrpc.InvokeError( | ||
| 595 | "Could not decode JSONL input '%s'" % line | ||
| 596 | ) from exc | ||
| 597 | |||
| 598 | try: | ||
| 599 | mark = decoded_line["mark"] | ||
| 600 | condition = decoded_line["where"] | ||
| 601 | if not isinstance(mark, str): | ||
| 602 | raise TypeError("Bad mark type %s" % type(mark)) | ||
| 603 | |||
| 604 | if not isinstance(condition, dict): | ||
| 605 | raise TypeError("Bad condition type %s" % type(condition)) | ||
| 606 | except KeyError as exc: | ||
| 607 | raise bb.asyncrpc.InvokeError( | ||
| 608 | "Input line is missing key '%s' " % exc | ||
| 609 | ) from exc | ||
| 610 | |||
| 611 | return json.dumps({"count": await self.db.gc_mark(mark, condition)}) | ||
| 612 | |||
| 613 | return await self._stream_handler(handler) | ||
| 614 | |||
| 615 | @permissions(DB_ADMIN_PERM) | ||
| 587 | async def handle_gc_sweep(self, request): | 616 | async def handle_gc_sweep(self, request): |
| 588 | mark = request["mark"] | 617 | mark = request["mark"] |
| 589 | 618 | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 13ccb20ebf..da3f8e0884 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
| @@ -969,6 +969,48 @@ class HashEquivalenceCommonTests(object): | |||
| 969 | # First hash is still present | 969 | # First hash is still present |
| 970 | self.assertClientGetHash(self.client, taskhash, unihash) | 970 | self.assertClientGetHash(self.client, taskhash, unihash) |
| 971 | 971 | ||
| 972 | def test_gc_stream(self): | ||
| 973 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
| 974 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
| 975 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
| 976 | |||
| 977 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 978 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
| 979 | |||
| 980 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
| 981 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
| 982 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
| 983 | |||
| 984 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
| 985 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 986 | |||
| 987 | taskhash3 = 'a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c0' | ||
| 988 | outhash3 = '7289c414905303700a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c04f9a53c' | ||
| 989 | unihash3 = '905303700a1117c1f5a7c9ab2f5a39cc6fe5e615' | ||
| 990 | |||
| 991 | result = self.client.report_unihash(taskhash3, self.METHOD, outhash3, unihash3) | ||
| 992 | self.assertClientGetHash(self.client, taskhash3, unihash3) | ||
| 993 | |||
| 994 | # Mark the first unihash to be kept | ||
| 995 | ret = self.client.gc_mark_stream("ABC", (f"unihash {h}" for h in [unihash, unihash2])) | ||
| 996 | self.assertEqual(ret, {"count": 2}) | ||
| 997 | |||
| 998 | ret = self.client.gc_status() | ||
| 999 | self.assertEqual(ret, {"mark": "ABC", "keep": 2, "remove": 1}) | ||
| 1000 | |||
| 1001 | # Third hash is still there; mark doesn't delete hashes | ||
| 1002 | self.assertClientGetHash(self.client, taskhash3, unihash3) | ||
| 1003 | |||
| 1004 | ret = self.client.gc_sweep("ABC") | ||
| 1005 | self.assertEqual(ret, {"count": 1}) | ||
| 1006 | |||
| 1007 | # Hash is gone. Taskhash is returned for second hash | ||
| 1008 | self.assertClientGetHash(self.client, taskhash3, None) | ||
| 1009 | # First hash is still present | ||
| 1010 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
| 1011 | # Second hash is still present | ||
| 1012 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 1013 | |||
| 972 | def test_gc_switch_mark(self): | 1014 | def test_gc_switch_mark(self): |
| 973 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | 1015 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' |
| 974 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | 1016 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' |
