Diffstat (limited to 'bitbake/lib/hashserv')
-rw-r--r--  bitbake/lib/hashserv/__init__.py |   6
-rw-r--r--  bitbake/lib/hashserv/client.py   | 220
-rw-r--r--  bitbake/lib/hashserv/server.py   |  29
-rw-r--r--  bitbake/lib/hashserv/sqlite.py   |  17
-rw-r--r--  bitbake/lib/hashserv/tests.py    | 120
5 files changed, 253 insertions(+), 139 deletions(-)
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 74367eb6b4..ac891e0174 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -13,6 +13,7 @@ from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
 
 User = namedtuple("User", ("username", "permissions"))
 
+
 def create_server(
     addr,
     dbname,
@@ -25,6 +26,7 @@ def create_server(
     anon_perms=None,
     admin_username=None,
     admin_password=None,
+    reuseport=False,
 ):
     def sqlite_engine():
         from .sqlite import DatabaseEngine
@@ -60,9 +62,9 @@ def create_server(
         s.start_unix_server(*a)
     elif typ == ADDR_TYPE_WS:
         url = urlparse(a[0])
-        s.start_websocket_server(url.hostname, url.port)
+        s.start_websocket_server(url.hostname, url.port, reuseport=reuseport)
     else:
-        s.start_tcp_server(*a)
+        s.start_tcp_server(*a, reuseport=reuseport)
 
     return s
 
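The new reuseport flag is threaded from create_server() down to the TCP and websocket listeners so that several server processes can bind the same port (SO_REUSEPORT) and let the kernel spread incoming connections across them. A minimal usage sketch follows; the address string, database path, and serve_forever() call are assumptions drawn from the surrounding API, not part of this diff:

    # Hypothetical: run one hashserv process per core on a shared TCP port.
    import hashserv

    server = hashserv.create_server(
        "0.0.0.0:8687",    # address format assumed (parsed by bb.asyncrpc)
        "./hashserv.db",
        reuseport=True,    # the new flag; forwarded to start_tcp_server()
    )
    server.serve_forever()  # assumed entry point, as bitbake-hashserv uses
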
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 0b254beddd..8cb18050a6 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -5,6 +5,7 @@
 
 import logging
 import socket
+import asyncio
 import bb.asyncrpc
 import json
 from . import create_async_client
@@ -13,10 +14,71 @@ from . import create_async_client
 logger = logging.getLogger("hashserv.client")
 
 
+class Batch(object):
+    def __init__(self):
+        self.done = False
+        self.cond = asyncio.Condition()
+        self.pending = []
+        self.results = []
+        self.sent_count = 0
+
+    async def recv(self, socket):
+        while True:
+            async with self.cond:
+                await self.cond.wait_for(lambda: self.pending or self.done)
+
+                if not self.pending:
+                    if self.done:
+                        return
+                    continue
+
+            r = await socket.recv()
+            self.results.append(r)
+
+            async with self.cond:
+                self.pending.pop(0)
+
+    async def send(self, socket, msgs):
+        try:
+            # In the event of a restart due to a reconnect, all in-flight
+            # messages need to be resent first to keep the result count in sync
+            for m in self.pending:
+                await socket.send(m)
+
+            for m in msgs:
+                # Add the message to the pending list before attempting to send
+                # it so that if the send fails it will be retried
+                async with self.cond:
+                    self.pending.append(m)
+                    self.cond.notify()
+                    self.sent_count += 1
+
+                await socket.send(m)
+
+        finally:
+            async with self.cond:
+                self.done = True
+                self.cond.notify()
+
+    async def process(self, socket, msgs):
+        await asyncio.gather(
+            self.recv(socket),
+            self.send(socket, msgs),
+        )
+
+        if len(self.results) != self.sent_count:
+            raise ValueError(
+                f"Got {len(self.results)} results, but expected {self.sent_count}"
+            )
+
+        return self.results
+
+
 class AsyncClient(bb.asyncrpc.AsyncClient):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
     MODE_EXIST_STREAM = 2
+    MODE_MARK_STREAM = 3
 
     def __init__(self, username=None, password=None):
         super().__init__("OEHASHEQUIV", "1.1", logger)
@@ -36,32 +98,52 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
         if become:
             await self.become_user(become)
 
-    async def send_stream(self, mode, msg):
+    async def send_stream_batch(self, mode, msgs):
+        """
+        Does a "batch" process of stream messages. This sends the query
+        messages as fast as possible, and simultaneously attempts to read the
+        messages back. This helps to mitigate the effects of latency to the
+        hash equivalence server by allowing multiple queries to be "in-flight"
+        at once.
+
+        The implementation does more complicated tracking using a count of sent
+        messages so that `msgs` can be a generator function (i.e. its length is
+        unknown)
+
+        """
+
+        b = Batch()
+
         async def proc():
+            nonlocal b
+
             await self._set_mode(mode)
-            await self.socket.send(msg)
-            return await self.socket.recv()
+            return await b.process(self.socket, msgs)
 
         return await self._send_wrapper(proc)
 
-    async def invoke(self, *args, **kwargs):
+    async def invoke(self, *args, skip_mode=False, **kwargs):
         # It's OK if connection errors cause a failure here, because the mode
         # is also reset to normal on a new connection
-        await self._set_mode(self.MODE_NORMAL)
+        if not skip_mode:
+            await self._set_mode(self.MODE_NORMAL)
         return await super().invoke(*args, **kwargs)
 
     async def _set_mode(self, new_mode):
         async def stream_to_normal():
+            # Check if already in normal mode (e.g. due to a connection reset)
+            if self.mode == self.MODE_NORMAL:
+                return "ok"
             await self.socket.send("END")
             return await self.socket.recv()
 
         async def normal_to_stream(command):
-            r = await self.invoke({command: None})
+            r = await self.invoke({command: None}, skip_mode=True)
             if r != "ok":
+                self.check_invoke_error(r)
                 raise ConnectionError(
                     f"Unable to transition to stream mode: Bad response from server {r!r}"
                 )
-
             self.logger.debug("Mode is now %s", command)
 
         if new_mode == self.mode:
@@ -83,16 +165,23 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
             await normal_to_stream("get-stream")
         elif new_mode == self.MODE_EXIST_STREAM:
             await normal_to_stream("exists-stream")
+        elif new_mode == self.MODE_MARK_STREAM:
+            await normal_to_stream("gc-mark-stream")
         elif new_mode != self.MODE_NORMAL:
             raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
 
         self.mode = new_mode
 
     async def get_unihash(self, method, taskhash):
-        r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash))
-        if not r:
-            return None
-        return r
+        r = await self.get_unihash_batch([(method, taskhash)])
+        return r[0]
+
+    async def get_unihash_batch(self, args):
+        result = await self.send_stream_batch(
+            self.MODE_GET_STREAM,
+            (f"{method} {taskhash}" for method, taskhash in args),
+        )
+        return [r if r else None for r in result]
 
     async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
         m = extra.copy()
@@ -115,8 +204,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
         )
 
     async def unihash_exists(self, unihash):
-        r = await self.send_stream(self.MODE_EXIST_STREAM, unihash)
-        return r == "true"
+        r = await self.unihash_exists_batch([unihash])
+        return r[0]
+
+    async def unihash_exists_batch(self, unihashes):
+        result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
+        return [r == "true" for r in result]
 
     async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
         return await self.invoke(
@@ -216,6 +309,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
216 """ 309 """
217 return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) 310 return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
218 311
312 async def gc_mark_stream(self, mark, rows):
313 """
314 Similar to `gc-mark`, but accepts a list of "where" key-value pair
315 conditions. It utilizes stream mode to mark hashes, which helps reduce
316 the impact of latency when communicating with the hash equivalence
317 server.
318 """
319 def row_to_dict(row):
320 pairs = row.split()
321 return dict(zip(pairs[::2], pairs[1::2]))
322
323 responses = await self.send_stream_batch(
324 self.MODE_MARK_STREAM,
325 (json.dumps({"mark": mark, "where": row_to_dict(row)}) for row in rows),
326 )
327
328 return {"count": sum(int(json.loads(r)["count"]) for r in responses)}
329
219 async def gc_sweep(self, mark): 330 async def gc_sweep(self, mark):
220 """ 331 """
221 Finishes garbage collection for "mark". All unihash entries that have 332 Finishes garbage collection for "mark". All unihash entries that have
@@ -237,10 +348,12 @@ class Client(bb.asyncrpc.Client):
237 "connect_tcp", 348 "connect_tcp",
238 "connect_websocket", 349 "connect_websocket",
239 "get_unihash", 350 "get_unihash",
351 "get_unihash_batch",
240 "report_unihash", 352 "report_unihash",
241 "report_unihash_equiv", 353 "report_unihash_equiv",
242 "get_taskhash", 354 "get_taskhash",
243 "unihash_exists", 355 "unihash_exists",
356 "unihash_exists_batch",
244 "get_outhash", 357 "get_outhash",
245 "get_stats", 358 "get_stats",
246 "reset_stats", 359 "reset_stats",
@@ -259,88 +372,9 @@ class Client(bb.asyncrpc.Client):
259 "get_db_query_columns", 372 "get_db_query_columns",
260 "gc_status", 373 "gc_status",
261 "gc_mark", 374 "gc_mark",
375 "gc_mark_stream",
262 "gc_sweep", 376 "gc_sweep",
263 ) 377 )
264 378
265 def _get_async_client(self): 379 def _get_async_client(self):
266 return AsyncClient(self.username, self.password) 380 return AsyncClient(self.username, self.password)
267
268
269class ClientPool(bb.asyncrpc.ClientPool):
270 def __init__(
271 self,
272 address,
273 max_clients,
274 *,
275 username=None,
276 password=None,
277 become=None,
278 ):
279 super().__init__(max_clients)
280 self.address = address
281 self.username = username
282 self.password = password
283 self.become = become
284
285 async def _new_client(self):
286 client = await create_async_client(
287 self.address,
288 username=self.username,
289 password=self.password,
290 )
291 if self.become:
292 await client.become_user(self.become)
293 return client
294
295 def _run_key_tasks(self, queries, call):
296 results = {key: None for key in queries.keys()}
297
298 def make_task(key, args):
299 async def task(client):
300 nonlocal results
301 unihash = await call(client, args)
302 results[key] = unihash
303
304 return task
305
306 def gen_tasks():
307 for key, args in queries.items():
308 yield make_task(key, args)
309
310 self.run_tasks(gen_tasks())
311 return results
312
313 def get_unihashes(self, queries):
314 """
315 Query multiple unihashes in parallel.
316
317 The queries argument is a dictionary with arbitrary key. The values
318 must be a tuple of (method, taskhash).
319
320 Returns a dictionary with a corresponding key for each input key, and
321 the value is the queried unihash (which might be none if the query
322 failed)
323 """
324
325 async def call(client, args):
326 method, taskhash = args
327 return await client.get_unihash(method, taskhash)
328
329 return self._run_key_tasks(queries, call)
330
331 def unihashes_exist(self, queries):
332 """
333 Query multiple unihash existence checks in parallel.
334
335 The queries argument is a dictionary with arbitrary key. The values
336 must be a unihash.
337
338 Returns a dictionary with a corresponding key for each input key, and
339 the value is True or False if the unihash is known by the server (or
340 None if there was a failure)
341 """
342
343 async def call(client, unihash):
344 return await client.unihash_exists(unihash)
345
346 return self._run_key_tasks(queries, call)
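With ClientPool removed, batching now happens over a single connection: the client pipelines many stream queries instead of fanning out across multiple sockets. A sketch of the replacement calls, assembled from the proxied method list above and the tests below; the address and hash values are placeholders:

    from hashserv import create_client

    client = create_client("unix:///tmp/hashserv.sock")  # address format assumed

    # Pipelined over one stream-mode connection; results come back in input order
    unihashes = client.get_unihash_batch([
        ("TestMethod", "8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a"),
        ("TestMethod", "3bf6f1e89d26205aec90da04854fbdbf73afe6b4"),
    ])  # -> one unihash (or None) per query

    exists = client.unihash_exists_batch([
        "218e57509998197d570e2c98512d0105985dffc9",
    ])  # -> [True] if the server knows the unihash
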
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 68f64f983b..58f95c7bcd 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -10,6 +10,7 @@ import math
 import time
 import os
 import base64
+import json
 import hashlib
 from . import create_async_client
 import bb.asyncrpc
@@ -256,6 +257,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
256 "backfill-wait": self.handle_backfill_wait, 257 "backfill-wait": self.handle_backfill_wait,
257 "remove": self.handle_remove, 258 "remove": self.handle_remove,
258 "gc-mark": self.handle_gc_mark, 259 "gc-mark": self.handle_gc_mark,
260 "gc-mark-stream": self.handle_gc_mark_stream,
259 "gc-sweep": self.handle_gc_sweep, 261 "gc-sweep": self.handle_gc_sweep,
260 "gc-status": self.handle_gc_status, 262 "gc-status": self.handle_gc_status,
261 "clean-unused": self.handle_clean_unused, 263 "clean-unused": self.handle_clean_unused,
@@ -584,6 +586,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
584 return {"count": await self.db.gc_mark(mark, condition)} 586 return {"count": await self.db.gc_mark(mark, condition)}
585 587
586 @permissions(DB_ADMIN_PERM) 588 @permissions(DB_ADMIN_PERM)
589 async def handle_gc_mark_stream(self, request):
590 async def handler(line):
591 try:
592 decoded_line = json.loads(line)
593 except json.JSONDecodeError as exc:
594 raise bb.asyncrpc.InvokeError(
595 "Could not decode JSONL input '%s'" % line
596 ) from exc
597
598 try:
599 mark = decoded_line["mark"]
600 condition = decoded_line["where"]
601 if not isinstance(mark, str):
602 raise TypeError("Bad mark type %s" % type(mark))
603
604 if not isinstance(condition, dict):
605 raise TypeError("Bad condition type %s" % type(condition))
606 except KeyError as exc:
607 raise bb.asyncrpc.InvokeError(
608 "Input line is missing key '%s' " % exc
609 ) from exc
610
611 return json.dumps({"count": await self.db.gc_mark(mark, condition)})
612
613 return await self._stream_handler(handler)
614
615 @permissions(DB_ADMIN_PERM)
587 async def handle_gc_sweep(self, request): 616 async def handle_gc_sweep(self, request):
588 mark = request["mark"] 617 mark = request["mark"]
589 618
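handle_gc_mark_stream() implies a simple JSONL exchange once the client has switched into gc-mark-stream mode: each request line is a JSON object with a string "mark" and a dict "where", and each reply line carries the per-condition count. An illustrative reconstruction, not a verbatim trace; the hash value is a placeholder:

    import json

    # client -> server (one line per condition):
    request_line = json.dumps({
        "mark": "ABC",  # must be a str, per the isinstance check above
        "where": {"unihash": "f37918cc02eb5a520b1aff86faacbc0a38124646"},
    })

    # server -> client (one reply line per request line):
    response = json.loads('{"count": 1}')
    assert response["count"] == 1
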
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
index da2e844a03..976504d7f4 100644
--- a/bitbake/lib/hashserv/sqlite.py
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -4,6 +4,7 @@
 #
 # SPDX-License-Identifier: GPL-2.0-only
 #
+from datetime import datetime, timezone
 import sqlite3
 import logging
 from contextlib import closing
@@ -53,6 +54,22 @@ CONFIG_TABLE_DEFINITION = (
 CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION)
 
 
+def adapt_datetime_iso(val):
+    """Adapt datetime.datetime to UTC ISO 8601 date."""
+    return val.astimezone(timezone.utc).isoformat()
+
+
+sqlite3.register_adapter(datetime, adapt_datetime_iso)
+
+
+def convert_datetime(val):
+    """Convert ISO 8601 datetime to datetime.datetime object."""
+    return datetime.fromisoformat(val.decode())
+
+
+sqlite3.register_converter("DATETIME", convert_datetime)
+
+
 def _make_table(cursor, name, definition):
     cursor.execute(
         """
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 0809453cf8..da3f8e0884 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -8,7 +8,6 @@
 from . import create_server, create_client
 from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS
 from bb.asyncrpc import InvokeError
-from .client import ClientPool
 import hashlib
 import logging
 import multiprocessing
@@ -94,9 +93,6 @@ class HashEquivalenceTestSetup(object):
         return self.start_client(self.auth_server_address, user["username"], user["token"])
 
     def setUp(self):
-        if sys.version_info < (3, 5, 0):
-            self.skipTest('Python 3.5 or later required')
-
         self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv')
         self.addCleanup(self.temp_dir.cleanup)
 
@@ -555,8 +551,7 @@ class HashEquivalenceCommonTests(object):
         # shares a taskhash with Task 2
         self.assertClientGetHash(self.client, taskhash2, unihash2)
 
-
-    def test_client_pool_get_unihashes(self):
+    def test_get_unihash_batch(self):
         TEST_INPUT = (
             # taskhash                                    outhash                                                             unihash
             ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
@@ -573,28 +568,27 @@ class HashEquivalenceCommonTests(object):
573 "6b6be7a84ab179b4240c4302518dc3f6", 568 "6b6be7a84ab179b4240c4302518dc3f6",
574 ) 569 )
575 570
576 with ClientPool(self.server_address, 10) as client_pool: 571 for taskhash, outhash, unihash in TEST_INPUT:
577 for taskhash, outhash, unihash in TEST_INPUT: 572 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
578 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) 573
579 574
580 query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} 575 result = self.client.get_unihash_batch(
581 for idx, taskhash in enumerate(EXTRA_QUERIES): 576 [(self.METHOD, data[0]) for data in TEST_INPUT] +
582 query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) 577 [(self.METHOD, e) for e in EXTRA_QUERIES]
583 578 )
584 result = client_pool.get_unihashes(query) 579
585 580 self.assertListEqual(result, [
586 self.assertDictEqual(result, { 581 "218e57509998197d570e2c98512d0105985dffc9",
587 0: "218e57509998197d570e2c98512d0105985dffc9", 582 "218e57509998197d570e2c98512d0105985dffc9",
588 1: "218e57509998197d570e2c98512d0105985dffc9", 583 "218e57509998197d570e2c98512d0105985dffc9",
589 2: "218e57509998197d570e2c98512d0105985dffc9", 584 "3b5d3d83f07f259e9086fcb422c855286e18a57d",
590 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", 585 "f46d3fbb439bd9b921095da657a4de906510d2cd",
591 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", 586 "f46d3fbb439bd9b921095da657a4de906510d2cd",
592 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", 587 "05d2a63c81e32f0a36542ca677e8ad852365c538",
593 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", 588 None,
594 7: None, 589 ])
595 })
596 590
597 def test_client_pool_unihash_exists(self): 591 def test_unihash_exists_batch(self):
598 TEST_INPUT = ( 592 TEST_INPUT = (
599 # taskhash outhash unihash 593 # taskhash outhash unihash
600 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), 594 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
@@ -614,28 +608,24 @@ class HashEquivalenceCommonTests(object):
         result_unihashes = set()
 
 
-        with ClientPool(self.server_address, 10) as client_pool:
-            for taskhash, outhash, unihash in TEST_INPUT:
-                result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
-                result_unihashes.add(result["unihash"])
-
-            query = {}
-            expected = {}
-
-            for _, _, unihash in TEST_INPUT:
-                idx = len(query)
-                query[idx] = unihash
-                expected[idx] = unihash in result_unihashes
-
-
-            for unihash in EXTRA_QUERIES:
-                idx = len(query)
-                query[idx] = unihash
-                expected[idx] = False
-
-            result = client_pool.unihashes_exist(query)
-            self.assertDictEqual(result, expected)
-
+        for taskhash, outhash, unihash in TEST_INPUT:
+            result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+            result_unihashes.add(result["unihash"])
+
+        query = []
+        expected = []
+
+        for _, _, unihash in TEST_INPUT:
+            query.append(unihash)
+            expected.append(unihash in result_unihashes)
+
+
+        for unihash in EXTRA_QUERIES:
+            query.append(unihash)
+            expected.append(False)
+
+        result = self.client.unihash_exists_batch(query)
+        self.assertListEqual(result, expected)
 
     def test_auth_read_perms(self):
         admin_client = self.start_auth_server()
@@ -979,6 +969,48 @@ class HashEquivalenceCommonTests(object):
         # First hash is still present
         self.assertClientGetHash(self.client, taskhash, unihash)
 
+    def test_gc_stream(self):
+        taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
+        outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
+        unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
+
+        result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+        self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
+
+        taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
+        outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
+        unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
+
+        result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
+        self.assertClientGetHash(self.client, taskhash2, unihash2)
+
+        taskhash3 = 'a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c0'
+        outhash3 = '7289c414905303700a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c04f9a53c'
+        unihash3 = '905303700a1117c1f5a7c9ab2f5a39cc6fe5e615'
+
+        result = self.client.report_unihash(taskhash3, self.METHOD, outhash3, unihash3)
+        self.assertClientGetHash(self.client, taskhash3, unihash3)
+
+        # Mark the first two unihashes to be kept
+        ret = self.client.gc_mark_stream("ABC", (f"unihash {h}" for h in [unihash, unihash2]))
+        self.assertEqual(ret, {"count": 2})
+
+        ret = self.client.gc_status()
+        self.assertEqual(ret, {"mark": "ABC", "keep": 2, "remove": 1})
+
+        # Third hash is still there; mark doesn't delete hashes
+        self.assertClientGetHash(self.client, taskhash3, unihash3)
+
+        ret = self.client.gc_sweep("ABC")
+        self.assertEqual(ret, {"count": 1})
+
+        # Third hash is gone; no unihash is returned for its taskhash
+        self.assertClientGetHash(self.client, taskhash3, None)
+        # First hash is still present
+        self.assertClientGetHash(self.client, taskhash, unihash)
+        # Second hash is still present
+        self.assertClientGetHash(self.client, taskhash2, unihash2)
+
     def test_gc_switch_mark(self):
         taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
         outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'