diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-05-29 16:47:34 -0600 |
---|---|---|
committer | Steve Sakoman <steve@sakoman.com> | 2024-06-06 06:53:49 -0700 |
commit | 934c533196bbe8d3ae51499f1326f26d94a7095f (patch) | |
tree | 73360854eadaffdd9832dbcfe6579a9597562b34 | |
parent | d1811356b766280553dd84220d3216387f3a0a51 (diff) | |
download | poky-934c533196bbe8d3ae51499f1326f26d94a7095f.tar.gz |
bitbake: hashserv: client: Add batch stream API
Changes the stream mode to do "batch" processing. This means that the
sending and reciving of messages is done simultaneously so that messages
can be sent as fast as possible without having to wait for each reply.
This allows multiple messages to be in flight at once, reducing the
effect of the round trip latency from the server.
(Bitbake rev: f99a17023b972d0d90dccb111f983655af6ccb87)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Signed-off-by: Steve Sakoman <steve@sakoman.com>
-rw-r--r-- | bitbake/lib/hashserv/client.py | 106 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 75 |
2 files changed, 172 insertions, 9 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 0b254beddd..775faf935a 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -5,6 +5,7 @@ | |||
5 | 5 | ||
6 | import logging | 6 | import logging |
7 | import socket | 7 | import socket |
8 | import asyncio | ||
8 | import bb.asyncrpc | 9 | import bb.asyncrpc |
9 | import json | 10 | import json |
10 | from . import create_async_client | 11 | from . import create_async_client |
@@ -13,6 +14,66 @@ from . import create_async_client | |||
13 | logger = logging.getLogger("hashserv.client") | 14 | logger = logging.getLogger("hashserv.client") |
14 | 15 | ||
15 | 16 | ||
17 | class Batch(object): | ||
18 | def __init__(self): | ||
19 | self.done = False | ||
20 | self.cond = asyncio.Condition() | ||
21 | self.pending = [] | ||
22 | self.results = [] | ||
23 | self.sent_count = 0 | ||
24 | |||
25 | async def recv(self, socket): | ||
26 | while True: | ||
27 | async with self.cond: | ||
28 | await self.cond.wait_for(lambda: self.pending or self.done) | ||
29 | |||
30 | if not self.pending: | ||
31 | if self.done: | ||
32 | return | ||
33 | continue | ||
34 | |||
35 | r = await socket.recv() | ||
36 | self.results.append(r) | ||
37 | |||
38 | async with self.cond: | ||
39 | self.pending.pop(0) | ||
40 | |||
41 | async def send(self, socket, msgs): | ||
42 | try: | ||
43 | # In the event of a restart due to a reconnect, all in-flight | ||
44 | # messages need to be resent first to keep to result count in sync | ||
45 | for m in self.pending: | ||
46 | await socket.send(m) | ||
47 | |||
48 | for m in msgs: | ||
49 | # Add the message to the pending list before attempting to send | ||
50 | # it so that if the send fails it will be retried | ||
51 | async with self.cond: | ||
52 | self.pending.append(m) | ||
53 | self.cond.notify() | ||
54 | self.sent_count += 1 | ||
55 | |||
56 | await socket.send(m) | ||
57 | |||
58 | finally: | ||
59 | async with self.cond: | ||
60 | self.done = True | ||
61 | self.cond.notify() | ||
62 | |||
63 | async def process(self, socket, msgs): | ||
64 | await asyncio.gather( | ||
65 | self.recv(socket), | ||
66 | self.send(socket, msgs), | ||
67 | ) | ||
68 | |||
69 | if len(self.results) != self.sent_count: | ||
70 | raise ValueError( | ||
71 | f"Expected result count {len(self.results)}. Expected {self.sent_count}" | ||
72 | ) | ||
73 | |||
74 | return self.results | ||
75 | |||
76 | |||
16 | class AsyncClient(bb.asyncrpc.AsyncClient): | 77 | class AsyncClient(bb.asyncrpc.AsyncClient): |
17 | MODE_NORMAL = 0 | 78 | MODE_NORMAL = 0 |
18 | MODE_GET_STREAM = 1 | 79 | MODE_GET_STREAM = 1 |
@@ -36,11 +97,27 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
36 | if become: | 97 | if become: |
37 | await self.become_user(become) | 98 | await self.become_user(become) |
38 | 99 | ||
39 | async def send_stream(self, mode, msg): | 100 | async def send_stream_batch(self, mode, msgs): |
101 | """ | ||
102 | Does a "batch" process of stream messages. This sends the query | ||
103 | messages as fast as possible, and simultaneously attempts to read the | ||
104 | messages back. This helps to mitigate the effects of latency to the | ||
105 | hash equivalence server be allowing multiple queries to be "in-flight" | ||
106 | at once | ||
107 | |||
108 | The implementation does more complicated tracking using a count of sent | ||
109 | messages so that `msgs` can be a generator function (i.e. its length is | ||
110 | unknown) | ||
111 | |||
112 | """ | ||
113 | |||
114 | b = Batch() | ||
115 | |||
40 | async def proc(): | 116 | async def proc(): |
117 | nonlocal b | ||
118 | |||
41 | await self._set_mode(mode) | 119 | await self._set_mode(mode) |
42 | await self.socket.send(msg) | 120 | return await b.process(self.socket, msgs) |
43 | return await self.socket.recv() | ||
44 | 121 | ||
45 | return await self._send_wrapper(proc) | 122 | return await self._send_wrapper(proc) |
46 | 123 | ||
@@ -89,10 +166,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
89 | self.mode = new_mode | 166 | self.mode = new_mode |
90 | 167 | ||
91 | async def get_unihash(self, method, taskhash): | 168 | async def get_unihash(self, method, taskhash): |
92 | r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) | 169 | r = await self.get_unihash_batch([(method, taskhash)]) |
93 | if not r: | 170 | return r[0] |
94 | return None | 171 | |
95 | return r | 172 | async def get_unihash_batch(self, args): |
173 | result = await self.send_stream_batch( | ||
174 | self.MODE_GET_STREAM, | ||
175 | (f"{method} {taskhash}" for method, taskhash in args), | ||
176 | ) | ||
177 | return [r if r else None for r in result] | ||
96 | 178 | ||
97 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): | 179 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): |
98 | m = extra.copy() | 180 | m = extra.copy() |
@@ -115,8 +197,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
115 | ) | 197 | ) |
116 | 198 | ||
117 | async def unihash_exists(self, unihash): | 199 | async def unihash_exists(self, unihash): |
118 | r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) | 200 | r = await self.unihash_exists_batch([unihash]) |
119 | return r == "true" | 201 | return r[0] |
202 | |||
203 | async def unihash_exists_batch(self, unihashes): | ||
204 | result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes) | ||
205 | return [r == "true" for r in result] | ||
120 | 206 | ||
121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 207 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
122 | return await self.invoke( | 208 | return await self.invoke( |
@@ -237,10 +323,12 @@ class Client(bb.asyncrpc.Client): | |||
237 | "connect_tcp", | 323 | "connect_tcp", |
238 | "connect_websocket", | 324 | "connect_websocket", |
239 | "get_unihash", | 325 | "get_unihash", |
326 | "get_unihash_batch", | ||
240 | "report_unihash", | 327 | "report_unihash", |
241 | "report_unihash_equiv", | 328 | "report_unihash_equiv", |
242 | "get_taskhash", | 329 | "get_taskhash", |
243 | "unihash_exists", | 330 | "unihash_exists", |
331 | "unihash_exists_batch", | ||
244 | "get_outhash", | 332 | "get_outhash", |
245 | "get_stats", | 333 | "get_stats", |
246 | "reset_stats", | 334 | "reset_stats", |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 0809453cf8..5349cd5867 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -594,6 +594,43 @@ class HashEquivalenceCommonTests(object): | |||
594 | 7: None, | 594 | 7: None, |
595 | }) | 595 | }) |
596 | 596 | ||
597 | def test_get_unihash_batch(self): | ||
598 | TEST_INPUT = ( | ||
599 | # taskhash outhash unihash | ||
600 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
601 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
602 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
603 | # Equivalent hash | ||
604 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
605 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
606 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
607 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
608 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
609 | ) | ||
610 | EXTRA_QUERIES = ( | ||
611 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
612 | ) | ||
613 | |||
614 | for taskhash, outhash, unihash in TEST_INPUT: | ||
615 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
616 | |||
617 | |||
618 | result = self.client.get_unihash_batch( | ||
619 | [(self.METHOD, data[0]) for data in TEST_INPUT] + | ||
620 | [(self.METHOD, e) for e in EXTRA_QUERIES] | ||
621 | ) | ||
622 | |||
623 | self.assertListEqual(result, [ | ||
624 | "218e57509998197d570e2c98512d0105985dffc9", | ||
625 | "218e57509998197d570e2c98512d0105985dffc9", | ||
626 | "218e57509998197d570e2c98512d0105985dffc9", | ||
627 | "3b5d3d83f07f259e9086fcb422c855286e18a57d", | ||
628 | "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
629 | "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
630 | "05d2a63c81e32f0a36542ca677e8ad852365c538", | ||
631 | None, | ||
632 | ]) | ||
633 | |||
597 | def test_client_pool_unihash_exists(self): | 634 | def test_client_pool_unihash_exists(self): |
598 | TEST_INPUT = ( | 635 | TEST_INPUT = ( |
599 | # taskhash outhash unihash | 636 | # taskhash outhash unihash |
@@ -636,6 +673,44 @@ class HashEquivalenceCommonTests(object): | |||
636 | result = client_pool.unihashes_exist(query) | 673 | result = client_pool.unihashes_exist(query) |
637 | self.assertDictEqual(result, expected) | 674 | self.assertDictEqual(result, expected) |
638 | 675 | ||
676 | def test_unihash_exists_batch(self): | ||
677 | TEST_INPUT = ( | ||
678 | # taskhash outhash unihash | ||
679 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
680 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
681 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
682 | # Equivalent hash | ||
683 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
684 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
685 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
686 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
687 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
688 | ) | ||
689 | EXTRA_QUERIES = ( | ||
690 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
691 | ) | ||
692 | |||
693 | result_unihashes = set() | ||
694 | |||
695 | |||
696 | for taskhash, outhash, unihash in TEST_INPUT: | ||
697 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
698 | result_unihashes.add(result["unihash"]) | ||
699 | |||
700 | query = [] | ||
701 | expected = [] | ||
702 | |||
703 | for _, _, unihash in TEST_INPUT: | ||
704 | query.append(unihash) | ||
705 | expected.append(unihash in result_unihashes) | ||
706 | |||
707 | |||
708 | for unihash in EXTRA_QUERIES: | ||
709 | query.append(unihash) | ||
710 | expected.append(False) | ||
711 | |||
712 | result = self.client.unihash_exists_batch(query) | ||
713 | self.assertListEqual(result, expected) | ||
639 | 714 | ||
640 | def test_auth_read_perms(self): | 715 | def test_auth_read_perms(self): |
641 | admin_client = self.start_auth_server() | 716 | admin_client = self.start_auth_server() |