summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r--bitbake/lib/hashserv/client.py106
1 files changed, 97 insertions, 9 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 0b254beddd..775faf935a 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -5,6 +5,7 @@
5 5
6import logging 6import logging
7import socket 7import socket
8import asyncio
8import bb.asyncrpc 9import bb.asyncrpc
9import json 10import json
10from . import create_async_client 11from . import create_async_client
@@ -13,6 +14,66 @@ from . import create_async_client
13logger = logging.getLogger("hashserv.client") 14logger = logging.getLogger("hashserv.client")
14 15
15 16
17class Batch(object):
18 def __init__(self):
19 self.done = False
20 self.cond = asyncio.Condition()
21 self.pending = []
22 self.results = []
23 self.sent_count = 0
24
25 async def recv(self, socket):
26 while True:
27 async with self.cond:
28 await self.cond.wait_for(lambda: self.pending or self.done)
29
30 if not self.pending:
31 if self.done:
32 return
33 continue
34
35 r = await socket.recv()
36 self.results.append(r)
37
38 async with self.cond:
39 self.pending.pop(0)
40
41 async def send(self, socket, msgs):
42 try:
43 # In the event of a restart due to a reconnect, all in-flight
44 # messages need to be resent first to keep to result count in sync
45 for m in self.pending:
46 await socket.send(m)
47
48 for m in msgs:
49 # Add the message to the pending list before attempting to send
50 # it so that if the send fails it will be retried
51 async with self.cond:
52 self.pending.append(m)
53 self.cond.notify()
54 self.sent_count += 1
55
56 await socket.send(m)
57
58 finally:
59 async with self.cond:
60 self.done = True
61 self.cond.notify()
62
63 async def process(self, socket, msgs):
64 await asyncio.gather(
65 self.recv(socket),
66 self.send(socket, msgs),
67 )
68
69 if len(self.results) != self.sent_count:
70 raise ValueError(
71 f"Expected result count {len(self.results)}. Expected {self.sent_count}"
72 )
73
74 return self.results
75
76
16class AsyncClient(bb.asyncrpc.AsyncClient): 77class AsyncClient(bb.asyncrpc.AsyncClient):
17 MODE_NORMAL = 0 78 MODE_NORMAL = 0
18 MODE_GET_STREAM = 1 79 MODE_GET_STREAM = 1
@@ -36,11 +97,27 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
36 if become: 97 if become:
37 await self.become_user(become) 98 await self.become_user(become)
38 99
39 async def send_stream(self, mode, msg): 100 async def send_stream_batch(self, mode, msgs):
101 """
102 Does a "batch" process of stream messages. This sends the query
103 messages as fast as possible, and simultaneously attempts to read the
104 messages back. This helps to mitigate the effects of latency to the
105 hash equivalence server be allowing multiple queries to be "in-flight"
106 at once
107
108 The implementation does more complicated tracking using a count of sent
109 messages so that `msgs` can be a generator function (i.e. its length is
110 unknown)
111
112 """
113
114 b = Batch()
115
40 async def proc(): 116 async def proc():
117 nonlocal b
118
41 await self._set_mode(mode) 119 await self._set_mode(mode)
42 await self.socket.send(msg) 120 return await b.process(self.socket, msgs)
43 return await self.socket.recv()
44 121
45 return await self._send_wrapper(proc) 122 return await self._send_wrapper(proc)
46 123
@@ -89,10 +166,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
89 self.mode = new_mode 166 self.mode = new_mode
90 167
91 async def get_unihash(self, method, taskhash): 168 async def get_unihash(self, method, taskhash):
92 r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) 169 r = await self.get_unihash_batch([(method, taskhash)])
93 if not r: 170 return r[0]
94 return None 171
95 return r 172 async def get_unihash_batch(self, args):
173 result = await self.send_stream_batch(
174 self.MODE_GET_STREAM,
175 (f"{method} {taskhash}" for method, taskhash in args),
176 )
177 return [r if r else None for r in result]
96 178
97 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): 179 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
98 m = extra.copy() 180 m = extra.copy()
@@ -115,8 +197,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
115 ) 197 )
116 198
117 async def unihash_exists(self, unihash): 199 async def unihash_exists(self, unihash):
118 r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) 200 r = await self.unihash_exists_batch([unihash])
119 return r == "true" 201 return r[0]
202
203 async def unihash_exists_batch(self, unihashes):
204 result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
205 return [r == "true" for r in result]
120 206
121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 207 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
122 return await self.invoke( 208 return await self.invoke(
@@ -237,10 +323,12 @@ class Client(bb.asyncrpc.Client):
237 "connect_tcp", 323 "connect_tcp",
238 "connect_websocket", 324 "connect_websocket",
239 "get_unihash", 325 "get_unihash",
326 "get_unihash_batch",
240 "report_unihash", 327 "report_unihash",
241 "report_unihash_equiv", 328 "report_unihash_equiv",
242 "get_taskhash", 329 "get_taskhash",
243 "unihash_exists", 330 "unihash_exists",
331 "unihash_exists_batch",
244 "get_outhash", 332 "get_outhash",
245 "get_stats", 333 "get_stats",
246 "reset_stats", 334 "reset_stats",