summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv')
-rw-r--r--bitbake/lib/hashserv/__init__.py31
-rw-r--r--bitbake/lib/hashserv/client.py249
-rw-r--r--bitbake/lib/hashserv/server.py29
-rw-r--r--bitbake/lib/hashserv/sqlite.py17
-rw-r--r--bitbake/lib/hashserv/tests.py120
5 files changed, 258 insertions, 188 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 552a33278f..ac891e0174 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -5,39 +5,15 @@
5 5
6import asyncio 6import asyncio
7from contextlib import closing 7from contextlib import closing
8import re
9import itertools 8import itertools
10import json 9import json
11from collections import namedtuple 10from collections import namedtuple
12from urllib.parse import urlparse 11from urllib.parse import urlparse
13 12from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
14UNIX_PREFIX = "unix://"
15WS_PREFIX = "ws://"
16WSS_PREFIX = "wss://"
17
18ADDR_TYPE_UNIX = 0
19ADDR_TYPE_TCP = 1
20ADDR_TYPE_WS = 2
21 13
22User = namedtuple("User", ("username", "permissions")) 14User = namedtuple("User", ("username", "permissions"))
23 15
24 16
25def parse_address(addr):
26 if addr.startswith(UNIX_PREFIX):
27 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
28 elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
29 return (ADDR_TYPE_WS, (addr,))
30 else:
31 m = re.match(r"\[(?P<host>[^\]]*)\]:(?P<port>\d+)$", addr)
32 if m is not None:
33 host = m.group("host")
34 port = m.group("port")
35 else:
36 host, port = addr.split(":")
37
38 return (ADDR_TYPE_TCP, (host, int(port)))
39
40
41def create_server( 17def create_server(
42 addr, 18 addr,
43 dbname, 19 dbname,
@@ -50,6 +26,7 @@ def create_server(
50 anon_perms=None, 26 anon_perms=None,
51 admin_username=None, 27 admin_username=None,
52 admin_password=None, 28 admin_password=None,
29 reuseport=False,
53): 30):
54 def sqlite_engine(): 31 def sqlite_engine():
55 from .sqlite import DatabaseEngine 32 from .sqlite import DatabaseEngine
@@ -85,9 +62,9 @@ def create_server(
85 s.start_unix_server(*a) 62 s.start_unix_server(*a)
86 elif typ == ADDR_TYPE_WS: 63 elif typ == ADDR_TYPE_WS:
87 url = urlparse(a[0]) 64 url = urlparse(a[0])
88 s.start_websocket_server(url.hostname, url.port) 65 s.start_websocket_server(url.hostname, url.port, reuseport=reuseport)
89 else: 66 else:
90 s.start_tcp_server(*a) 67 s.start_tcp_server(*a, reuseport=reuseport)
91 68
92 return s 69 return s
93 70
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index b269879ecf..8cb18050a6 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -5,6 +5,7 @@
5 5
6import logging 6import logging
7import socket 7import socket
8import asyncio
8import bb.asyncrpc 9import bb.asyncrpc
9import json 10import json
10from . import create_async_client 11from . import create_async_client
@@ -13,10 +14,71 @@ from . import create_async_client
13logger = logging.getLogger("hashserv.client") 14logger = logging.getLogger("hashserv.client")
14 15
15 16
17class Batch(object):
18 def __init__(self):
19 self.done = False
20 self.cond = asyncio.Condition()
21 self.pending = []
22 self.results = []
23 self.sent_count = 0
24
25 async def recv(self, socket):
26 while True:
27 async with self.cond:
28 await self.cond.wait_for(lambda: self.pending or self.done)
29
30 if not self.pending:
31 if self.done:
32 return
33 continue
34
35 r = await socket.recv()
36 self.results.append(r)
37
38 async with self.cond:
39 self.pending.pop(0)
40
41 async def send(self, socket, msgs):
42 try:
43 # In the event of a restart due to a reconnect, all in-flight
44 # messages need to be resent first to keep to result count in sync
45 for m in self.pending:
46 await socket.send(m)
47
48 for m in msgs:
49 # Add the message to the pending list before attempting to send
50 # it so that if the send fails it will be retried
51 async with self.cond:
52 self.pending.append(m)
53 self.cond.notify()
54 self.sent_count += 1
55
56 await socket.send(m)
57
58 finally:
59 async with self.cond:
60 self.done = True
61 self.cond.notify()
62
63 async def process(self, socket, msgs):
64 await asyncio.gather(
65 self.recv(socket),
66 self.send(socket, msgs),
67 )
68
69 if len(self.results) != self.sent_count:
70 raise ValueError(
71 f"Expected result count {len(self.results)}. Expected {self.sent_count}"
72 )
73
74 return self.results
75
76
16class AsyncClient(bb.asyncrpc.AsyncClient): 77class AsyncClient(bb.asyncrpc.AsyncClient):
17 MODE_NORMAL = 0 78 MODE_NORMAL = 0
18 MODE_GET_STREAM = 1 79 MODE_GET_STREAM = 1
19 MODE_EXIST_STREAM = 2 80 MODE_EXIST_STREAM = 2
81 MODE_MARK_STREAM = 3
20 82
21 def __init__(self, username=None, password=None): 83 def __init__(self, username=None, password=None):
22 super().__init__("OEHASHEQUIV", "1.1", logger) 84 super().__init__("OEHASHEQUIV", "1.1", logger)
@@ -27,9 +89,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
27 89
28 async def setup_connection(self): 90 async def setup_connection(self):
29 await super().setup_connection() 91 await super().setup_connection()
30 cur_mode = self.mode
31 self.mode = self.MODE_NORMAL 92 self.mode = self.MODE_NORMAL
32 await self._set_mode(cur_mode)
33 if self.username: 93 if self.username:
34 # Save off become user temporarily because auth() resets it 94 # Save off become user temporarily because auth() resets it
35 become = self.saved_become_user 95 become = self.saved_become_user
@@ -38,25 +98,52 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
38 if become: 98 if become:
39 await self.become_user(become) 99 await self.become_user(become)
40 100
41 async def send_stream(self, msg): 101 async def send_stream_batch(self, mode, msgs):
102 """
103 Does a "batch" process of stream messages. This sends the query
104 messages as fast as possible, and simultaneously attempts to read the
105 messages back. This helps to mitigate the effects of latency to the
106 hash equivalence server be allowing multiple queries to be "in-flight"
107 at once
108
109 The implementation does more complicated tracking using a count of sent
110 messages so that `msgs` can be a generator function (i.e. its length is
111 unknown)
112
113 """
114
115 b = Batch()
116
42 async def proc(): 117 async def proc():
43 await self.socket.send(msg) 118 nonlocal b
44 return await self.socket.recv() 119
120 await self._set_mode(mode)
121 return await b.process(self.socket, msgs)
45 122
46 return await self._send_wrapper(proc) 123 return await self._send_wrapper(proc)
47 124
125 async def invoke(self, *args, skip_mode=False, **kwargs):
126 # It's OK if connection errors cause a failure here, because the mode
127 # is also reset to normal on a new connection
128 if not skip_mode:
129 await self._set_mode(self.MODE_NORMAL)
130 return await super().invoke(*args, **kwargs)
131
48 async def _set_mode(self, new_mode): 132 async def _set_mode(self, new_mode):
49 async def stream_to_normal(): 133 async def stream_to_normal():
134 # Check if already in normal mode (e.g. due to a connection reset)
135 if self.mode == self.MODE_NORMAL:
136 return "ok"
50 await self.socket.send("END") 137 await self.socket.send("END")
51 return await self.socket.recv() 138 return await self.socket.recv()
52 139
53 async def normal_to_stream(command): 140 async def normal_to_stream(command):
54 r = await self.invoke({command: None}) 141 r = await self.invoke({command: None}, skip_mode=True)
55 if r != "ok": 142 if r != "ok":
143 self.check_invoke_error(r)
56 raise ConnectionError( 144 raise ConnectionError(
57 f"Unable to transition to stream mode: Bad response from server {r!r}" 145 f"Unable to transition to stream mode: Bad response from server {r!r}"
58 ) 146 )
59
60 self.logger.debug("Mode is now %s", command) 147 self.logger.debug("Mode is now %s", command)
61 148
62 if new_mode == self.mode: 149 if new_mode == self.mode:
@@ -78,20 +165,25 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
78 await normal_to_stream("get-stream") 165 await normal_to_stream("get-stream")
79 elif new_mode == self.MODE_EXIST_STREAM: 166 elif new_mode == self.MODE_EXIST_STREAM:
80 await normal_to_stream("exists-stream") 167 await normal_to_stream("exists-stream")
168 elif new_mode == self.MODE_MARK_STREAM:
169 await normal_to_stream("gc-mark-stream")
81 elif new_mode != self.MODE_NORMAL: 170 elif new_mode != self.MODE_NORMAL:
82 raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") 171 raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
83 172
84 self.mode = new_mode 173 self.mode = new_mode
85 174
86 async def get_unihash(self, method, taskhash): 175 async def get_unihash(self, method, taskhash):
87 await self._set_mode(self.MODE_GET_STREAM) 176 r = await self.get_unihash_batch([(method, taskhash)])
88 r = await self.send_stream("%s %s" % (method, taskhash)) 177 return r[0]
89 if not r: 178
90 return None 179 async def get_unihash_batch(self, args):
91 return r 180 result = await self.send_stream_batch(
181 self.MODE_GET_STREAM,
182 (f"{method} {taskhash}" for method, taskhash in args),
183 )
184 return [r if r else None for r in result]
92 185
93 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): 186 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
94 await self._set_mode(self.MODE_NORMAL)
95 m = extra.copy() 187 m = extra.copy()
96 m["taskhash"] = taskhash 188 m["taskhash"] = taskhash
97 m["method"] = method 189 m["method"] = method
@@ -100,7 +192,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
100 return await self.invoke({"report": m}) 192 return await self.invoke({"report": m})
101 193
102 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 194 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
103 await self._set_mode(self.MODE_NORMAL)
104 m = extra.copy() 195 m = extra.copy()
105 m["taskhash"] = taskhash 196 m["taskhash"] = taskhash
106 m["method"] = method 197 m["method"] = method
@@ -108,18 +199,19 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
108 return await self.invoke({"report-equiv": m}) 199 return await self.invoke({"report-equiv": m})
109 200
110 async def get_taskhash(self, method, taskhash, all_properties=False): 201 async def get_taskhash(self, method, taskhash, all_properties=False):
111 await self._set_mode(self.MODE_NORMAL)
112 return await self.invoke( 202 return await self.invoke(
113 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 203 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
114 ) 204 )
115 205
116 async def unihash_exists(self, unihash): 206 async def unihash_exists(self, unihash):
117 await self._set_mode(self.MODE_EXIST_STREAM) 207 r = await self.unihash_exists_batch([unihash])
118 r = await self.send_stream(unihash) 208 return r[0]
119 return r == "true" 209
210 async def unihash_exists_batch(self, unihashes):
211 result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
212 return [r == "true" for r in result]
120 213
121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 214 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
122 await self._set_mode(self.MODE_NORMAL)
123 return await self.invoke( 215 return await self.invoke(
124 { 216 {
125 "get-outhash": { 217 "get-outhash": {
@@ -132,27 +224,21 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
132 ) 224 )
133 225
134 async def get_stats(self): 226 async def get_stats(self):
135 await self._set_mode(self.MODE_NORMAL)
136 return await self.invoke({"get-stats": None}) 227 return await self.invoke({"get-stats": None})
137 228
138 async def reset_stats(self): 229 async def reset_stats(self):
139 await self._set_mode(self.MODE_NORMAL)
140 return await self.invoke({"reset-stats": None}) 230 return await self.invoke({"reset-stats": None})
141 231
142 async def backfill_wait(self): 232 async def backfill_wait(self):
143 await self._set_mode(self.MODE_NORMAL)
144 return (await self.invoke({"backfill-wait": None}))["tasks"] 233 return (await self.invoke({"backfill-wait": None}))["tasks"]
145 234
146 async def remove(self, where): 235 async def remove(self, where):
147 await self._set_mode(self.MODE_NORMAL)
148 return await self.invoke({"remove": {"where": where}}) 236 return await self.invoke({"remove": {"where": where}})
149 237
150 async def clean_unused(self, max_age): 238 async def clean_unused(self, max_age):
151 await self._set_mode(self.MODE_NORMAL)
152 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) 239 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
153 240
154 async def auth(self, username, token): 241 async def auth(self, username, token):
155 await self._set_mode(self.MODE_NORMAL)
156 result = await self.invoke({"auth": {"username": username, "token": token}}) 242 result = await self.invoke({"auth": {"username": username, "token": token}})
157 self.username = username 243 self.username = username
158 self.password = token 244 self.password = token
@@ -160,7 +246,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
160 return result 246 return result
161 247
162 async def refresh_token(self, username=None): 248 async def refresh_token(self, username=None):
163 await self._set_mode(self.MODE_NORMAL)
164 m = {} 249 m = {}
165 if username: 250 if username:
166 m["username"] = username 251 m["username"] = username
@@ -174,34 +259,28 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
174 return result 259 return result
175 260
176 async def set_user_perms(self, username, permissions): 261 async def set_user_perms(self, username, permissions):
177 await self._set_mode(self.MODE_NORMAL)
178 return await self.invoke( 262 return await self.invoke(
179 {"set-user-perms": {"username": username, "permissions": permissions}} 263 {"set-user-perms": {"username": username, "permissions": permissions}}
180 ) 264 )
181 265
182 async def get_user(self, username=None): 266 async def get_user(self, username=None):
183 await self._set_mode(self.MODE_NORMAL)
184 m = {} 267 m = {}
185 if username: 268 if username:
186 m["username"] = username 269 m["username"] = username
187 return await self.invoke({"get-user": m}) 270 return await self.invoke({"get-user": m})
188 271
189 async def get_all_users(self): 272 async def get_all_users(self):
190 await self._set_mode(self.MODE_NORMAL)
191 return (await self.invoke({"get-all-users": {}}))["users"] 273 return (await self.invoke({"get-all-users": {}}))["users"]
192 274
193 async def new_user(self, username, permissions): 275 async def new_user(self, username, permissions):
194 await self._set_mode(self.MODE_NORMAL)
195 return await self.invoke( 276 return await self.invoke(
196 {"new-user": {"username": username, "permissions": permissions}} 277 {"new-user": {"username": username, "permissions": permissions}}
197 ) 278 )
198 279
199 async def delete_user(self, username): 280 async def delete_user(self, username):
200 await self._set_mode(self.MODE_NORMAL)
201 return await self.invoke({"delete-user": {"username": username}}) 281 return await self.invoke({"delete-user": {"username": username}})
202 282
203 async def become_user(self, username): 283 async def become_user(self, username):
204 await self._set_mode(self.MODE_NORMAL)
205 result = await self.invoke({"become-user": {"username": username}}) 284 result = await self.invoke({"become-user": {"username": username}})
206 if username == self.username: 285 if username == self.username:
207 self.saved_become_user = None 286 self.saved_become_user = None
@@ -210,15 +289,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
210 return result 289 return result
211 290
212 async def get_db_usage(self): 291 async def get_db_usage(self):
213 await self._set_mode(self.MODE_NORMAL)
214 return (await self.invoke({"get-db-usage": {}}))["usage"] 292 return (await self.invoke({"get-db-usage": {}}))["usage"]
215 293
216 async def get_db_query_columns(self): 294 async def get_db_query_columns(self):
217 await self._set_mode(self.MODE_NORMAL)
218 return (await self.invoke({"get-db-query-columns": {}}))["columns"] 295 return (await self.invoke({"get-db-query-columns": {}}))["columns"]
219 296
220 async def gc_status(self): 297 async def gc_status(self):
221 await self._set_mode(self.MODE_NORMAL)
222 return await self.invoke({"gc-status": {}}) 298 return await self.invoke({"gc-status": {}})
223 299
224 async def gc_mark(self, mark, where): 300 async def gc_mark(self, mark, where):
@@ -231,9 +307,26 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
231 kept. In addition, any new entries added to the database after this 307 kept. In addition, any new entries added to the database after this
232 command will be automatically marked with "mark" 308 command will be automatically marked with "mark"
233 """ 309 """
234 await self._set_mode(self.MODE_NORMAL)
235 return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) 310 return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
236 311
312 async def gc_mark_stream(self, mark, rows):
313 """
314 Similar to `gc-mark`, but accepts a list of "where" key-value pair
315 conditions. It utilizes stream mode to mark hashes, which helps reduce
316 the impact of latency when communicating with the hash equivalence
317 server.
318 """
319 def row_to_dict(row):
320 pairs = row.split()
321 return dict(zip(pairs[::2], pairs[1::2]))
322
323 responses = await self.send_stream_batch(
324 self.MODE_MARK_STREAM,
325 (json.dumps({"mark": mark, "where": row_to_dict(row)}) for row in rows),
326 )
327
328 return {"count": sum(int(json.loads(r)["count"]) for r in responses)}
329
237 async def gc_sweep(self, mark): 330 async def gc_sweep(self, mark):
238 """ 331 """
239 Finishes garbage collection for "mark". All unihash entries that have 332 Finishes garbage collection for "mark". All unihash entries that have
@@ -242,7 +335,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
242 It is recommended to clean unused outhash entries after running this to 335 It is recommended to clean unused outhash entries after running this to
243 cleanup any dangling outhashes 336 cleanup any dangling outhashes
244 """ 337 """
245 await self._set_mode(self.MODE_NORMAL)
246 return await self.invoke({"gc-sweep": {"mark": mark}}) 338 return await self.invoke({"gc-sweep": {"mark": mark}})
247 339
248 340
@@ -256,10 +348,12 @@ class Client(bb.asyncrpc.Client):
256 "connect_tcp", 348 "connect_tcp",
257 "connect_websocket", 349 "connect_websocket",
258 "get_unihash", 350 "get_unihash",
351 "get_unihash_batch",
259 "report_unihash", 352 "report_unihash",
260 "report_unihash_equiv", 353 "report_unihash_equiv",
261 "get_taskhash", 354 "get_taskhash",
262 "unihash_exists", 355 "unihash_exists",
356 "unihash_exists_batch",
263 "get_outhash", 357 "get_outhash",
264 "get_stats", 358 "get_stats",
265 "reset_stats", 359 "reset_stats",
@@ -278,88 +372,9 @@ class Client(bb.asyncrpc.Client):
278 "get_db_query_columns", 372 "get_db_query_columns",
279 "gc_status", 373 "gc_status",
280 "gc_mark", 374 "gc_mark",
375 "gc_mark_stream",
281 "gc_sweep", 376 "gc_sweep",
282 ) 377 )
283 378
284 def _get_async_client(self): 379 def _get_async_client(self):
285 return AsyncClient(self.username, self.password) 380 return AsyncClient(self.username, self.password)
286
287
288class ClientPool(bb.asyncrpc.ClientPool):
289 def __init__(
290 self,
291 address,
292 max_clients,
293 *,
294 username=None,
295 password=None,
296 become=None,
297 ):
298 super().__init__(max_clients)
299 self.address = address
300 self.username = username
301 self.password = password
302 self.become = become
303
304 async def _new_client(self):
305 client = await create_async_client(
306 self.address,
307 username=self.username,
308 password=self.password,
309 )
310 if self.become:
311 await client.become_user(self.become)
312 return client
313
314 def _run_key_tasks(self, queries, call):
315 results = {key: None for key in queries.keys()}
316
317 def make_task(key, args):
318 async def task(client):
319 nonlocal results
320 unihash = await call(client, args)
321 results[key] = unihash
322
323 return task
324
325 def gen_tasks():
326 for key, args in queries.items():
327 yield make_task(key, args)
328
329 self.run_tasks(gen_tasks())
330 return results
331
332 def get_unihashes(self, queries):
333 """
334 Query multiple unihashes in parallel.
335
336 The queries argument is a dictionary with arbitrary key. The values
337 must be a tuple of (method, taskhash).
338
339 Returns a dictionary with a corresponding key for each input key, and
340 the value is the queried unihash (which might be none if the query
341 failed)
342 """
343
344 async def call(client, args):
345 method, taskhash = args
346 return await client.get_unihash(method, taskhash)
347
348 return self._run_key_tasks(queries, call)
349
350 def unihashes_exist(self, queries):
351 """
352 Query multiple unihash existence checks in parallel.
353
354 The queries argument is a dictionary with arbitrary key. The values
355 must be a unihash.
356
357 Returns a dictionary with a corresponding key for each input key, and
358 the value is True or False if the unihash is known by the server (or
359 None if there was a failure)
360 """
361
362 async def call(client, unihash):
363 return await client.unihash_exists(unihash)
364
365 return self._run_key_tasks(queries, call)
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 68f64f983b..58f95c7bcd 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -10,6 +10,7 @@ import math
10import time 10import time
11import os 11import os
12import base64 12import base64
13import json
13import hashlib 14import hashlib
14from . import create_async_client 15from . import create_async_client
15import bb.asyncrpc 16import bb.asyncrpc
@@ -256,6 +257,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
256 "backfill-wait": self.handle_backfill_wait, 257 "backfill-wait": self.handle_backfill_wait,
257 "remove": self.handle_remove, 258 "remove": self.handle_remove,
258 "gc-mark": self.handle_gc_mark, 259 "gc-mark": self.handle_gc_mark,
260 "gc-mark-stream": self.handle_gc_mark_stream,
259 "gc-sweep": self.handle_gc_sweep, 261 "gc-sweep": self.handle_gc_sweep,
260 "gc-status": self.handle_gc_status, 262 "gc-status": self.handle_gc_status,
261 "clean-unused": self.handle_clean_unused, 263 "clean-unused": self.handle_clean_unused,
@@ -584,6 +586,33 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
584 return {"count": await self.db.gc_mark(mark, condition)} 586 return {"count": await self.db.gc_mark(mark, condition)}
585 587
586 @permissions(DB_ADMIN_PERM) 588 @permissions(DB_ADMIN_PERM)
589 async def handle_gc_mark_stream(self, request):
590 async def handler(line):
591 try:
592 decoded_line = json.loads(line)
593 except json.JSONDecodeError as exc:
594 raise bb.asyncrpc.InvokeError(
595 "Could not decode JSONL input '%s'" % line
596 ) from exc
597
598 try:
599 mark = decoded_line["mark"]
600 condition = decoded_line["where"]
601 if not isinstance(mark, str):
602 raise TypeError("Bad mark type %s" % type(mark))
603
604 if not isinstance(condition, dict):
605 raise TypeError("Bad condition type %s" % type(condition))
606 except KeyError as exc:
607 raise bb.asyncrpc.InvokeError(
608 "Input line is missing key '%s' " % exc
609 ) from exc
610
611 return json.dumps({"count": await self.db.gc_mark(mark, condition)})
612
613 return await self._stream_handler(handler)
614
615 @permissions(DB_ADMIN_PERM)
587 async def handle_gc_sweep(self, request): 616 async def handle_gc_sweep(self, request):
588 mark = request["mark"] 617 mark = request["mark"]
589 618
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
index da2e844a03..976504d7f4 100644
--- a/bitbake/lib/hashserv/sqlite.py
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -4,6 +4,7 @@
4# 4#
5# SPDX-License-Identifier: GPL-2.0-only 5# SPDX-License-Identifier: GPL-2.0-only
6# 6#
7from datetime import datetime, timezone
7import sqlite3 8import sqlite3
8import logging 9import logging
9from contextlib import closing 10from contextlib import closing
@@ -53,6 +54,22 @@ CONFIG_TABLE_DEFINITION = (
53CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) 54CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION)
54 55
55 56
57def adapt_datetime_iso(val):
58 """Adapt datetime.datetime to UTC ISO 8601 date."""
59 return val.astimezone(timezone.utc).isoformat()
60
61
62sqlite3.register_adapter(datetime, adapt_datetime_iso)
63
64
65def convert_datetime(val):
66 """Convert ISO 8601 datetime to datetime.datetime object."""
67 return datetime.fromisoformat(val.decode())
68
69
70sqlite3.register_converter("DATETIME", convert_datetime)
71
72
56def _make_table(cursor, name, definition): 73def _make_table(cursor, name, definition):
57 cursor.execute( 74 cursor.execute(
58 """ 75 """
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 0809453cf8..da3f8e0884 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -8,7 +8,6 @@
8from . import create_server, create_client 8from . import create_server, create_client
9from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS 9from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS
10from bb.asyncrpc import InvokeError 10from bb.asyncrpc import InvokeError
11from .client import ClientPool
12import hashlib 11import hashlib
13import logging 12import logging
14import multiprocessing 13import multiprocessing
@@ -94,9 +93,6 @@ class HashEquivalenceTestSetup(object):
94 return self.start_client(self.auth_server_address, user["username"], user["token"]) 93 return self.start_client(self.auth_server_address, user["username"], user["token"])
95 94
96 def setUp(self): 95 def setUp(self):
97 if sys.version_info < (3, 5, 0):
98 self.skipTest('Python 3.5 or later required')
99
100 self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') 96 self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv')
101 self.addCleanup(self.temp_dir.cleanup) 97 self.addCleanup(self.temp_dir.cleanup)
102 98
@@ -555,8 +551,7 @@ class HashEquivalenceCommonTests(object):
555 # shares a taskhash with Task 2 551 # shares a taskhash with Task 2
556 self.assertClientGetHash(self.client, taskhash2, unihash2) 552 self.assertClientGetHash(self.client, taskhash2, unihash2)
557 553
558 554 def test_get_unihash_batch(self):
559 def test_client_pool_get_unihashes(self):
560 TEST_INPUT = ( 555 TEST_INPUT = (
561 # taskhash outhash unihash 556 # taskhash outhash unihash
562 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), 557 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
@@ -573,28 +568,27 @@ class HashEquivalenceCommonTests(object):
573 "6b6be7a84ab179b4240c4302518dc3f6", 568 "6b6be7a84ab179b4240c4302518dc3f6",
574 ) 569 )
575 570
576 with ClientPool(self.server_address, 10) as client_pool: 571 for taskhash, outhash, unihash in TEST_INPUT:
577 for taskhash, outhash, unihash in TEST_INPUT: 572 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
578 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) 573
579 574
580 query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} 575 result = self.client.get_unihash_batch(
581 for idx, taskhash in enumerate(EXTRA_QUERIES): 576 [(self.METHOD, data[0]) for data in TEST_INPUT] +
582 query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) 577 [(self.METHOD, e) for e in EXTRA_QUERIES]
583 578 )
584 result = client_pool.get_unihashes(query) 579
585 580 self.assertListEqual(result, [
586 self.assertDictEqual(result, { 581 "218e57509998197d570e2c98512d0105985dffc9",
587 0: "218e57509998197d570e2c98512d0105985dffc9", 582 "218e57509998197d570e2c98512d0105985dffc9",
588 1: "218e57509998197d570e2c98512d0105985dffc9", 583 "218e57509998197d570e2c98512d0105985dffc9",
589 2: "218e57509998197d570e2c98512d0105985dffc9", 584 "3b5d3d83f07f259e9086fcb422c855286e18a57d",
590 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", 585 "f46d3fbb439bd9b921095da657a4de906510d2cd",
591 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", 586 "f46d3fbb439bd9b921095da657a4de906510d2cd",
592 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", 587 "05d2a63c81e32f0a36542ca677e8ad852365c538",
593 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", 588 None,
594 7: None, 589 ])
595 })
596 590
597 def test_client_pool_unihash_exists(self): 591 def test_unihash_exists_batch(self):
598 TEST_INPUT = ( 592 TEST_INPUT = (
599 # taskhash outhash unihash 593 # taskhash outhash unihash
600 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), 594 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
@@ -614,28 +608,24 @@ class HashEquivalenceCommonTests(object):
614 result_unihashes = set() 608 result_unihashes = set()
615 609
616 610
617 with ClientPool(self.server_address, 10) as client_pool: 611 for taskhash, outhash, unihash in TEST_INPUT:
618 for taskhash, outhash, unihash in TEST_INPUT: 612 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
619 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) 613 result_unihashes.add(result["unihash"])
620 result_unihashes.add(result["unihash"])
621 614
622 query = {} 615 query = []
623 expected = {} 616 expected = []
624 617
625 for _, _, unihash in TEST_INPUT: 618 for _, _, unihash in TEST_INPUT:
626 idx = len(query) 619 query.append(unihash)
627 query[idx] = unihash 620 expected.append(unihash in result_unihashes)
628 expected[idx] = unihash in result_unihashes
629 621
630 622
631 for unihash in EXTRA_QUERIES: 623 for unihash in EXTRA_QUERIES:
632 idx = len(query) 624 query.append(unihash)
633 query[idx] = unihash 625 expected.append(False)
634 expected[idx] = False
635
636 result = client_pool.unihashes_exist(query)
637 self.assertDictEqual(result, expected)
638 626
627 result = self.client.unihash_exists_batch(query)
628 self.assertListEqual(result, expected)
639 629
640 def test_auth_read_perms(self): 630 def test_auth_read_perms(self):
641 admin_client = self.start_auth_server() 631 admin_client = self.start_auth_server()
@@ -979,6 +969,48 @@ class HashEquivalenceCommonTests(object):
979 # First hash is still present 969 # First hash is still present
980 self.assertClientGetHash(self.client, taskhash, unihash) 970 self.assertClientGetHash(self.client, taskhash, unihash)
981 971
972 def test_gc_stream(self):
973 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
974 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
975 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
976
977 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
978 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
979
980 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
981 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
982 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
983
984 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
985 self.assertClientGetHash(self.client, taskhash2, unihash2)
986
987 taskhash3 = 'a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c0'
988 outhash3 = '7289c414905303700a1117c1f5a7c9ab2f5a39cc6fe5e6152169d09c04f9a53c'
989 unihash3 = '905303700a1117c1f5a7c9ab2f5a39cc6fe5e615'
990
991 result = self.client.report_unihash(taskhash3, self.METHOD, outhash3, unihash3)
992 self.assertClientGetHash(self.client, taskhash3, unihash3)
993
994 # Mark the first unihash to be kept
995 ret = self.client.gc_mark_stream("ABC", (f"unihash {h}" for h in [unihash, unihash2]))
996 self.assertEqual(ret, {"count": 2})
997
998 ret = self.client.gc_status()
999 self.assertEqual(ret, {"mark": "ABC", "keep": 2, "remove": 1})
1000
1001 # Third hash is still there; mark doesn't delete hashes
1002 self.assertClientGetHash(self.client, taskhash3, unihash3)
1003
1004 ret = self.client.gc_sweep("ABC")
1005 self.assertEqual(ret, {"count": 1})
1006
1007 # Hash is gone. Taskhash is returned for second hash
1008 self.assertClientGetHash(self.client, taskhash3, None)
1009 # First hash is still present
1010 self.assertClientGetHash(self.client, taskhash, unihash)
1011 # Second hash is still present
1012 self.assertClientGetHash(self.client, taskhash2, unihash2)
1013
982 def test_gc_switch_mark(self): 1014 def test_gc_switch_mark(self):
983 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' 1015 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
984 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' 1016 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'