diff options
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 106 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 75 |
2 files changed, 172 insertions, 9 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 0b254beddd..775faf935a 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -5,6 +5,7 @@ | |||
| 5 | 5 | ||
| 6 | import logging | 6 | import logging |
| 7 | import socket | 7 | import socket |
| 8 | import asyncio | ||
| 8 | import bb.asyncrpc | 9 | import bb.asyncrpc |
| 9 | import json | 10 | import json |
| 10 | from . import create_async_client | 11 | from . import create_async_client |
| @@ -13,6 +14,66 @@ from . import create_async_client | |||
| 13 | logger = logging.getLogger("hashserv.client") | 14 | logger = logging.getLogger("hashserv.client") |
| 14 | 15 | ||
| 15 | 16 | ||
| 17 | class Batch(object): | ||
| 18 | def __init__(self): | ||
| 19 | self.done = False | ||
| 20 | self.cond = asyncio.Condition() | ||
| 21 | self.pending = [] | ||
| 22 | self.results = [] | ||
| 23 | self.sent_count = 0 | ||
| 24 | |||
| 25 | async def recv(self, socket): | ||
| 26 | while True: | ||
| 27 | async with self.cond: | ||
| 28 | await self.cond.wait_for(lambda: self.pending or self.done) | ||
| 29 | |||
| 30 | if not self.pending: | ||
| 31 | if self.done: | ||
| 32 | return | ||
| 33 | continue | ||
| 34 | |||
| 35 | r = await socket.recv() | ||
| 36 | self.results.append(r) | ||
| 37 | |||
| 38 | async with self.cond: | ||
| 39 | self.pending.pop(0) | ||
| 40 | |||
| 41 | async def send(self, socket, msgs): | ||
| 42 | try: | ||
| 43 | # In the event of a restart due to a reconnect, all in-flight | ||
| 44 | # messages need to be resent first to keep to result count in sync | ||
| 45 | for m in self.pending: | ||
| 46 | await socket.send(m) | ||
| 47 | |||
| 48 | for m in msgs: | ||
| 49 | # Add the message to the pending list before attempting to send | ||
| 50 | # it so that if the send fails it will be retried | ||
| 51 | async with self.cond: | ||
| 52 | self.pending.append(m) | ||
| 53 | self.cond.notify() | ||
| 54 | self.sent_count += 1 | ||
| 55 | |||
| 56 | await socket.send(m) | ||
| 57 | |||
| 58 | finally: | ||
| 59 | async with self.cond: | ||
| 60 | self.done = True | ||
| 61 | self.cond.notify() | ||
| 62 | |||
| 63 | async def process(self, socket, msgs): | ||
| 64 | await asyncio.gather( | ||
| 65 | self.recv(socket), | ||
| 66 | self.send(socket, msgs), | ||
| 67 | ) | ||
| 68 | |||
| 69 | if len(self.results) != self.sent_count: | ||
| 70 | raise ValueError( | ||
| 71 | f"Expected result count {len(self.results)}. Expected {self.sent_count}" | ||
| 72 | ) | ||
| 73 | |||
| 74 | return self.results | ||
| 75 | |||
| 76 | |||
| 16 | class AsyncClient(bb.asyncrpc.AsyncClient): | 77 | class AsyncClient(bb.asyncrpc.AsyncClient): |
| 17 | MODE_NORMAL = 0 | 78 | MODE_NORMAL = 0 |
| 18 | MODE_GET_STREAM = 1 | 79 | MODE_GET_STREAM = 1 |
| @@ -36,11 +97,27 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 36 | if become: | 97 | if become: |
| 37 | await self.become_user(become) | 98 | await self.become_user(become) |
| 38 | 99 | ||
| 39 | async def send_stream(self, mode, msg): | 100 | async def send_stream_batch(self, mode, msgs): |
| 101 | """ | ||
| 102 | Does a "batch" process of stream messages. This sends the query | ||
| 103 | messages as fast as possible, and simultaneously attempts to read the | ||
| 104 | messages back. This helps to mitigate the effects of latency to the | ||
| 105 | hash equivalence server be allowing multiple queries to be "in-flight" | ||
| 106 | at once | ||
| 107 | |||
| 108 | The implementation does more complicated tracking using a count of sent | ||
| 109 | messages so that `msgs` can be a generator function (i.e. its length is | ||
| 110 | unknown) | ||
| 111 | |||
| 112 | """ | ||
| 113 | |||
| 114 | b = Batch() | ||
| 115 | |||
| 40 | async def proc(): | 116 | async def proc(): |
| 117 | nonlocal b | ||
| 118 | |||
| 41 | await self._set_mode(mode) | 119 | await self._set_mode(mode) |
| 42 | await self.socket.send(msg) | 120 | return await b.process(self.socket, msgs) |
| 43 | return await self.socket.recv() | ||
| 44 | 121 | ||
| 45 | return await self._send_wrapper(proc) | 122 | return await self._send_wrapper(proc) |
| 46 | 123 | ||
| @@ -89,10 +166,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 89 | self.mode = new_mode | 166 | self.mode = new_mode |
| 90 | 167 | ||
| 91 | async def get_unihash(self, method, taskhash): | 168 | async def get_unihash(self, method, taskhash): |
| 92 | r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) | 169 | r = await self.get_unihash_batch([(method, taskhash)]) |
| 93 | if not r: | 170 | return r[0] |
| 94 | return None | 171 | |
| 95 | return r | 172 | async def get_unihash_batch(self, args): |
| 173 | result = await self.send_stream_batch( | ||
| 174 | self.MODE_GET_STREAM, | ||
| 175 | (f"{method} {taskhash}" for method, taskhash in args), | ||
| 176 | ) | ||
| 177 | return [r if r else None for r in result] | ||
| 96 | 178 | ||
| 97 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): | 179 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): |
| 98 | m = extra.copy() | 180 | m = extra.copy() |
| @@ -115,8 +197,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 115 | ) | 197 | ) |
| 116 | 198 | ||
| 117 | async def unihash_exists(self, unihash): | 199 | async def unihash_exists(self, unihash): |
| 118 | r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) | 200 | r = await self.unihash_exists_batch([unihash]) |
| 119 | return r == "true" | 201 | return r[0] |
| 202 | |||
| 203 | async def unihash_exists_batch(self, unihashes): | ||
| 204 | result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes) | ||
| 205 | return [r == "true" for r in result] | ||
| 120 | 206 | ||
| 121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 207 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
| 122 | return await self.invoke( | 208 | return await self.invoke( |
| @@ -237,10 +323,12 @@ class Client(bb.asyncrpc.Client): | |||
| 237 | "connect_tcp", | 323 | "connect_tcp", |
| 238 | "connect_websocket", | 324 | "connect_websocket", |
| 239 | "get_unihash", | 325 | "get_unihash", |
| 326 | "get_unihash_batch", | ||
| 240 | "report_unihash", | 327 | "report_unihash", |
| 241 | "report_unihash_equiv", | 328 | "report_unihash_equiv", |
| 242 | "get_taskhash", | 329 | "get_taskhash", |
| 243 | "unihash_exists", | 330 | "unihash_exists", |
| 331 | "unihash_exists_batch", | ||
| 244 | "get_outhash", | 332 | "get_outhash", |
| 245 | "get_stats", | 333 | "get_stats", |
| 246 | "reset_stats", | 334 | "reset_stats", |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 0809453cf8..5349cd5867 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
| @@ -594,6 +594,43 @@ class HashEquivalenceCommonTests(object): | |||
| 594 | 7: None, | 594 | 7: None, |
| 595 | }) | 595 | }) |
| 596 | 596 | ||
| 597 | def test_get_unihash_batch(self): | ||
| 598 | TEST_INPUT = ( | ||
| 599 | # taskhash outhash unihash | ||
| 600 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
| 601 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
| 602 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
| 603 | # Equivalent hash | ||
| 604 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
| 605 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
| 606 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
| 607 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
| 608 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
| 609 | ) | ||
| 610 | EXTRA_QUERIES = ( | ||
| 611 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
| 612 | ) | ||
| 613 | |||
| 614 | for taskhash, outhash, unihash in TEST_INPUT: | ||
| 615 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 616 | |||
| 617 | |||
| 618 | result = self.client.get_unihash_batch( | ||
| 619 | [(self.METHOD, data[0]) for data in TEST_INPUT] + | ||
| 620 | [(self.METHOD, e) for e in EXTRA_QUERIES] | ||
| 621 | ) | ||
| 622 | |||
| 623 | self.assertListEqual(result, [ | ||
| 624 | "218e57509998197d570e2c98512d0105985dffc9", | ||
| 625 | "218e57509998197d570e2c98512d0105985dffc9", | ||
| 626 | "218e57509998197d570e2c98512d0105985dffc9", | ||
| 627 | "3b5d3d83f07f259e9086fcb422c855286e18a57d", | ||
| 628 | "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
| 629 | "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
| 630 | "05d2a63c81e32f0a36542ca677e8ad852365c538", | ||
| 631 | None, | ||
| 632 | ]) | ||
| 633 | |||
| 597 | def test_client_pool_unihash_exists(self): | 634 | def test_client_pool_unihash_exists(self): |
| 598 | TEST_INPUT = ( | 635 | TEST_INPUT = ( |
| 599 | # taskhash outhash unihash | 636 | # taskhash outhash unihash |
| @@ -636,6 +673,44 @@ class HashEquivalenceCommonTests(object): | |||
| 636 | result = client_pool.unihashes_exist(query) | 673 | result = client_pool.unihashes_exist(query) |
| 637 | self.assertDictEqual(result, expected) | 674 | self.assertDictEqual(result, expected) |
| 638 | 675 | ||
| 676 | def test_unihash_exists_batch(self): | ||
| 677 | TEST_INPUT = ( | ||
| 678 | # taskhash outhash unihash | ||
| 679 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
| 680 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
| 681 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
| 682 | # Equivalent hash | ||
| 683 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
| 684 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
| 685 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
| 686 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
| 687 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
| 688 | ) | ||
| 689 | EXTRA_QUERIES = ( | ||
| 690 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
| 691 | ) | ||
| 692 | |||
| 693 | result_unihashes = set() | ||
| 694 | |||
| 695 | |||
| 696 | for taskhash, outhash, unihash in TEST_INPUT: | ||
| 697 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 698 | result_unihashes.add(result["unihash"]) | ||
| 699 | |||
| 700 | query = [] | ||
| 701 | expected = [] | ||
| 702 | |||
| 703 | for _, _, unihash in TEST_INPUT: | ||
| 704 | query.append(unihash) | ||
| 705 | expected.append(unihash in result_unihashes) | ||
| 706 | |||
| 707 | |||
| 708 | for unihash in EXTRA_QUERIES: | ||
| 709 | query.append(unihash) | ||
| 710 | expected.append(False) | ||
| 711 | |||
| 712 | result = self.client.unihash_exists_batch(query) | ||
| 713 | self.assertListEqual(result, expected) | ||
| 639 | 714 | ||
| 640 | def test_auth_read_perms(self): | 715 | def test_auth_read_perms(self): |
| 641 | admin_client = self.start_auth_server() | 716 | admin_client = self.start_auth_server() |
