diff options
Diffstat (limited to 'bitbake/lib/hashserv')
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 6 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 220 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 29 | ||||
-rw-r--r-- | bitbake/lib/hashserv/sqlite.py | 17 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 120 |
5 files changed, 253 insertions, 139 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 74367eb6b4..ac891e0174 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -13,6 +13,7 @@ from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS | |||
13 | 13 | ||
14 | User = namedtuple("User", ("username", "permissions")) | 14 | User = namedtuple("User", ("username", "permissions")) |
15 | 15 | ||
16 | |||
16 | def create_server( | 17 | def create_server( |
17 | addr, | 18 | addr, |
18 | dbname, | 19 | dbname, |
@@ -25,6 +26,7 @@ def create_server( | |||
25 | anon_perms=None, | 26 | anon_perms=None, |
26 | admin_username=None, | 27 | admin_username=None, |
27 | admin_password=None, | 28 | admin_password=None, |
29 | reuseport=False, | ||
28 | ): | 30 | ): |
29 | def sqlite_engine(): | 31 | def sqlite_engine(): |
30 | from .sqlite import DatabaseEngine | 32 | from .sqlite import DatabaseEngine |
@@ -60,9 +62,9 @@ def create_server( | |||
60 | s.start_unix_server(*a) | 62 | s.start_unix_server(*a) |
61 | elif typ == ADDR_TYPE_WS: | 63 | elif typ == ADDR_TYPE_WS: |
62 | url = urlparse(a[0]) | 64 | url = urlparse(a[0]) |
63 | s.start_websocket_server(url.hostname, url.port) | 65 | s.start_websocket_server(url.hostname, url.port, reuseport=reuseport) |
64 | else: | 66 | else: |
65 | s.start_tcp_server(*a) | 67 | s.start_tcp_server(*a, reuseport=reuseport) |
66 | 68 | ||
67 | return s | 69 | return s |
68 | 70 | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 0b254beddd..8cb18050a6 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -5,6 +5,7 @@ | |||
5 | 5 | ||
6 | import logging | 6 | import logging |
7 | import socket | 7 | import socket |
8 | import asyncio | ||
8 | import bb.asyncrpc | 9 | import bb.asyncrpc |
9 | import json | 10 | import json |
10 | from . import create_async_client | 11 | from . import create_async_client |
@@ -13,10 +14,71 @@ from . import create_async_client | |||
13 | logger = logging.getLogger("hashserv.client") | 14 | logger = logging.getLogger("hashserv.client") |
14 | 15 | ||
15 | 16 | ||
17 | class Batch(object): | ||
18 | def __init__(self): | ||
19 | self.done = False | ||
20 | self.cond = asyncio.Condition() | ||
21 | self.pending = [] | ||
22 | self.results = [] | ||
23 | self.sent_count = 0 | ||
24 | |||
25 | async def recv(self, socket): | ||
26 | while True: | ||
27 | async with self.cond: | ||
28 | await self.cond.wait_for(lambda: self.pending or self.done) | ||
29 | |||
30 | if not self.pending: | ||
31 | if self.done: | ||
32 | return | ||
33 | continue | ||
34 | |||
35 | r = await socket.recv() | ||
36 | self.results.append(r) | ||
37 | |||
38 | async with self.cond: | ||
39 | self.pending.pop(0) | ||
40 | |||
41 | async def send(self, socket, msgs): | ||
42 | try: | ||
43 | # In the event of a restart due to a reconnect, all in-flight | ||
44 | # messages need to be resent first to keep the result count in sync | ||
45 | for m in self.pending: | ||
46 | await socket.send(m) | ||
47 | |||
48 | for m in msgs: | ||
49 | # Add the message to the pending list before attempting to send | ||
50 | # it so that if the send fails it will be retried | ||
51 | async with self.cond: | ||
52 | self.pending.append(m) | ||
53 | self.cond.notify() | ||
54 | self.sent_count += 1 | ||
55 | |||
56 | await socket.send(m) | ||
57 | |||
58 | finally: | ||
59 | async with self.cond: | ||
60 | self.done = True | ||
61 | self.cond.notify() | ||
62 | |||
63 | async def process(self, socket, msgs): | ||
64 | await asyncio.gather( | ||
65 | self.recv(socket), | ||
66 | self.send(socket, msgs), | ||
67 | ) | ||
68 | |||
69 | if len(self.results) != self.sent_count: | ||
70 | raise ValueError( | ||
71 | f"Expected result count {len(self.results)}. Expected {self.sent_count}" | ||
72 | ) | ||
73 | |||
74 | return self.results | ||
75 | |||
76 | |||
16 | class AsyncClient(bb.asyncrpc.AsyncClient): | 77 | class AsyncClient(bb.asyncrpc.AsyncClient): |
17 | MODE_NORMAL = 0 | 78 | MODE_NORMAL = 0 |
18 | MODE_GET_STREAM = 1 | 79 | MODE_GET_STREAM = 1 |
19 | MODE_EXIST_STREAM = 2 | 80 | MODE_EXIST_STREAM = 2 |
81 | MODE_MARK_STREAM = 3 | ||
20 | 82 | ||
21 | def __init__(self, username=None, password=None): | 83 | def __init__(self, username=None, password=None): |
22 | super().__init__("OEHASHEQUIV", "1.1", logger) | 84 | super().__init__("OEHASHEQUIV", "1.1", logger) |
@@ -36,32 +98,52 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
36 | if become: | 98 | if become: |
37 | await self.become_user(become) | 99 | await self.become_user(become) |
38 | 100 | ||
39 | async def send_stream(self, mode, msg): | 101 | async def send_stream_batch(self, mode, msgs): |
102 | """ | ||
103 | Does a "batch" process of stream messages. This sends the query | ||
104 | messages as fast as possible, and simultaneously attempts to read the | ||
105 | messages back. This helps to mitigate the effects of latency to the | ||
106 | hash equivalence server by allowing multiple queries to be "in-flight" | ||
107 | at once | ||
108 | |||
109 | The implementation does more complicated tracking using a count of sent | ||
110 | messages so that `msgs` can be a generator function (i.e. its length is | ||
111 | unknown) | ||
112 | |||
113 | """ | ||
114 | |||
115 | b = Batch() | ||
116 | |||
40 | async def proc(): | 117 | async def proc(): |
118 | nonlocal b | ||
119 | |||
41 | await self._set_mode(mode) | 120 | await self._set_mode(mode) |
42 | await self.socket.send(msg) | 121 | return await b.process(self.socket, msgs) |
43 | return await self.socket.recv() | ||
44 | 122 | ||
45 | return await self._send_wrapper(proc) | 123 | return await self._send_wrapper(proc) |
46 | 124 | ||
47 | async def invoke(self, *args, **kwargs): | 125 | async def invoke(self, *args, skip_mode=False, **kwargs): |
48 | # It's OK if connection errors cause a failure here, because the mode | 126 | # It's OK if connection errors cause a failure here, because the mode |
49 | # is also reset to normal on a new connection | 127 | # is also reset to normal on a new connection |
50 | await self._set_mode(self.MODE_NORMAL) | 128 | if not skip_mode: |
129 | await self._set_mode(self.MODE_NORMAL) | ||
51 | return await super().invoke(*args, **kwargs) | 130 | return await super().invoke(*args, **kwargs) |
52 | 131 | ||
53 | async def _set_mode(self, new_mode): | 132 | async def _set_mode(self, new_mode): |
54 | async def stream_to_normal(): | 133 | async def stream_to_normal(): |
134 | # Check if already in normal mode (e.g. due to a connection reset) | ||
135 | if self.mode == self.MODE_NORMAL: | ||
136 | return "ok" | ||
55 | await self.socket.send("END") | 137 | await self.socket.send("END") |
56 | return await self.socket.recv() | 138 | return await self.socket.recv() |
57 | 139 | ||
58 | async def normal_to_stream(command): | 140 | async def normal_to_stream(command): |
59 | r = await self.invoke({command: None}) | 141 | r = await self.invoke({command: None}, skip_mode=True) |
60 | if r != "ok": | 142 | if r != "ok": |
143 | self.check_invoke_error(r) | ||
61 | raise ConnectionError( | 144 | raise ConnectionError( |
62 | f"Unable to transition to stream mode: Bad response from server {r!r}" | 145 | f"Unable to transition to stream mode: Bad response from server {r!r}" |
63 | ) | 146 | ) |
64 | |||
65 | self.logger.debug("Mode is now %s", command) | 147 | self.logger.debug("Mode is now %s", command) |
66 | 148 | ||
67 | if new_mode == self.mode: | 149 | if new_mode == self.mode: |
@@ -83,16 +165,23 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
83 | await normal_to_stream("get-stream") | 165 | await normal_to_stream("get-stream") |
84 | elif new_mode == self.MODE_EXIST_STREAM: | 166 | elif new_mode == self.MODE_EXIST_STREAM: |
85 | await normal_to_stream("exists-stream") | 167 | await normal_to_stream("exists-stream") |
168 | elif new_mode == self.MODE_MARK_STREAM: | ||
169 | await normal_to_stream("gc-mark-stream") | ||
86 | elif new_mode != self.MODE_NORMAL: | 170 | elif new_mode != self.MODE_NORMAL: |
87 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") | 171 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") |
88 | 172 | ||
89 | self.mode = new_mode | 173 | self.mode = new_mode |
90 | 174 | ||
91 | async def get_unihash(self, method, taskhash): | 175 | async def get_unihash(self, method, taskhash): |
92 | r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) | 176 | r = await self.get_unihash_batch([(method, taskhash)]) |
93 | if not r: | 177 | return r[0] |
94 | return None | 178 | |
95 | return r | 179 | async def get_unihash_batch(self, args): |
180 | result = await self.send_stream_batch( | ||
181 | self.MODE_GET_STREAM, | ||
182 | (f"{method} {taskhash}" for method, taskhash in args), | ||
183 | ) | ||
184 | return [r if r else None for r in result] | ||
96 | 185 | ||
97 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): | 186 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): |
98 | m = extra.copy() | 187 | m = extra.copy() |
@@ -115,8 +204,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
115 | ) | 204 | ) |
116 | 205 | ||
117 | async def unihash_exists(self, unihash): | 206 | async def unihash_exists(self, unihash): |
118 | r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) | 207 | r = await self.unihash_exists_batch([unihash]) |
119 | return r == "true" | 208 | return r[0] |
209 | |||
210 | async def unihash_exists_batch(self, unihashes): | ||
211 | result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes) | ||
212 | return [r == "true" for r in result] | ||
120 | 213 | ||
121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 214 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
122 | return await self.invoke( | 215 | return await self.invoke( |
@@ -216,6 +309,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
216 | """ | 309 | """ |
217 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | 310 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) |
218 | 311 | ||
312 | async def gc_mark_stream(self, mark, rows): | ||
313 | """ | ||
314 | Similar to `gc-mark`, but accepts a list of "where" key-value pair | ||
315 | conditions. It utilizes stream mode to mark hashes, which helps reduce | ||
316 | the impact of latency when communicating with the hash equivalence | ||
317 | server. | ||
318 | """ | ||
319 | def row_to_dict(row): | ||
320 | pairs = row.split() | ||
321 | return dict(zip(pairs[::2], pairs[1::2])) | ||
322 | |||
323 | responses = await self.send_stream_batch( | ||
324 | self.MODE_MARK_STREAM, | ||
325 | (json.dumps({"mark": mark, "where": row_to_dict(row)}) for row in rows), | ||
326 | ) | ||
327 | |||
328 | return {"count": sum(int(json.loads(r)["count"]) for r in responses)} | ||
329 | |||
219 | async def gc_sweep(self, mark): | 330 | async def gc_sweep(self, mark): |
220 | """ | 331 | """ |
221 | Finishes garbage collection for "mark". All unihash entries that have | 332 | Finishes garbage collection for "mark". All unihash entries that have |
@@ -237,10 +348,12 @@ class Client(bb.asyncrpc.Client): | |||
237 | "connect_tcp", | 348 | "connect_tcp", |
238 | "connect_websocket", | 349 | "connect_websocket", |
239 | "get_unihash", | 350 | "get_unihash", |
351 | "get_unihash_batch", | ||
240 | "report_unihash", | 352 | "report_unihash", |
241 | "report_unihash_equiv", | 353 | "report_unihash_equiv", |
242 | "get_taskhash", | 354 | "get_taskhash", |
243 | "unihash_exists", | 355 | "unihash_exists", |
356 | "unihash_exists_batch", | ||
244 | "get_outhash", | 357 | "get_outhash", |
245 | "get_stats", | 358 | "get_stats", |
246 | "reset_stats", | 359 | "reset_stats", |
@@ -259,88 +372,9 @@ class Client(bb.asyncrpc.Client): | |||
259 | "get_db_query_columns", | 372 | "get_db_query_columns", |
260 | "gc_status", | 373 | "gc_status", |
261 | "gc_mark", | 374 | "gc_mark", |
375 | "gc_mark_stream", | ||
262 | "gc_sweep", | 376 | "gc_sweep", |
263 | ) | 377 | ) |
264 | 378 | ||
265 | def _get_async_client(self): | 379 | def _get_async_client(self): |
266 | return AsyncClient(self.username, self.password) | 380 | return AsyncClient(self.username, self.password) |
267 | |||
268 | |||
269 | class ClientPool(bb.asyncrpc.ClientPool): | ||
270 | def __init__( | ||
271 | self, | ||
272 | address, | ||
273 | max_clients, | ||
274 | *, | ||
275 | username=None, | ||
276 | password=None, | ||
277 | become=None, | ||
278 | ): | ||
279 | super().__init__(max_clients) | ||
280 | self.address = address | ||
281 | self.username = username | ||
282 | self.password = password | ||
283 | self.become = become | ||
284 | |||
285 | async def _new_client(self): | ||
286 | client = await create_async_client( | ||
287 | self.address, | ||
288 | username=self.username, | ||
289 | password=self.password, | ||
290 | ) | ||
291 | if self.become: | ||
292 | await client.become_user(self.become) | ||
293 | return client | ||
294 | |||
295 | def _run_key_tasks(self, queries, call): | ||
296 | results = {key: None for key in queries.keys()} | ||
297 | |||
298 | def make_task(key, args): | ||
299 | async def task(client): | ||
300 | nonlocal results | ||
301 | unihash = await call(client, args) | ||
302 | results[key] = unihash | ||
303 | |||
304 | return task | ||
305 | |||
306 | def gen_tasks(): | ||
307 | for key, args in queries.items(): | ||
308 | yield make_task(key, args) | ||
309 | |||
310 | self.run_tasks(gen_tasks()) | ||
311 | return results | ||
312 | |||
313 | def get_unihashes(self, queries): | ||
314 | """ | ||
315 | Query multiple unihashes in parallel. | ||
316 | |||
317 | The queries argument is a dictionary with arbitrary key. The values | ||
318 | must be a tuple of (method, taskhash). | ||
319 | |||
320 | Returns a dictionary with a corresponding key for each input key, and | ||
321 | the value is the queried unihash (which might be none if the query | ||
322 | failed) | ||
323 | """ | ||
324 | |||
325 | async def call(client, args): | ||
326 | method, taskhash = args | ||
327 | return await client.get_unihash(method, taskhash) | ||
328 | |||
329 | return self._run_key_tasks(queries, call) | ||
330 | |||
331 | def unihashes_exist(self, queries): | ||
332 | """ | ||
333 | Query multiple unihash existence checks in parallel. | ||
334 | |||
335 | The queries argument is a dictionary with arbitrary key. The values | ||
336 | must be a unihash. | ||
337 | |||
338 | Returns a dictionary with a corresponding key for each input key, and | ||
339 | the value is True or False if the unihash is known by the server (or | ||
340 | None if there was a failure) | ||
341 | """ | ||
342 | |||
343 | async def call(client, unihash): | ||
344 | return await client.unihash_exists(unihash) | ||
345 | |||
346 | return self._run_key_tasks(queries, call) | ||
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 68f64f983b..58f95c7bcd 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -10,6 +10,7 @@ import math | |||
10 | import time | 10 | import time |
11 | import os | 11 | import os |
12 | import base64 | 12 | import base64 |
13 | import json | ||
13 | import hashlib | 14 | import hashlib |
14 | from . import create_async_client | 15 | from . import create_async_client |
15 | import bb.asyncrpc | 16 | import bb.asyncrpc |
@@ -256,6 +257,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
256 | "backfill-wait": self.handle_backfill_wait, | 257 | "backfill-wait": self.handle_backfill_wait, |
257 | "remove": self.handle_remove, | 258 | "remove": self.handle_remove, |
258 | "gc-mark": self.handle_gc_mark, | 259 | "gc-mark": self.handle_gc_mark, |
260 | "gc-mark-stream": self.handle_gc_mark_stream, | ||
259 | "gc-sweep": self.handle_gc_sweep, | 261 | "gc-sweep": self.handle_gc_sweep, |
260 | "gc-status": self.handle_gc_status, | 262 | "gc-status": self.handle_gc_status, |
261 | "clean-unused": self.handle_clean_unused, | 263 | "clean-unused": self.handle_clean_unused, |
@@ -584,6 +586,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
584 | return {"count": await self.db.gc_mark(mark, condition)} | 586 | return {"count": await self.db.gc_mark(mark, condition)} |
585 | 587 | ||
586 | @permissions(DB_ADMIN_PERM) | 588 | @permissions(DB_ADMIN_PERM) |
589 | async def handle_gc_mark_stream(self, request): | ||
590 | async def handler(line): | ||
591 | try: | ||
592 | decoded_line = json.loads(line) | ||
593 | except json.JSONDecodeError as exc: | ||
594 | raise bb.asyncrpc.InvokeError( | ||
595 | "Could not decode JSONL input '%s'" % line | ||
596 | ) from exc | ||
597 | |||
598 | try: | ||
599 | mark = decoded_line["mark"] | ||
600 | condition = decoded_line["where"] | ||
601 | if not isinstance(mark, str): | ||
602 | raise TypeError("Bad mark type %s" % type(mark)) | ||
603 | |||
604 | if not isinstance(condition, dict): | ||
605 | raise TypeError("Bad condition type %s" % type(condition)) | ||
606 | except KeyError as exc: | ||
607 | raise bb.asyncrpc.InvokeError( | ||
608 | "Input line is missing key '%s' " % exc | ||
609 | ) from exc | ||
610 | |||
611 | return json.dumps({"count": await self.db.gc_mark(mark, condition)}) | ||
612 | |||
613 | return await self._stream_handler(handler) | ||
614 | |||
615 | @permissions(DB_ADMIN_PERM) | ||
587 | async def handle_gc_sweep(self, request): | 616 | async def handle_gc_sweep(self, request): |
588 | mark = request["mark"] | 617 | mark = request["mark"] |
589 | 618 | ||
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index da2e844a03..976504d7f4 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py | |||
@@ -4,6 +4,7 @@ | |||
4 | # | 4 | # |
5 | # SPDX-License-Identifier: GPL-2.0-only | 5 | # SPDX-License-Identifier: GPL-2.0-only |
6 | # | 6 | # |
7 | from datetime import datetime, timezone | ||
7 | import sqlite3 | 8 | import sqlite3 |
8 | import logging | 9 | import logging |
9 | from contextlib import closing | 10 | from contextlib import closing |
@@ -53,6 +54,22 @@ CONFIG_TABLE_DEFINITION = ( | |||
53 | CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) | 54 | CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) |
54 | 55 | ||
55 | 56 | ||
57 | def adapt_datetime_iso(val): | ||
58 | """Adapt datetime.datetime to UTC ISO 8601 date.""" | ||
59 | return val.astimezone(timezone.utc).isoformat() | ||
60 | |||
61 | |||
62 | sqlite3.register_adapter(datetime, adapt_datetime_iso) | ||
63 | |||
64 | |||
65 | def convert_datetime(val): | ||
66 | """Convert ISO 8601 datetime to datetime.datetime object.""" | ||
67 | return datetime.fromisoformat(val.decode()) | ||
68 | |||
69 | |||
70 | sqlite3.register_converter("DATETIME", convert_datetime) | ||
71 | |||
72 | |||
56 | def _make_table(cursor, name, definition): | 73 | def _make_table(cursor, name, definition): |
57 | cursor.execute( | 74 | cursor.execute( |
58 | """ | 75 | """ |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 0809453cf8..da3f8e0884 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -8,7 +8,6 @@ | |||
8 | from . import create_server, create_client | 8 | from . import create_server, create_client |
9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS | 9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS |
10 | from bb.asyncrpc import InvokeError | 10 | from bb.asyncrpc import InvokeError |
11 | from .client import ClientPool | ||
12 | import hashlib | 11 | import hashlib |
13 | import logging | 12 | import logging |
14 | import multiprocessing | 13 | import multiprocessing |
@@ -94,9 +93,6 @@ class HashEquivalenceTestSetup(object): | |||
94 | return self.start_client(self.auth_server_address, user["username"], user["token"]) | 93 | return self.start_client(self.auth_server_address, user["username"], user["token"]) |
95 | 94 | ||
96 | def setUp(self): | 95 | def setUp(self): |
97 | if sys.version_info < (3, 5, 0): | ||
98 | self.skipTest('Python 3.5 or later required') | ||
99 | |||
100 | self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') | 96 | self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') |
101 | self.addCleanup(self.temp_dir.cleanup) | 97 | self.addCleanup(self.temp_dir.cleanup) |
102 | 98 | ||
@@ -555,8 +551,7 @@ class HashEquivalenceCommonTests(object): | |||
555 | # shares a taskhash with Task 2 | 551 | # shares a taskhash with Task 2 |
556 | self.assertClientGetHash(self.client, taskhash2, unihash2) | 552 | self.assertClientGetHash(self.client, taskhash2, unihash2) |
557 | 553 | ||
558 | 554 | def test_get_unihash_batch(self): | |
559 | def test_client_pool_get_unihashes(self): | ||
560 | TEST_INPUT = ( | 555 | TEST_INPUT = ( |
561 | # taskhash outhash unihash | 556 | # taskhash outhash unihash |
562 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | 557 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), |
@@ -573,28 +568,27 @@ class HashEquivalenceCommonTests(object): | |||
573 | "6b6be7a84ab179b4240c4302518dc3f6", | 568 | "6b6be7a84ab179b4240c4302518dc3f6", |
574 | ) | 569 | ) |
575 | 570 | ||
576 | with ClientPool(self.server_address, 10) as client_pool: | 571 | for taskhash, outhash, unihash in TEST_INPUT: |
577 | for taskhash, outhash, unihash in TEST_INPUT: | 572 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) |
578 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | 573 | |
579 | 574 | ||
580 | query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} | 575 | result = self.client.get_unihash_batch( |
581 | for idx, taskhash in enumerate(EXTRA_QUERIES): | 576 | [(self.METHOD, data[0]) for data in TEST_INPUT] + |
582 | query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) | 577 | [(self.METHOD, e) for e in EXTRA_QUERIES] |
583 | 578 | ) | |
584 | result = client_pool.get_unihashes(query) | 579 | |
585 | 580 | self.assertListEqual(result, [ | |
586 | self.assertDictEqual(result, { | 581 | "218e57509998197d570e2c98512d0105985dffc9", |
587 | 0: "218e57509998197d570e2c98512d0105985dffc9", | 582 | "218e57509998197d570e2c98512d0105985dffc9", |
588 | 1: "218e57509998197d570e2c98512d0105985dffc9", | 583 | "218e57509998197d570e2c98512d0105985dffc9", |
589 | 2: "218e57509998197d570e2c98512d0105985dffc9", | 584 | "3b5d3d83f07f259e9086fcb422c855286e18a57d", |
590 | 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", | 585 | "f46d3fbb439bd9b921095da657a4de906510d2cd", |
591 | 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", | 586 | "f46d3fbb439bd9b921095da657a4de906510d2cd", |
592 | 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", | 587 | "05d2a63c81e32f0a36542ca677e8ad852365c538", |
593 | 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", | 588 | None, |
594 | 7: None, | 589 | ]) |
595 | }) | ||
596 | 590 | ||
597 | def test_client_pool_unihash_exists(self): | 591 | def test_unihash_exists_batch(self): |
598 | TEST_INPUT = ( | 592 | TEST_INPUT = ( |
599 | # taskhash outhash unihash | 593 | # taskhash outhash unihash |
600 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | 594 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), |
@@ -614,28 +608,24 @@ class HashEquivalenceCommonTests(object): | |||
614 | result_unihashes = set() | 608 | result_unihashes = set() |
615 | 609 | ||
616 | 610 | ||
617 | with ClientPool(self.server_address, 10) as client_pool: | 611 | for taskhash, outhash, unihash in TEST_INPUT: |
618 | for taskhash, outhash, unihash in TEST_INPUT: | 612 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) |
619 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | 613 | result_unihashes.add(result["unihash"]) |
620 | result_unihashes.add(result["unihash"]) | ||
621 | 614 | ||
622 | query = {} | 615 | query = [] |
623 | expected = {} | 616 | expected = [] |
624 | 617 | ||
625 | for _, _, unihash in TEST_INPUT: | 618 | for _, _, unihash in TEST_INPUT: |
626 | idx = len(query) | 619 | query.append(unihash) |
627 | query[idx] = unihash | 620 | expected.append(unihash in result_unihashes) |
628 | expected[idx] = unihash in result_unihashes | ||
629 | 621 | ||
630 | 622 | ||
631 | for unihash in EXTRA_QUERIES: | 623 | for unihash in EXTRA_QUERIES: |
632 | idx = len(query) | 624 | query.append(unihash) |
633 | query[idx] = unihash | 625 | expected.append(False) |
634 | expected[idx] = False | ||
635 | |||
636 | result = client_pool.unihashes_exist(query) | ||
637 | self.assertDictEqual(result, expected) | ||
638 | 626 | ||
627 | result = self.client.unihash_exists_batch(query) | ||
628 | self.assertListEqual(result, expected) | ||
639 | 629 | ||
640 | def test_auth_read_perms(self): | 630 | def test_auth_read_perms(self): |
641 | admin_client = self.start_auth_server() | 631 | admin_client = self.start_auth_server() |
@@ -979,6 +969,48 @@ class HashEquivalenceCommonTests(object): | |||
979 | # First hash is still present | 969 | # First hash is still present |
980 | self.assertClientGetHash(self.client, taskhash, unihash) | 970 | self.assertClientGetHash(self.client, taskhash, unihash) |
981 | 971 | ||
972 | def test_gc_stream(self): | ||
973 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
974 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
975 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
976 | |||
977 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
978 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
979 | |||
980 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
981 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
982 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
983 | |||
984 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
985 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
986 | |||
987 | taskhash3 = 'a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c0' | ||
988 | outhash3 = '7289c414905303700a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c04f9a53c' | ||
989 | unihash3 = '905303700a1117c1f5a7c9ab2f5a39cc6fe5e615' | ||
990 | |||
991 | result = self.client.report_unihash(taskhash3, self.METHOD, outhash3, unihash3) | ||
992 | self.assertClientGetHash(self.client, taskhash3, unihash3) | ||
993 | |||
994 | # Mark the first two unihashes to be kept | ||
995 | ret = self.client.gc_mark_stream("ABC", (f"unihash {h}" for h in [unihash, unihash2])) | ||
996 | self.assertEqual(ret, {"count": 2}) | ||
997 | |||
998 | ret = self.client.gc_status() | ||
999 | self.assertEqual(ret, {"mark": "ABC", "keep": 2, "remove": 1}) | ||
1000 | |||
1001 | # Third hash is still there; mark doesn't delete hashes | ||
1002 | self.assertClientGetHash(self.client, taskhash3, unihash3) | ||
1003 | |||
1004 | ret = self.client.gc_sweep("ABC") | ||
1005 | self.assertEqual(ret, {"count": 1}) | ||
1006 | |||
1007 | # Hash is gone. Taskhash is returned for second hash | ||
1008 | self.assertClientGetHash(self.client, taskhash3, None) | ||
1009 | # First hash is still present | ||
1010 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
1011 | # Second hash is still present | ||
1012 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
1013 | |||
982 | def test_gc_switch_mark(self): | 1014 | def test_gc_switch_mark(self): |
983 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | 1015 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' |
984 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | 1016 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' |