summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv')
-rw-r--r--bitbake/lib/hashserv/__init__.py31
-rw-r--r--bitbake/lib/hashserv/client.py227
-rw-r--r--bitbake/lib/hashserv/tests.py78
3 files changed, 148 insertions, 188 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 552a33278f..ac891e0174 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -5,39 +5,15 @@
5 5
6import asyncio 6import asyncio
7from contextlib import closing 7from contextlib import closing
8import re
9import itertools 8import itertools
10import json 9import json
11from collections import namedtuple 10from collections import namedtuple
12from urllib.parse import urlparse 11from urllib.parse import urlparse
13 12from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
14UNIX_PREFIX = "unix://"
15WS_PREFIX = "ws://"
16WSS_PREFIX = "wss://"
17
18ADDR_TYPE_UNIX = 0
19ADDR_TYPE_TCP = 1
20ADDR_TYPE_WS = 2
21 13
22User = namedtuple("User", ("username", "permissions")) 14User = namedtuple("User", ("username", "permissions"))
23 15
24 16
25def parse_address(addr):
26 if addr.startswith(UNIX_PREFIX):
27 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
28 elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
29 return (ADDR_TYPE_WS, (addr,))
30 else:
31 m = re.match(r"\[(?P<host>[^\]]*)\]:(?P<port>\d+)$", addr)
32 if m is not None:
33 host = m.group("host")
34 port = m.group("port")
35 else:
36 host, port = addr.split(":")
37
38 return (ADDR_TYPE_TCP, (host, int(port)))
39
40
41def create_server( 17def create_server(
42 addr, 18 addr,
43 dbname, 19 dbname,
@@ -50,6 +26,7 @@ def create_server(
50 anon_perms=None, 26 anon_perms=None,
51 admin_username=None, 27 admin_username=None,
52 admin_password=None, 28 admin_password=None,
29 reuseport=False,
53): 30):
54 def sqlite_engine(): 31 def sqlite_engine():
55 from .sqlite import DatabaseEngine 32 from .sqlite import DatabaseEngine
@@ -85,9 +62,9 @@ def create_server(
85 s.start_unix_server(*a) 62 s.start_unix_server(*a)
86 elif typ == ADDR_TYPE_WS: 63 elif typ == ADDR_TYPE_WS:
87 url = urlparse(a[0]) 64 url = urlparse(a[0])
88 s.start_websocket_server(url.hostname, url.port) 65 s.start_websocket_server(url.hostname, url.port, reuseport=reuseport)
89 else: 66 else:
90 s.start_tcp_server(*a) 67 s.start_tcp_server(*a, reuseport=reuseport)
91 68
92 return s 69 return s
93 70
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index b269879ecf..a510f3284f 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -5,6 +5,7 @@
5 5
6import logging 6import logging
7import socket 7import socket
8import asyncio
8import bb.asyncrpc 9import bb.asyncrpc
9import json 10import json
10from . import create_async_client 11from . import create_async_client
@@ -13,6 +14,66 @@ from . import create_async_client
13logger = logging.getLogger("hashserv.client") 14logger = logging.getLogger("hashserv.client")
14 15
15 16
17class Batch(object):
18 def __init__(self):
19 self.done = False
20 self.cond = asyncio.Condition()
21 self.pending = []
22 self.results = []
23 self.sent_count = 0
24
25 async def recv(self, socket):
26 while True:
27 async with self.cond:
28 await self.cond.wait_for(lambda: self.pending or self.done)
29
30 if not self.pending:
31 if self.done:
32 return
33 continue
34
35 r = await socket.recv()
36 self.results.append(r)
37
38 async with self.cond:
39 self.pending.pop(0)
40
41 async def send(self, socket, msgs):
42 try:
43 # In the event of a restart due to a reconnect, all in-flight
44 # messages need to be resent first to keep to result count in sync
45 for m in self.pending:
46 await socket.send(m)
47
48 for m in msgs:
49 # Add the message to the pending list before attempting to send
50 # it so that if the send fails it will be retried
51 async with self.cond:
52 self.pending.append(m)
53 self.cond.notify()
54 self.sent_count += 1
55
56 await socket.send(m)
57
58 finally:
59 async with self.cond:
60 self.done = True
61 self.cond.notify()
62
63 async def process(self, socket, msgs):
64 await asyncio.gather(
65 self.recv(socket),
66 self.send(socket, msgs),
67 )
68
69 if len(self.results) != self.sent_count:
70 raise ValueError(
71 f"Expected result count {len(self.results)}. Expected {self.sent_count}"
72 )
73
74 return self.results
75
76
16class AsyncClient(bb.asyncrpc.AsyncClient): 77class AsyncClient(bb.asyncrpc.AsyncClient):
17 MODE_NORMAL = 0 78 MODE_NORMAL = 0
18 MODE_GET_STREAM = 1 79 MODE_GET_STREAM = 1
@@ -27,9 +88,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
27 88
28 async def setup_connection(self): 89 async def setup_connection(self):
29 await super().setup_connection() 90 await super().setup_connection()
30 cur_mode = self.mode
31 self.mode = self.MODE_NORMAL 91 self.mode = self.MODE_NORMAL
32 await self._set_mode(cur_mode)
33 if self.username: 92 if self.username:
34 # Save off become user temporarily because auth() resets it 93 # Save off become user temporarily because auth() resets it
35 become = self.saved_become_user 94 become = self.saved_become_user
@@ -38,25 +97,52 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
38 if become: 97 if become:
39 await self.become_user(become) 98 await self.become_user(become)
40 99
41 async def send_stream(self, msg): 100 async def send_stream_batch(self, mode, msgs):
101 """
102 Does a "batch" process of stream messages. This sends the query
103 messages as fast as possible, and simultaneously attempts to read the
104 messages back. This helps to mitigate the effects of latency to the
105 hash equivalence server be allowing multiple queries to be "in-flight"
106 at once
107
108 The implementation does more complicated tracking using a count of sent
109 messages so that `msgs` can be a generator function (i.e. its length is
110 unknown)
111
112 """
113
114 b = Batch()
115
42 async def proc(): 116 async def proc():
43 await self.socket.send(msg) 117 nonlocal b
44 return await self.socket.recv() 118
119 await self._set_mode(mode)
120 return await b.process(self.socket, msgs)
45 121
46 return await self._send_wrapper(proc) 122 return await self._send_wrapper(proc)
47 123
124 async def invoke(self, *args, skip_mode=False, **kwargs):
125 # It's OK if connection errors cause a failure here, because the mode
126 # is also reset to normal on a new connection
127 if not skip_mode:
128 await self._set_mode(self.MODE_NORMAL)
129 return await super().invoke(*args, **kwargs)
130
48 async def _set_mode(self, new_mode): 131 async def _set_mode(self, new_mode):
49 async def stream_to_normal(): 132 async def stream_to_normal():
133 # Check if already in normal mode (e.g. due to a connection reset)
134 if self.mode == self.MODE_NORMAL:
135 return "ok"
50 await self.socket.send("END") 136 await self.socket.send("END")
51 return await self.socket.recv() 137 return await self.socket.recv()
52 138
53 async def normal_to_stream(command): 139 async def normal_to_stream(command):
54 r = await self.invoke({command: None}) 140 r = await self.invoke({command: None}, skip_mode=True)
55 if r != "ok": 141 if r != "ok":
142 self.check_invoke_error(r)
56 raise ConnectionError( 143 raise ConnectionError(
57 f"Unable to transition to stream mode: Bad response from server {r!r}" 144 f"Unable to transition to stream mode: Bad response from server {r!r}"
58 ) 145 )
59
60 self.logger.debug("Mode is now %s", command) 146 self.logger.debug("Mode is now %s", command)
61 147
62 if new_mode == self.mode: 148 if new_mode == self.mode:
@@ -84,14 +170,17 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
84 self.mode = new_mode 170 self.mode = new_mode
85 171
86 async def get_unihash(self, method, taskhash): 172 async def get_unihash(self, method, taskhash):
87 await self._set_mode(self.MODE_GET_STREAM) 173 r = await self.get_unihash_batch([(method, taskhash)])
88 r = await self.send_stream("%s %s" % (method, taskhash)) 174 return r[0]
89 if not r: 175
90 return None 176 async def get_unihash_batch(self, args):
91 return r 177 result = await self.send_stream_batch(
178 self.MODE_GET_STREAM,
179 (f"{method} {taskhash}" for method, taskhash in args),
180 )
181 return [r if r else None for r in result]
92 182
93 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): 183 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
94 await self._set_mode(self.MODE_NORMAL)
95 m = extra.copy() 184 m = extra.copy()
96 m["taskhash"] = taskhash 185 m["taskhash"] = taskhash
97 m["method"] = method 186 m["method"] = method
@@ -100,7 +189,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
100 return await self.invoke({"report": m}) 189 return await self.invoke({"report": m})
101 190
102 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 191 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
103 await self._set_mode(self.MODE_NORMAL)
104 m = extra.copy() 192 m = extra.copy()
105 m["taskhash"] = taskhash 193 m["taskhash"] = taskhash
106 m["method"] = method 194 m["method"] = method
@@ -108,18 +196,19 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
108 return await self.invoke({"report-equiv": m}) 196 return await self.invoke({"report-equiv": m})
109 197
110 async def get_taskhash(self, method, taskhash, all_properties=False): 198 async def get_taskhash(self, method, taskhash, all_properties=False):
111 await self._set_mode(self.MODE_NORMAL)
112 return await self.invoke( 199 return await self.invoke(
113 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 200 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
114 ) 201 )
115 202
116 async def unihash_exists(self, unihash): 203 async def unihash_exists(self, unihash):
117 await self._set_mode(self.MODE_EXIST_STREAM) 204 r = await self.unihash_exists_batch([unihash])
118 r = await self.send_stream(unihash) 205 return r[0]
119 return r == "true" 206
207 async def unihash_exists_batch(self, unihashes):
208 result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
209 return [r == "true" for r in result]
120 210
121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 211 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
122 await self._set_mode(self.MODE_NORMAL)
123 return await self.invoke( 212 return await self.invoke(
124 { 213 {
125 "get-outhash": { 214 "get-outhash": {
@@ -132,27 +221,21 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
132 ) 221 )
133 222
134 async def get_stats(self): 223 async def get_stats(self):
135 await self._set_mode(self.MODE_NORMAL)
136 return await self.invoke({"get-stats": None}) 224 return await self.invoke({"get-stats": None})
137 225
138 async def reset_stats(self): 226 async def reset_stats(self):
139 await self._set_mode(self.MODE_NORMAL)
140 return await self.invoke({"reset-stats": None}) 227 return await self.invoke({"reset-stats": None})
141 228
142 async def backfill_wait(self): 229 async def backfill_wait(self):
143 await self._set_mode(self.MODE_NORMAL)
144 return (await self.invoke({"backfill-wait": None}))["tasks"] 230 return (await self.invoke({"backfill-wait": None}))["tasks"]
145 231
146 async def remove(self, where): 232 async def remove(self, where):
147 await self._set_mode(self.MODE_NORMAL)
148 return await self.invoke({"remove": {"where": where}}) 233 return await self.invoke({"remove": {"where": where}})
149 234
150 async def clean_unused(self, max_age): 235 async def clean_unused(self, max_age):
151 await self._set_mode(self.MODE_NORMAL)
152 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) 236 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
153 237
154 async def auth(self, username, token): 238 async def auth(self, username, token):
155 await self._set_mode(self.MODE_NORMAL)
156 result = await self.invoke({"auth": {"username": username, "token": token}}) 239 result = await self.invoke({"auth": {"username": username, "token": token}})
157 self.username = username 240 self.username = username
158 self.password = token 241 self.password = token
@@ -160,7 +243,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
160 return result 243 return result
161 244
162 async def refresh_token(self, username=None): 245 async def refresh_token(self, username=None):
163 await self._set_mode(self.MODE_NORMAL)
164 m = {} 246 m = {}
165 if username: 247 if username:
166 m["username"] = username 248 m["username"] = username
@@ -174,34 +256,28 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
174 return result 256 return result
175 257
176 async def set_user_perms(self, username, permissions): 258 async def set_user_perms(self, username, permissions):
177 await self._set_mode(self.MODE_NORMAL)
178 return await self.invoke( 259 return await self.invoke(
179 {"set-user-perms": {"username": username, "permissions": permissions}} 260 {"set-user-perms": {"username": username, "permissions": permissions}}
180 ) 261 )
181 262
182 async def get_user(self, username=None): 263 async def get_user(self, username=None):
183 await self._set_mode(self.MODE_NORMAL)
184 m = {} 264 m = {}
185 if username: 265 if username:
186 m["username"] = username 266 m["username"] = username
187 return await self.invoke({"get-user": m}) 267 return await self.invoke({"get-user": m})
188 268
189 async def get_all_users(self): 269 async def get_all_users(self):
190 await self._set_mode(self.MODE_NORMAL)
191 return (await self.invoke({"get-all-users": {}}))["users"] 270 return (await self.invoke({"get-all-users": {}}))["users"]
192 271
193 async def new_user(self, username, permissions): 272 async def new_user(self, username, permissions):
194 await self._set_mode(self.MODE_NORMAL)
195 return await self.invoke( 273 return await self.invoke(
196 {"new-user": {"username": username, "permissions": permissions}} 274 {"new-user": {"username": username, "permissions": permissions}}
197 ) 275 )
198 276
199 async def delete_user(self, username): 277 async def delete_user(self, username):
200 await self._set_mode(self.MODE_NORMAL)
201 return await self.invoke({"delete-user": {"username": username}}) 278 return await self.invoke({"delete-user": {"username": username}})
202 279
203 async def become_user(self, username): 280 async def become_user(self, username):
204 await self._set_mode(self.MODE_NORMAL)
205 result = await self.invoke({"become-user": {"username": username}}) 281 result = await self.invoke({"become-user": {"username": username}})
206 if username == self.username: 282 if username == self.username:
207 self.saved_become_user = None 283 self.saved_become_user = None
@@ -210,15 +286,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
210 return result 286 return result
211 287
212 async def get_db_usage(self): 288 async def get_db_usage(self):
213 await self._set_mode(self.MODE_NORMAL)
214 return (await self.invoke({"get-db-usage": {}}))["usage"] 289 return (await self.invoke({"get-db-usage": {}}))["usage"]
215 290
216 async def get_db_query_columns(self): 291 async def get_db_query_columns(self):
217 await self._set_mode(self.MODE_NORMAL)
218 return (await self.invoke({"get-db-query-columns": {}}))["columns"] 292 return (await self.invoke({"get-db-query-columns": {}}))["columns"]
219 293
220 async def gc_status(self): 294 async def gc_status(self):
221 await self._set_mode(self.MODE_NORMAL)
222 return await self.invoke({"gc-status": {}}) 295 return await self.invoke({"gc-status": {}})
223 296
224 async def gc_mark(self, mark, where): 297 async def gc_mark(self, mark, where):
@@ -231,7 +304,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
231 kept. In addition, any new entries added to the database after this 304 kept. In addition, any new entries added to the database after this
232 command will be automatically marked with "mark" 305 command will be automatically marked with "mark"
233 """ 306 """
234 await self._set_mode(self.MODE_NORMAL)
235 return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) 307 return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
236 308
237 async def gc_sweep(self, mark): 309 async def gc_sweep(self, mark):
@@ -242,7 +314,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
242 It is recommended to clean unused outhash entries after running this to 314 It is recommended to clean unused outhash entries after running this to
243 cleanup any dangling outhashes 315 cleanup any dangling outhashes
244 """ 316 """
245 await self._set_mode(self.MODE_NORMAL)
246 return await self.invoke({"gc-sweep": {"mark": mark}}) 317 return await self.invoke({"gc-sweep": {"mark": mark}})
247 318
248 319
@@ -256,10 +327,12 @@ class Client(bb.asyncrpc.Client):
256 "connect_tcp", 327 "connect_tcp",
257 "connect_websocket", 328 "connect_websocket",
258 "get_unihash", 329 "get_unihash",
330 "get_unihash_batch",
259 "report_unihash", 331 "report_unihash",
260 "report_unihash_equiv", 332 "report_unihash_equiv",
261 "get_taskhash", 333 "get_taskhash",
262 "unihash_exists", 334 "unihash_exists",
335 "unihash_exists_batch",
263 "get_outhash", 336 "get_outhash",
264 "get_stats", 337 "get_stats",
265 "reset_stats", 338 "reset_stats",
@@ -283,83 +356,3 @@ class Client(bb.asyncrpc.Client):
283 356
284 def _get_async_client(self): 357 def _get_async_client(self):
285 return AsyncClient(self.username, self.password) 358 return AsyncClient(self.username, self.password)
286
287
288class ClientPool(bb.asyncrpc.ClientPool):
289 def __init__(
290 self,
291 address,
292 max_clients,
293 *,
294 username=None,
295 password=None,
296 become=None,
297 ):
298 super().__init__(max_clients)
299 self.address = address
300 self.username = username
301 self.password = password
302 self.become = become
303
304 async def _new_client(self):
305 client = await create_async_client(
306 self.address,
307 username=self.username,
308 password=self.password,
309 )
310 if self.become:
311 await client.become_user(self.become)
312 return client
313
314 def _run_key_tasks(self, queries, call):
315 results = {key: None for key in queries.keys()}
316
317 def make_task(key, args):
318 async def task(client):
319 nonlocal results
320 unihash = await call(client, args)
321 results[key] = unihash
322
323 return task
324
325 def gen_tasks():
326 for key, args in queries.items():
327 yield make_task(key, args)
328
329 self.run_tasks(gen_tasks())
330 return results
331
332 def get_unihashes(self, queries):
333 """
334 Query multiple unihashes in parallel.
335
336 The queries argument is a dictionary with arbitrary key. The values
337 must be a tuple of (method, taskhash).
338
339 Returns a dictionary with a corresponding key for each input key, and
340 the value is the queried unihash (which might be none if the query
341 failed)
342 """
343
344 async def call(client, args):
345 method, taskhash = args
346 return await client.get_unihash(method, taskhash)
347
348 return self._run_key_tasks(queries, call)
349
350 def unihashes_exist(self, queries):
351 """
352 Query multiple unihash existence checks in parallel.
353
354 The queries argument is a dictionary with arbitrary key. The values
355 must be a unihash.
356
357 Returns a dictionary with a corresponding key for each input key, and
358 the value is True or False if the unihash is known by the server (or
359 None if there was a failure)
360 """
361
362 async def call(client, unihash):
363 return await client.unihash_exists(unihash)
364
365 return self._run_key_tasks(queries, call)
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 0809453cf8..13ccb20ebf 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -8,7 +8,6 @@
8from . import create_server, create_client 8from . import create_server, create_client
9from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS 9from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS
10from bb.asyncrpc import InvokeError 10from bb.asyncrpc import InvokeError
11from .client import ClientPool
12import hashlib 11import hashlib
13import logging 12import logging
14import multiprocessing 13import multiprocessing
@@ -94,9 +93,6 @@ class HashEquivalenceTestSetup(object):
94 return self.start_client(self.auth_server_address, user["username"], user["token"]) 93 return self.start_client(self.auth_server_address, user["username"], user["token"])
95 94
96 def setUp(self): 95 def setUp(self):
97 if sys.version_info < (3, 5, 0):
98 self.skipTest('Python 3.5 or later required')
99
100 self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') 96 self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv')
101 self.addCleanup(self.temp_dir.cleanup) 97 self.addCleanup(self.temp_dir.cleanup)
102 98
@@ -555,8 +551,7 @@ class HashEquivalenceCommonTests(object):
555 # shares a taskhash with Task 2 551 # shares a taskhash with Task 2
556 self.assertClientGetHash(self.client, taskhash2, unihash2) 552 self.assertClientGetHash(self.client, taskhash2, unihash2)
557 553
558 554 def test_get_unihash_batch(self):
559 def test_client_pool_get_unihashes(self):
560 TEST_INPUT = ( 555 TEST_INPUT = (
561 # taskhash outhash unihash 556 # taskhash outhash unihash
562 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), 557 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
@@ -573,28 +568,27 @@ class HashEquivalenceCommonTests(object):
573 "6b6be7a84ab179b4240c4302518dc3f6", 568 "6b6be7a84ab179b4240c4302518dc3f6",
574 ) 569 )
575 570
576 with ClientPool(self.server_address, 10) as client_pool: 571 for taskhash, outhash, unihash in TEST_INPUT:
577 for taskhash, outhash, unihash in TEST_INPUT: 572 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
578 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
579
580 query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)}
581 for idx, taskhash in enumerate(EXTRA_QUERIES):
582 query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash)
583
584 result = client_pool.get_unihashes(query)
585
586 self.assertDictEqual(result, {
587 0: "218e57509998197d570e2c98512d0105985dffc9",
588 1: "218e57509998197d570e2c98512d0105985dffc9",
589 2: "218e57509998197d570e2c98512d0105985dffc9",
590 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d",
591 4: "f46d3fbb439bd9b921095da657a4de906510d2cd",
592 5: "f46d3fbb439bd9b921095da657a4de906510d2cd",
593 6: "05d2a63c81e32f0a36542ca677e8ad852365c538",
594 7: None,
595 })
596 573
597 def test_client_pool_unihash_exists(self): 574
575 result = self.client.get_unihash_batch(
576 [(self.METHOD, data[0]) for data in TEST_INPUT] +
577 [(self.METHOD, e) for e in EXTRA_QUERIES]
578 )
579
580 self.assertListEqual(result, [
581 "218e57509998197d570e2c98512d0105985dffc9",
582 "218e57509998197d570e2c98512d0105985dffc9",
583 "218e57509998197d570e2c98512d0105985dffc9",
584 "3b5d3d83f07f259e9086fcb422c855286e18a57d",
585 "f46d3fbb439bd9b921095da657a4de906510d2cd",
586 "f46d3fbb439bd9b921095da657a4de906510d2cd",
587 "05d2a63c81e32f0a36542ca677e8ad852365c538",
588 None,
589 ])
590
591 def test_unihash_exists_batch(self):
598 TEST_INPUT = ( 592 TEST_INPUT = (
599 # taskhash outhash unihash 593 # taskhash outhash unihash
600 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), 594 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
@@ -614,28 +608,24 @@ class HashEquivalenceCommonTests(object):
614 result_unihashes = set() 608 result_unihashes = set()
615 609
616 610
617 with ClientPool(self.server_address, 10) as client_pool: 611 for taskhash, outhash, unihash in TEST_INPUT:
618 for taskhash, outhash, unihash in TEST_INPUT: 612 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
619 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) 613 result_unihashes.add(result["unihash"])
620 result_unihashes.add(result["unihash"])
621
622 query = {}
623 expected = {}
624 614
625 for _, _, unihash in TEST_INPUT: 615 query = []
626 idx = len(query) 616 expected = []
627 query[idx] = unihash
628 expected[idx] = unihash in result_unihashes
629 617
618 for _, _, unihash in TEST_INPUT:
619 query.append(unihash)
620 expected.append(unihash in result_unihashes)
630 621
631 for unihash in EXTRA_QUERIES:
632 idx = len(query)
633 query[idx] = unihash
634 expected[idx] = False
635 622
636 result = client_pool.unihashes_exist(query) 623 for unihash in EXTRA_QUERIES:
637 self.assertDictEqual(result, expected) 624 query.append(unihash)
625 expected.append(False)
638 626
627 result = self.client.unihash_exists_batch(query)
628 self.assertListEqual(result, expected)
639 629
640 def test_auth_read_perms(self): 630 def test_auth_read_perms(self):
641 admin_client = self.start_auth_server() 631 admin_client = self.start_auth_server()