summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2024-05-29 16:47:34 -0600
committerSteve Sakoman <steve@sakoman.com>2024-06-06 06:53:49 -0700
commit934c533196bbe8d3ae51499f1326f26d94a7095f (patch)
tree73360854eadaffdd9832dbcfe6579a9597562b34
parentd1811356b766280553dd84220d3216387f3a0a51 (diff)
downloadpoky-934c533196bbe8d3ae51499f1326f26d94a7095f.tar.gz
bitbake: hashserv: client: Add batch stream API
Changes the stream mode to do "batch" processing. This means that the sending and reciving of messages is done simultaneously so that messages can be sent as fast as possible without having to wait for each reply. This allows multiple messages to be in flight at once, reducing the effect of the round trip latency from the server. (Bitbake rev: f99a17023b972d0d90dccb111f983655af6ccb87) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org> Signed-off-by: Steve Sakoman <steve@sakoman.com>
-rw-r--r--bitbake/lib/hashserv/client.py106
-rw-r--r--bitbake/lib/hashserv/tests.py75
2 files changed, 172 insertions, 9 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 0b254beddd..775faf935a 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -5,6 +5,7 @@
5 5
6import logging 6import logging
7import socket 7import socket
8import asyncio
8import bb.asyncrpc 9import bb.asyncrpc
9import json 10import json
10from . import create_async_client 11from . import create_async_client
@@ -13,6 +14,66 @@ from . import create_async_client
13logger = logging.getLogger("hashserv.client") 14logger = logging.getLogger("hashserv.client")
14 15
15 16
17class Batch(object):
18 def __init__(self):
19 self.done = False
20 self.cond = asyncio.Condition()
21 self.pending = []
22 self.results = []
23 self.sent_count = 0
24
25 async def recv(self, socket):
26 while True:
27 async with self.cond:
28 await self.cond.wait_for(lambda: self.pending or self.done)
29
30 if not self.pending:
31 if self.done:
32 return
33 continue
34
35 r = await socket.recv()
36 self.results.append(r)
37
38 async with self.cond:
39 self.pending.pop(0)
40
41 async def send(self, socket, msgs):
42 try:
43 # In the event of a restart due to a reconnect, all in-flight
44 # messages need to be resent first to keep to result count in sync
45 for m in self.pending:
46 await socket.send(m)
47
48 for m in msgs:
49 # Add the message to the pending list before attempting to send
50 # it so that if the send fails it will be retried
51 async with self.cond:
52 self.pending.append(m)
53 self.cond.notify()
54 self.sent_count += 1
55
56 await socket.send(m)
57
58 finally:
59 async with self.cond:
60 self.done = True
61 self.cond.notify()
62
63 async def process(self, socket, msgs):
64 await asyncio.gather(
65 self.recv(socket),
66 self.send(socket, msgs),
67 )
68
69 if len(self.results) != self.sent_count:
70 raise ValueError(
71 f"Expected result count {len(self.results)}. Expected {self.sent_count}"
72 )
73
74 return self.results
75
76
16class AsyncClient(bb.asyncrpc.AsyncClient): 77class AsyncClient(bb.asyncrpc.AsyncClient):
17 MODE_NORMAL = 0 78 MODE_NORMAL = 0
18 MODE_GET_STREAM = 1 79 MODE_GET_STREAM = 1
@@ -36,11 +97,27 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
36 if become: 97 if become:
37 await self.become_user(become) 98 await self.become_user(become)
38 99
39 async def send_stream(self, mode, msg): 100 async def send_stream_batch(self, mode, msgs):
101 """
102 Does a "batch" process of stream messages. This sends the query
103 messages as fast as possible, and simultaneously attempts to read the
104 messages back. This helps to mitigate the effects of latency to the
105 hash equivalence server be allowing multiple queries to be "in-flight"
106 at once
107
108 The implementation does more complicated tracking using a count of sent
109 messages so that `msgs` can be a generator function (i.e. its length is
110 unknown)
111
112 """
113
114 b = Batch()
115
40 async def proc(): 116 async def proc():
117 nonlocal b
118
41 await self._set_mode(mode) 119 await self._set_mode(mode)
42 await self.socket.send(msg) 120 return await b.process(self.socket, msgs)
43 return await self.socket.recv()
44 121
45 return await self._send_wrapper(proc) 122 return await self._send_wrapper(proc)
46 123
@@ -89,10 +166,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
89 self.mode = new_mode 166 self.mode = new_mode
90 167
91 async def get_unihash(self, method, taskhash): 168 async def get_unihash(self, method, taskhash):
92 r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) 169 r = await self.get_unihash_batch([(method, taskhash)])
93 if not r: 170 return r[0]
94 return None 171
95 return r 172 async def get_unihash_batch(self, args):
173 result = await self.send_stream_batch(
174 self.MODE_GET_STREAM,
175 (f"{method} {taskhash}" for method, taskhash in args),
176 )
177 return [r if r else None for r in result]
96 178
97 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): 179 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
98 m = extra.copy() 180 m = extra.copy()
@@ -115,8 +197,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
115 ) 197 )
116 198
117 async def unihash_exists(self, unihash): 199 async def unihash_exists(self, unihash):
118 r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) 200 r = await self.unihash_exists_batch([unihash])
119 return r == "true" 201 return r[0]
202
203 async def unihash_exists_batch(self, unihashes):
204 result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
205 return [r == "true" for r in result]
120 206
121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 207 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
122 return await self.invoke( 208 return await self.invoke(
@@ -237,10 +323,12 @@ class Client(bb.asyncrpc.Client):
237 "connect_tcp", 323 "connect_tcp",
238 "connect_websocket", 324 "connect_websocket",
239 "get_unihash", 325 "get_unihash",
326 "get_unihash_batch",
240 "report_unihash", 327 "report_unihash",
241 "report_unihash_equiv", 328 "report_unihash_equiv",
242 "get_taskhash", 329 "get_taskhash",
243 "unihash_exists", 330 "unihash_exists",
331 "unihash_exists_batch",
244 "get_outhash", 332 "get_outhash",
245 "get_stats", 333 "get_stats",
246 "reset_stats", 334 "reset_stats",
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 0809453cf8..5349cd5867 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -594,6 +594,43 @@ class HashEquivalenceCommonTests(object):
594 7: None, 594 7: None,
595 }) 595 })
596 596
597 def test_get_unihash_batch(self):
598 TEST_INPUT = (
599 # taskhash outhash unihash
600 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
601 # Duplicated taskhash with multiple output hashes and unihashes.
602 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
603 # Equivalent hash
604 ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
605 ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
606 ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
607 ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
608 ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
609 )
610 EXTRA_QUERIES = (
611 "6b6be7a84ab179b4240c4302518dc3f6",
612 )
613
614 for taskhash, outhash, unihash in TEST_INPUT:
615 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
616
617
618 result = self.client.get_unihash_batch(
619 [(self.METHOD, data[0]) for data in TEST_INPUT] +
620 [(self.METHOD, e) for e in EXTRA_QUERIES]
621 )
622
623 self.assertListEqual(result, [
624 "218e57509998197d570e2c98512d0105985dffc9",
625 "218e57509998197d570e2c98512d0105985dffc9",
626 "218e57509998197d570e2c98512d0105985dffc9",
627 "3b5d3d83f07f259e9086fcb422c855286e18a57d",
628 "f46d3fbb439bd9b921095da657a4de906510d2cd",
629 "f46d3fbb439bd9b921095da657a4de906510d2cd",
630 "05d2a63c81e32f0a36542ca677e8ad852365c538",
631 None,
632 ])
633
597 def test_client_pool_unihash_exists(self): 634 def test_client_pool_unihash_exists(self):
598 TEST_INPUT = ( 635 TEST_INPUT = (
599 # taskhash outhash unihash 636 # taskhash outhash unihash
@@ -636,6 +673,44 @@ class HashEquivalenceCommonTests(object):
636 result = client_pool.unihashes_exist(query) 673 result = client_pool.unihashes_exist(query)
637 self.assertDictEqual(result, expected) 674 self.assertDictEqual(result, expected)
638 675
676 def test_unihash_exists_batch(self):
677 TEST_INPUT = (
678 # taskhash outhash unihash
679 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
680 # Duplicated taskhash with multiple output hashes and unihashes.
681 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
682 # Equivalent hash
683 ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
684 ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
685 ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
686 ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
687 ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
688 )
689 EXTRA_QUERIES = (
690 "6b6be7a84ab179b4240c4302518dc3f6",
691 )
692
693 result_unihashes = set()
694
695
696 for taskhash, outhash, unihash in TEST_INPUT:
697 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
698 result_unihashes.add(result["unihash"])
699
700 query = []
701 expected = []
702
703 for _, _, unihash in TEST_INPUT:
704 query.append(unihash)
705 expected.append(unihash in result_unihashes)
706
707
708 for unihash in EXTRA_QUERIES:
709 query.append(unihash)
710 expected.append(False)
711
712 result = self.client.unihash_exists_batch(query)
713 self.assertListEqual(result, expected)
639 714
640 def test_auth_read_perms(self): 715 def test_auth_read_perms(self):
641 admin_client = self.start_auth_server() 716 admin_client = self.start_auth_server()