diff options
Diffstat (limited to 'bitbake/lib/hashserv')
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 31 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 227 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 78 |
3 files changed, 148 insertions, 188 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 552a33278f..ac891e0174 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -5,39 +5,15 @@ | |||
5 | 5 | ||
6 | import asyncio | 6 | import asyncio |
7 | from contextlib import closing | 7 | from contextlib import closing |
8 | import re | ||
9 | import itertools | 8 | import itertools |
10 | import json | 9 | import json |
11 | from collections import namedtuple | 10 | from collections import namedtuple |
12 | from urllib.parse import urlparse | 11 | from urllib.parse import urlparse |
13 | 12 | from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS | |
14 | UNIX_PREFIX = "unix://" | ||
15 | WS_PREFIX = "ws://" | ||
16 | WSS_PREFIX = "wss://" | ||
17 | |||
18 | ADDR_TYPE_UNIX = 0 | ||
19 | ADDR_TYPE_TCP = 1 | ||
20 | ADDR_TYPE_WS = 2 | ||
21 | 13 | ||
22 | User = namedtuple("User", ("username", "permissions")) | 14 | User = namedtuple("User", ("username", "permissions")) |
23 | 15 | ||
24 | 16 | ||
25 | def parse_address(addr): | ||
26 | if addr.startswith(UNIX_PREFIX): | ||
27 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) | ||
28 | elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX): | ||
29 | return (ADDR_TYPE_WS, (addr,)) | ||
30 | else: | ||
31 | m = re.match(r"\[(?P<host>[^\]]*)\]:(?P<port>\d+)$", addr) | ||
32 | if m is not None: | ||
33 | host = m.group("host") | ||
34 | port = m.group("port") | ||
35 | else: | ||
36 | host, port = addr.split(":") | ||
37 | |||
38 | return (ADDR_TYPE_TCP, (host, int(port))) | ||
39 | |||
40 | |||
41 | def create_server( | 17 | def create_server( |
42 | addr, | 18 | addr, |
43 | dbname, | 19 | dbname, |
@@ -50,6 +26,7 @@ def create_server( | |||
50 | anon_perms=None, | 26 | anon_perms=None, |
51 | admin_username=None, | 27 | admin_username=None, |
52 | admin_password=None, | 28 | admin_password=None, |
29 | reuseport=False, | ||
53 | ): | 30 | ): |
54 | def sqlite_engine(): | 31 | def sqlite_engine(): |
55 | from .sqlite import DatabaseEngine | 32 | from .sqlite import DatabaseEngine |
@@ -85,9 +62,9 @@ def create_server( | |||
85 | s.start_unix_server(*a) | 62 | s.start_unix_server(*a) |
86 | elif typ == ADDR_TYPE_WS: | 63 | elif typ == ADDR_TYPE_WS: |
87 | url = urlparse(a[0]) | 64 | url = urlparse(a[0]) |
88 | s.start_websocket_server(url.hostname, url.port) | 65 | s.start_websocket_server(url.hostname, url.port, reuseport=reuseport) |
89 | else: | 66 | else: |
90 | s.start_tcp_server(*a) | 67 | s.start_tcp_server(*a, reuseport=reuseport) |
91 | 68 | ||
92 | return s | 69 | return s |
93 | 70 | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index b269879ecf..a510f3284f 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -5,6 +5,7 @@ | |||
5 | 5 | ||
6 | import logging | 6 | import logging |
7 | import socket | 7 | import socket |
8 | import asyncio | ||
8 | import bb.asyncrpc | 9 | import bb.asyncrpc |
9 | import json | 10 | import json |
10 | from . import create_async_client | 11 | from . import create_async_client |
@@ -13,6 +14,66 @@ from . import create_async_client | |||
13 | logger = logging.getLogger("hashserv.client") | 14 | logger = logging.getLogger("hashserv.client") |
14 | 15 | ||
15 | 16 | ||
17 | class Batch(object): | ||
18 | def __init__(self): | ||
19 | self.done = False | ||
20 | self.cond = asyncio.Condition() | ||
21 | self.pending = [] | ||
22 | self.results = [] | ||
23 | self.sent_count = 0 | ||
24 | |||
25 | async def recv(self, socket): | ||
26 | while True: | ||
27 | async with self.cond: | ||
28 | await self.cond.wait_for(lambda: self.pending or self.done) | ||
29 | |||
30 | if not self.pending: | ||
31 | if self.done: | ||
32 | return | ||
33 | continue | ||
34 | |||
35 | r = await socket.recv() | ||
36 | self.results.append(r) | ||
37 | |||
38 | async with self.cond: | ||
39 | self.pending.pop(0) | ||
40 | |||
41 | async def send(self, socket, msgs): | ||
42 | try: | ||
43 | # In the event of a restart due to a reconnect, all in-flight | ||
44 | # messages need to be resent first to keep to result count in sync | ||
45 | for m in self.pending: | ||
46 | await socket.send(m) | ||
47 | |||
48 | for m in msgs: | ||
49 | # Add the message to the pending list before attempting to send | ||
50 | # it so that if the send fails it will be retried | ||
51 | async with self.cond: | ||
52 | self.pending.append(m) | ||
53 | self.cond.notify() | ||
54 | self.sent_count += 1 | ||
55 | |||
56 | await socket.send(m) | ||
57 | |||
58 | finally: | ||
59 | async with self.cond: | ||
60 | self.done = True | ||
61 | self.cond.notify() | ||
62 | |||
63 | async def process(self, socket, msgs): | ||
64 | await asyncio.gather( | ||
65 | self.recv(socket), | ||
66 | self.send(socket, msgs), | ||
67 | ) | ||
68 | |||
69 | if len(self.results) != self.sent_count: | ||
70 | raise ValueError( | ||
71 | f"Expected result count {len(self.results)}. Expected {self.sent_count}" | ||
72 | ) | ||
73 | |||
74 | return self.results | ||
75 | |||
76 | |||
16 | class AsyncClient(bb.asyncrpc.AsyncClient): | 77 | class AsyncClient(bb.asyncrpc.AsyncClient): |
17 | MODE_NORMAL = 0 | 78 | MODE_NORMAL = 0 |
18 | MODE_GET_STREAM = 1 | 79 | MODE_GET_STREAM = 1 |
@@ -27,9 +88,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
27 | 88 | ||
28 | async def setup_connection(self): | 89 | async def setup_connection(self): |
29 | await super().setup_connection() | 90 | await super().setup_connection() |
30 | cur_mode = self.mode | ||
31 | self.mode = self.MODE_NORMAL | 91 | self.mode = self.MODE_NORMAL |
32 | await self._set_mode(cur_mode) | ||
33 | if self.username: | 92 | if self.username: |
34 | # Save off become user temporarily because auth() resets it | 93 | # Save off become user temporarily because auth() resets it |
35 | become = self.saved_become_user | 94 | become = self.saved_become_user |
@@ -38,25 +97,52 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
38 | if become: | 97 | if become: |
39 | await self.become_user(become) | 98 | await self.become_user(become) |
40 | 99 | ||
41 | async def send_stream(self, msg): | 100 | async def send_stream_batch(self, mode, msgs): |
101 | """ | ||
102 | Does a "batch" process of stream messages. This sends the query | ||
103 | messages as fast as possible, and simultaneously attempts to read the | ||
104 | messages back. This helps to mitigate the effects of latency to the | ||
105 | hash equivalence server be allowing multiple queries to be "in-flight" | ||
106 | at once | ||
107 | |||
108 | The implementation does more complicated tracking using a count of sent | ||
109 | messages so that `msgs` can be a generator function (i.e. its length is | ||
110 | unknown) | ||
111 | |||
112 | """ | ||
113 | |||
114 | b = Batch() | ||
115 | |||
42 | async def proc(): | 116 | async def proc(): |
43 | await self.socket.send(msg) | 117 | nonlocal b |
44 | return await self.socket.recv() | 118 | |
119 | await self._set_mode(mode) | ||
120 | return await b.process(self.socket, msgs) | ||
45 | 121 | ||
46 | return await self._send_wrapper(proc) | 122 | return await self._send_wrapper(proc) |
47 | 123 | ||
124 | async def invoke(self, *args, skip_mode=False, **kwargs): | ||
125 | # It's OK if connection errors cause a failure here, because the mode | ||
126 | # is also reset to normal on a new connection | ||
127 | if not skip_mode: | ||
128 | await self._set_mode(self.MODE_NORMAL) | ||
129 | return await super().invoke(*args, **kwargs) | ||
130 | |||
48 | async def _set_mode(self, new_mode): | 131 | async def _set_mode(self, new_mode): |
49 | async def stream_to_normal(): | 132 | async def stream_to_normal(): |
133 | # Check if already in normal mode (e.g. due to a connection reset) | ||
134 | if self.mode == self.MODE_NORMAL: | ||
135 | return "ok" | ||
50 | await self.socket.send("END") | 136 | await self.socket.send("END") |
51 | return await self.socket.recv() | 137 | return await self.socket.recv() |
52 | 138 | ||
53 | async def normal_to_stream(command): | 139 | async def normal_to_stream(command): |
54 | r = await self.invoke({command: None}) | 140 | r = await self.invoke({command: None}, skip_mode=True) |
55 | if r != "ok": | 141 | if r != "ok": |
142 | self.check_invoke_error(r) | ||
56 | raise ConnectionError( | 143 | raise ConnectionError( |
57 | f"Unable to transition to stream mode: Bad response from server {r!r}" | 144 | f"Unable to transition to stream mode: Bad response from server {r!r}" |
58 | ) | 145 | ) |
59 | |||
60 | self.logger.debug("Mode is now %s", command) | 146 | self.logger.debug("Mode is now %s", command) |
61 | 147 | ||
62 | if new_mode == self.mode: | 148 | if new_mode == self.mode: |
@@ -84,14 +170,17 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
84 | self.mode = new_mode | 170 | self.mode = new_mode |
85 | 171 | ||
86 | async def get_unihash(self, method, taskhash): | 172 | async def get_unihash(self, method, taskhash): |
87 | await self._set_mode(self.MODE_GET_STREAM) | 173 | r = await self.get_unihash_batch([(method, taskhash)]) |
88 | r = await self.send_stream("%s %s" % (method, taskhash)) | 174 | return r[0] |
89 | if not r: | 175 | |
90 | return None | 176 | async def get_unihash_batch(self, args): |
91 | return r | 177 | result = await self.send_stream_batch( |
178 | self.MODE_GET_STREAM, | ||
179 | (f"{method} {taskhash}" for method, taskhash in args), | ||
180 | ) | ||
181 | return [r if r else None for r in result] | ||
92 | 182 | ||
93 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): | 183 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): |
94 | await self._set_mode(self.MODE_NORMAL) | ||
95 | m = extra.copy() | 184 | m = extra.copy() |
96 | m["taskhash"] = taskhash | 185 | m["taskhash"] = taskhash |
97 | m["method"] = method | 186 | m["method"] = method |
@@ -100,7 +189,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
100 | return await self.invoke({"report": m}) | 189 | return await self.invoke({"report": m}) |
101 | 190 | ||
102 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 191 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
103 | await self._set_mode(self.MODE_NORMAL) | ||
104 | m = extra.copy() | 192 | m = extra.copy() |
105 | m["taskhash"] = taskhash | 193 | m["taskhash"] = taskhash |
106 | m["method"] = method | 194 | m["method"] = method |
@@ -108,18 +196,19 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
108 | return await self.invoke({"report-equiv": m}) | 196 | return await self.invoke({"report-equiv": m}) |
109 | 197 | ||
110 | async def get_taskhash(self, method, taskhash, all_properties=False): | 198 | async def get_taskhash(self, method, taskhash, all_properties=False): |
111 | await self._set_mode(self.MODE_NORMAL) | ||
112 | return await self.invoke( | 199 | return await self.invoke( |
113 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 200 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
114 | ) | 201 | ) |
115 | 202 | ||
116 | async def unihash_exists(self, unihash): | 203 | async def unihash_exists(self, unihash): |
117 | await self._set_mode(self.MODE_EXIST_STREAM) | 204 | r = await self.unihash_exists_batch([unihash]) |
118 | r = await self.send_stream(unihash) | 205 | return r[0] |
119 | return r == "true" | 206 | |
207 | async def unihash_exists_batch(self, unihashes): | ||
208 | result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes) | ||
209 | return [r == "true" for r in result] | ||
120 | 210 | ||
121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 211 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
122 | await self._set_mode(self.MODE_NORMAL) | ||
123 | return await self.invoke( | 212 | return await self.invoke( |
124 | { | 213 | { |
125 | "get-outhash": { | 214 | "get-outhash": { |
@@ -132,27 +221,21 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
132 | ) | 221 | ) |
133 | 222 | ||
134 | async def get_stats(self): | 223 | async def get_stats(self): |
135 | await self._set_mode(self.MODE_NORMAL) | ||
136 | return await self.invoke({"get-stats": None}) | 224 | return await self.invoke({"get-stats": None}) |
137 | 225 | ||
138 | async def reset_stats(self): | 226 | async def reset_stats(self): |
139 | await self._set_mode(self.MODE_NORMAL) | ||
140 | return await self.invoke({"reset-stats": None}) | 227 | return await self.invoke({"reset-stats": None}) |
141 | 228 | ||
142 | async def backfill_wait(self): | 229 | async def backfill_wait(self): |
143 | await self._set_mode(self.MODE_NORMAL) | ||
144 | return (await self.invoke({"backfill-wait": None}))["tasks"] | 230 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
145 | 231 | ||
146 | async def remove(self, where): | 232 | async def remove(self, where): |
147 | await self._set_mode(self.MODE_NORMAL) | ||
148 | return await self.invoke({"remove": {"where": where}}) | 233 | return await self.invoke({"remove": {"where": where}}) |
149 | 234 | ||
150 | async def clean_unused(self, max_age): | 235 | async def clean_unused(self, max_age): |
151 | await self._set_mode(self.MODE_NORMAL) | ||
152 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) | 236 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) |
153 | 237 | ||
154 | async def auth(self, username, token): | 238 | async def auth(self, username, token): |
155 | await self._set_mode(self.MODE_NORMAL) | ||
156 | result = await self.invoke({"auth": {"username": username, "token": token}}) | 239 | result = await self.invoke({"auth": {"username": username, "token": token}}) |
157 | self.username = username | 240 | self.username = username |
158 | self.password = token | 241 | self.password = token |
@@ -160,7 +243,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
160 | return result | 243 | return result |
161 | 244 | ||
162 | async def refresh_token(self, username=None): | 245 | async def refresh_token(self, username=None): |
163 | await self._set_mode(self.MODE_NORMAL) | ||
164 | m = {} | 246 | m = {} |
165 | if username: | 247 | if username: |
166 | m["username"] = username | 248 | m["username"] = username |
@@ -174,34 +256,28 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
174 | return result | 256 | return result |
175 | 257 | ||
176 | async def set_user_perms(self, username, permissions): | 258 | async def set_user_perms(self, username, permissions): |
177 | await self._set_mode(self.MODE_NORMAL) | ||
178 | return await self.invoke( | 259 | return await self.invoke( |
179 | {"set-user-perms": {"username": username, "permissions": permissions}} | 260 | {"set-user-perms": {"username": username, "permissions": permissions}} |
180 | ) | 261 | ) |
181 | 262 | ||
182 | async def get_user(self, username=None): | 263 | async def get_user(self, username=None): |
183 | await self._set_mode(self.MODE_NORMAL) | ||
184 | m = {} | 264 | m = {} |
185 | if username: | 265 | if username: |
186 | m["username"] = username | 266 | m["username"] = username |
187 | return await self.invoke({"get-user": m}) | 267 | return await self.invoke({"get-user": m}) |
188 | 268 | ||
189 | async def get_all_users(self): | 269 | async def get_all_users(self): |
190 | await self._set_mode(self.MODE_NORMAL) | ||
191 | return (await self.invoke({"get-all-users": {}}))["users"] | 270 | return (await self.invoke({"get-all-users": {}}))["users"] |
192 | 271 | ||
193 | async def new_user(self, username, permissions): | 272 | async def new_user(self, username, permissions): |
194 | await self._set_mode(self.MODE_NORMAL) | ||
195 | return await self.invoke( | 273 | return await self.invoke( |
196 | {"new-user": {"username": username, "permissions": permissions}} | 274 | {"new-user": {"username": username, "permissions": permissions}} |
197 | ) | 275 | ) |
198 | 276 | ||
199 | async def delete_user(self, username): | 277 | async def delete_user(self, username): |
200 | await self._set_mode(self.MODE_NORMAL) | ||
201 | return await self.invoke({"delete-user": {"username": username}}) | 278 | return await self.invoke({"delete-user": {"username": username}}) |
202 | 279 | ||
203 | async def become_user(self, username): | 280 | async def become_user(self, username): |
204 | await self._set_mode(self.MODE_NORMAL) | ||
205 | result = await self.invoke({"become-user": {"username": username}}) | 281 | result = await self.invoke({"become-user": {"username": username}}) |
206 | if username == self.username: | 282 | if username == self.username: |
207 | self.saved_become_user = None | 283 | self.saved_become_user = None |
@@ -210,15 +286,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
210 | return result | 286 | return result |
211 | 287 | ||
212 | async def get_db_usage(self): | 288 | async def get_db_usage(self): |
213 | await self._set_mode(self.MODE_NORMAL) | ||
214 | return (await self.invoke({"get-db-usage": {}}))["usage"] | 289 | return (await self.invoke({"get-db-usage": {}}))["usage"] |
215 | 290 | ||
216 | async def get_db_query_columns(self): | 291 | async def get_db_query_columns(self): |
217 | await self._set_mode(self.MODE_NORMAL) | ||
218 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] | 292 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] |
219 | 293 | ||
220 | async def gc_status(self): | 294 | async def gc_status(self): |
221 | await self._set_mode(self.MODE_NORMAL) | ||
222 | return await self.invoke({"gc-status": {}}) | 295 | return await self.invoke({"gc-status": {}}) |
223 | 296 | ||
224 | async def gc_mark(self, mark, where): | 297 | async def gc_mark(self, mark, where): |
@@ -231,7 +304,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
231 | kept. In addition, any new entries added to the database after this | 304 | kept. In addition, any new entries added to the database after this |
232 | command will be automatically marked with "mark" | 305 | command will be automatically marked with "mark" |
233 | """ | 306 | """ |
234 | await self._set_mode(self.MODE_NORMAL) | ||
235 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | 307 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) |
236 | 308 | ||
237 | async def gc_sweep(self, mark): | 309 | async def gc_sweep(self, mark): |
@@ -242,7 +314,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
242 | It is recommended to clean unused outhash entries after running this to | 314 | It is recommended to clean unused outhash entries after running this to |
243 | cleanup any dangling outhashes | 315 | cleanup any dangling outhashes |
244 | """ | 316 | """ |
245 | await self._set_mode(self.MODE_NORMAL) | ||
246 | return await self.invoke({"gc-sweep": {"mark": mark}}) | 317 | return await self.invoke({"gc-sweep": {"mark": mark}}) |
247 | 318 | ||
248 | 319 | ||
@@ -256,10 +327,12 @@ class Client(bb.asyncrpc.Client): | |||
256 | "connect_tcp", | 327 | "connect_tcp", |
257 | "connect_websocket", | 328 | "connect_websocket", |
258 | "get_unihash", | 329 | "get_unihash", |
330 | "get_unihash_batch", | ||
259 | "report_unihash", | 331 | "report_unihash", |
260 | "report_unihash_equiv", | 332 | "report_unihash_equiv", |
261 | "get_taskhash", | 333 | "get_taskhash", |
262 | "unihash_exists", | 334 | "unihash_exists", |
335 | "unihash_exists_batch", | ||
263 | "get_outhash", | 336 | "get_outhash", |
264 | "get_stats", | 337 | "get_stats", |
265 | "reset_stats", | 338 | "reset_stats", |
@@ -283,83 +356,3 @@ class Client(bb.asyncrpc.Client): | |||
283 | 356 | ||
284 | def _get_async_client(self): | 357 | def _get_async_client(self): |
285 | return AsyncClient(self.username, self.password) | 358 | return AsyncClient(self.username, self.password) |
286 | |||
287 | |||
288 | class ClientPool(bb.asyncrpc.ClientPool): | ||
289 | def __init__( | ||
290 | self, | ||
291 | address, | ||
292 | max_clients, | ||
293 | *, | ||
294 | username=None, | ||
295 | password=None, | ||
296 | become=None, | ||
297 | ): | ||
298 | super().__init__(max_clients) | ||
299 | self.address = address | ||
300 | self.username = username | ||
301 | self.password = password | ||
302 | self.become = become | ||
303 | |||
304 | async def _new_client(self): | ||
305 | client = await create_async_client( | ||
306 | self.address, | ||
307 | username=self.username, | ||
308 | password=self.password, | ||
309 | ) | ||
310 | if self.become: | ||
311 | await client.become_user(self.become) | ||
312 | return client | ||
313 | |||
314 | def _run_key_tasks(self, queries, call): | ||
315 | results = {key: None for key in queries.keys()} | ||
316 | |||
317 | def make_task(key, args): | ||
318 | async def task(client): | ||
319 | nonlocal results | ||
320 | unihash = await call(client, args) | ||
321 | results[key] = unihash | ||
322 | |||
323 | return task | ||
324 | |||
325 | def gen_tasks(): | ||
326 | for key, args in queries.items(): | ||
327 | yield make_task(key, args) | ||
328 | |||
329 | self.run_tasks(gen_tasks()) | ||
330 | return results | ||
331 | |||
332 | def get_unihashes(self, queries): | ||
333 | """ | ||
334 | Query multiple unihashes in parallel. | ||
335 | |||
336 | The queries argument is a dictionary with arbitrary key. The values | ||
337 | must be a tuple of (method, taskhash). | ||
338 | |||
339 | Returns a dictionary with a corresponding key for each input key, and | ||
340 | the value is the queried unihash (which might be none if the query | ||
341 | failed) | ||
342 | """ | ||
343 | |||
344 | async def call(client, args): | ||
345 | method, taskhash = args | ||
346 | return await client.get_unihash(method, taskhash) | ||
347 | |||
348 | return self._run_key_tasks(queries, call) | ||
349 | |||
350 | def unihashes_exist(self, queries): | ||
351 | """ | ||
352 | Query multiple unihash existence checks in parallel. | ||
353 | |||
354 | The queries argument is a dictionary with arbitrary key. The values | ||
355 | must be a unihash. | ||
356 | |||
357 | Returns a dictionary with a corresponding key for each input key, and | ||
358 | the value is True or False if the unihash is known by the server (or | ||
359 | None if there was a failure) | ||
360 | """ | ||
361 | |||
362 | async def call(client, unihash): | ||
363 | return await client.unihash_exists(unihash) | ||
364 | |||
365 | return self._run_key_tasks(queries, call) | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 0809453cf8..13ccb20ebf 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -8,7 +8,6 @@ | |||
8 | from . import create_server, create_client | 8 | from . import create_server, create_client |
9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS | 9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS |
10 | from bb.asyncrpc import InvokeError | 10 | from bb.asyncrpc import InvokeError |
11 | from .client import ClientPool | ||
12 | import hashlib | 11 | import hashlib |
13 | import logging | 12 | import logging |
14 | import multiprocessing | 13 | import multiprocessing |
@@ -94,9 +93,6 @@ class HashEquivalenceTestSetup(object): | |||
94 | return self.start_client(self.auth_server_address, user["username"], user["token"]) | 93 | return self.start_client(self.auth_server_address, user["username"], user["token"]) |
95 | 94 | ||
96 | def setUp(self): | 95 | def setUp(self): |
97 | if sys.version_info < (3, 5, 0): | ||
98 | self.skipTest('Python 3.5 or later required') | ||
99 | |||
100 | self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') | 96 | self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') |
101 | self.addCleanup(self.temp_dir.cleanup) | 97 | self.addCleanup(self.temp_dir.cleanup) |
102 | 98 | ||
@@ -555,8 +551,7 @@ class HashEquivalenceCommonTests(object): | |||
555 | # shares a taskhash with Task 2 | 551 | # shares a taskhash with Task 2 |
556 | self.assertClientGetHash(self.client, taskhash2, unihash2) | 552 | self.assertClientGetHash(self.client, taskhash2, unihash2) |
557 | 553 | ||
558 | 554 | def test_get_unihash_batch(self): | |
559 | def test_client_pool_get_unihashes(self): | ||
560 | TEST_INPUT = ( | 555 | TEST_INPUT = ( |
561 | # taskhash outhash unihash | 556 | # taskhash outhash unihash |
562 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | 557 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), |
@@ -573,28 +568,27 @@ class HashEquivalenceCommonTests(object): | |||
573 | "6b6be7a84ab179b4240c4302518dc3f6", | 568 | "6b6be7a84ab179b4240c4302518dc3f6", |
574 | ) | 569 | ) |
575 | 570 | ||
576 | with ClientPool(self.server_address, 10) as client_pool: | 571 | for taskhash, outhash, unihash in TEST_INPUT: |
577 | for taskhash, outhash, unihash in TEST_INPUT: | 572 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) |
578 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
579 | |||
580 | query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} | ||
581 | for idx, taskhash in enumerate(EXTRA_QUERIES): | ||
582 | query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) | ||
583 | |||
584 | result = client_pool.get_unihashes(query) | ||
585 | |||
586 | self.assertDictEqual(result, { | ||
587 | 0: "218e57509998197d570e2c98512d0105985dffc9", | ||
588 | 1: "218e57509998197d570e2c98512d0105985dffc9", | ||
589 | 2: "218e57509998197d570e2c98512d0105985dffc9", | ||
590 | 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", | ||
591 | 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
592 | 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
593 | 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", | ||
594 | 7: None, | ||
595 | }) | ||
596 | 573 | ||
597 | def test_client_pool_unihash_exists(self): | 574 | |
575 | result = self.client.get_unihash_batch( | ||
576 | [(self.METHOD, data[0]) for data in TEST_INPUT] + | ||
577 | [(self.METHOD, e) for e in EXTRA_QUERIES] | ||
578 | ) | ||
579 | |||
580 | self.assertListEqual(result, [ | ||
581 | "218e57509998197d570e2c98512d0105985dffc9", | ||
582 | "218e57509998197d570e2c98512d0105985dffc9", | ||
583 | "218e57509998197d570e2c98512d0105985dffc9", | ||
584 | "3b5d3d83f07f259e9086fcb422c855286e18a57d", | ||
585 | "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
586 | "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
587 | "05d2a63c81e32f0a36542ca677e8ad852365c538", | ||
588 | None, | ||
589 | ]) | ||
590 | |||
591 | def test_unihash_exists_batch(self): | ||
598 | TEST_INPUT = ( | 592 | TEST_INPUT = ( |
599 | # taskhash outhash unihash | 593 | # taskhash outhash unihash |
600 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | 594 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), |
@@ -614,28 +608,24 @@ class HashEquivalenceCommonTests(object): | |||
614 | result_unihashes = set() | 608 | result_unihashes = set() |
615 | 609 | ||
616 | 610 | ||
617 | with ClientPool(self.server_address, 10) as client_pool: | 611 | for taskhash, outhash, unihash in TEST_INPUT: |
618 | for taskhash, outhash, unihash in TEST_INPUT: | 612 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) |
619 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | 613 | result_unihashes.add(result["unihash"]) |
620 | result_unihashes.add(result["unihash"]) | ||
621 | |||
622 | query = {} | ||
623 | expected = {} | ||
624 | 614 | ||
625 | for _, _, unihash in TEST_INPUT: | 615 | query = [] |
626 | idx = len(query) | 616 | expected = [] |
627 | query[idx] = unihash | ||
628 | expected[idx] = unihash in result_unihashes | ||
629 | 617 | ||
618 | for _, _, unihash in TEST_INPUT: | ||
619 | query.append(unihash) | ||
620 | expected.append(unihash in result_unihashes) | ||
630 | 621 | ||
631 | for unihash in EXTRA_QUERIES: | ||
632 | idx = len(query) | ||
633 | query[idx] = unihash | ||
634 | expected[idx] = False | ||
635 | 622 | ||
636 | result = client_pool.unihashes_exist(query) | 623 | for unihash in EXTRA_QUERIES: |
637 | self.assertDictEqual(result, expected) | 624 | query.append(unihash) |
625 | expected.append(False) | ||
638 | 626 | ||
627 | result = self.client.unihash_exists_batch(query) | ||
628 | self.assertListEqual(result, expected) | ||
639 | 629 | ||
640 | def test_auth_read_perms(self): | 630 | def test_auth_read_perms(self): |
641 | admin_client = self.start_auth_server() | 631 | admin_client = self.start_auth_server() |