Diffstat:
 -rw-r--r--  bitbake/lib/hashserv/__init__.py |  90
 -rw-r--r--  bitbake/lib/hashserv/server.py   | 491
 -rw-r--r--  bitbake/lib/hashserv/sqlite.py   | 259
 3 files changed, 439 insertions(+), 401 deletions(-)
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 56b9c6bc82..90d8cff15f 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -6,7 +6,6 @@
 import asyncio
 from contextlib import closing
 import re
-import sqlite3
 import itertools
 import json
 from urllib.parse import urlparse
@@ -19,92 +18,34 @@ ADDR_TYPE_UNIX = 0
 ADDR_TYPE_TCP = 1
 ADDR_TYPE_WS = 2
 
-UNIHASH_TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL", "UNIQUE"),
-    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
-    ("unihash", "TEXT NOT NULL", ""),
-)
-
-UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
-
-OUTHASH_TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL", "UNIQUE"),
-    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
-    ("outhash", "TEXT NOT NULL", "UNIQUE"),
-    ("created", "DATETIME", ""),
-
-    # Optional fields
-    ("owner", "TEXT", ""),
-    ("PN", "TEXT", ""),
-    ("PV", "TEXT", ""),
-    ("PR", "TEXT", ""),
-    ("task", "TEXT", ""),
-    ("outhash_siginfo", "TEXT", ""),
-)
-
-OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
-
-def _make_table(cursor, name, definition):
-    cursor.execute('''
-        CREATE TABLE IF NOT EXISTS {name} (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            {fields}
-            UNIQUE({unique})
-            )
-        '''.format(
-            name=name,
-            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
-            unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags)
-    ))
-
-
-def setup_database(database, sync=True):
-    db = sqlite3.connect(database)
-    db.row_factory = sqlite3.Row
-
-    with closing(db.cursor()) as cursor:
-        _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
-        _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
-
-        cursor.execute('PRAGMA journal_mode = WAL')
-        cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
-
-        # Drop old indexes
-        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
-        cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
-        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2')
-        cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2')
-
-        # TODO: Upgrade from tasks_v2?
-        cursor.execute('DROP TABLE IF EXISTS tasks_v2')
-
-        # Create new indexes
-        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)')
-        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)')
-
-    return db
-
 
 def parse_address(addr):
     if addr.startswith(UNIX_PREFIX):
-        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
+        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
     elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
         return (ADDR_TYPE_WS, (addr,))
     else:
-        m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
+        m = re.match(r"\[(?P<host>[^\]]*)\]:(?P<port>\d+)$", addr)
         if m is not None:
-            host = m.group('host')
-            port = m.group('port')
+            host = m.group("host")
+            port = m.group("port")
         else:
-            host, port = addr.split(':')
+            host, port = addr.split(":")
 
         return (ADDR_TYPE_TCP, (host, int(port)))
 
 
 def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
+    def sqlite_engine():
+        from .sqlite import DatabaseEngine
+
+        return DatabaseEngine(dbname, sync)
+
     from . import server
-    db = setup_database(dbname, sync=sync)
-    s = server.Server(db, upstream=upstream, read_only=read_only)
+
+    db_engine = sqlite_engine()
+
+    s = server.Server(db_engine, upstream=upstream, read_only=read_only)
 
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
@@ -120,6 +61,7 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
 
 def create_client(addr):
     from . import client
+
     c = client.Client()
 
     (typ, a) = parse_address(addr)
@@ -132,8 +74,10 @@ def create_client(addr):
 
     return c
 
+
 async def create_async_client(addr):
     from . import client
+
     c = client.AsyncClient()
 
     (typ, a) = parse_address(addr)
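The address grammar handled by `parse_address` above is unchanged by this commit. A minimal sketch of the three accepted forms follows; the literal prefix values are an assumption (`UNIX_PREFIX`, `WS_PREFIX` and `WSS_PREFIX` are defined outside this hunk):

```python
# Sketch only: exercises parse_address() as shown in the hunk above, assuming
# UNIX_PREFIX == "unix://", WS_PREFIX == "ws://" and WSS_PREFIX == "wss://".
from hashserv import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_TCP, ADDR_TYPE_WS

assert parse_address("unix:///run/hashserv.sock") == (ADDR_TYPE_UNIX, ("/run/hashserv.sock",))
assert parse_address("ws://hashserv.example.com:8686") == (ADDR_TYPE_WS, ("ws://hashserv.example.com:8686",))
assert parse_address("[::1]:8686") == (ADDR_TYPE_TCP, ("::1", 8686))  # bracketed IPv6
assert parse_address("localhost:8686") == (ADDR_TYPE_TCP, ("localhost", 8686))
```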
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index e6a3f40577..84cf4f2283 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -3,18 +3,16 @@
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
-from contextlib import closing, contextmanager
 from datetime import datetime, timedelta
-import enum
 import asyncio
 import logging
 import math
 import time
-from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
+from . import create_async_client
 import bb.asyncrpc
 
 
-logger = logging.getLogger('hashserv.server')
+logger = logging.getLogger("hashserv.server")
 
 
 class Measurement(object):
@@ -104,229 +102,136 @@ class Stats(object):
         return math.sqrt(self.s / (self.num - 1))
 
     def todict(self):
-        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
-
-
-@enum.unique
-class Resolve(enum.Enum):
-    FAIL = enum.auto()
-    IGNORE = enum.auto()
-    REPLACE = enum.auto()
-
-
-def insert_table(cursor, table, data, on_conflict):
-    resolve = {
-        Resolve.FAIL: "",
-        Resolve.IGNORE: " OR IGNORE",
-        Resolve.REPLACE: " OR REPLACE",
-    }[on_conflict]
-
-    keys = sorted(data.keys())
-    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
-        resolve=resolve,
-        table=table,
-        fields=", ".join(keys),
-        values=", ".join(":" + k for k in keys),
-    )
-    prevrowid = cursor.lastrowid
-    cursor.execute(query, data)
-    logging.debug(
-        "Inserting %r into %s, %s",
-        data,
-        table,
-        on_conflict
-    )
-    return (cursor.lastrowid, cursor.lastrowid != prevrowid)
-
-def insert_unihash(cursor, data, on_conflict):
-    return insert_table(cursor, "unihashes_v2", data, on_conflict)
-
-def insert_outhash(cursor, data, on_conflict):
-    return insert_table(cursor, "outhashes_v2", data, on_conflict)
-
-async def copy_unihash_from_upstream(client, db, method, taskhash):
-    d = await client.get_taskhash(method, taskhash)
-    if d is not None:
-        with closing(db.cursor()) as cursor:
-            insert_unihash(
-                cursor,
-                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
-                Resolve.IGNORE,
-            )
-        db.commit()
-    return d
-
-
-class ServerCursor(object):
-    def __init__(self, db, cursor, upstream):
-        self.db = db
-        self.cursor = cursor
-        self.upstream = upstream
+        return {
+            k: getattr(self, k)
+            for k in ("num", "total_time", "max_time", "average", "stdev")
+        }
 
 
 class ServerClient(bb.asyncrpc.AsyncServerConnection):
-    def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
-        super().__init__(socket, 'OEHASHEQUIV', logger)
-        self.db = db
+    def __init__(
+        self,
+        socket,
+        db_engine,
+        request_stats,
+        backfill_queue,
+        upstream,
+        read_only,
+    ):
+        super().__init__(socket, "OEHASHEQUIV", logger)
+        self.db_engine = db_engine
         self.request_stats = request_stats
         self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
         self.backfill_queue = backfill_queue
         self.upstream = upstream
 
-        self.handlers.update({
-            'get': self.handle_get,
-            'get-outhash': self.handle_get_outhash,
-            'get-stream': self.handle_get_stream,
-            'get-stats': self.handle_get_stats,
-        })
+        self.handlers.update(
+            {
+                "get": self.handle_get,
+                "get-outhash": self.handle_get_outhash,
+                "get-stream": self.handle_get_stream,
+                "get-stats": self.handle_get_stats,
+            }
+        )
 
         if not read_only:
-            self.handlers.update({
-                'report': self.handle_report,
-                'report-equiv': self.handle_equivreport,
-                'reset-stats': self.handle_reset_stats,
-                'backfill-wait': self.handle_backfill_wait,
-                'remove': self.handle_remove,
-                'clean-unused': self.handle_clean_unused,
-            })
+            self.handlers.update(
+                {
+                    "report": self.handle_report,
+                    "report-equiv": self.handle_equivreport,
+                    "reset-stats": self.handle_reset_stats,
+                    "backfill-wait": self.handle_backfill_wait,
+                    "remove": self.handle_remove,
+                    "clean-unused": self.handle_clean_unused,
+                }
+            )
 
     def validate_proto_version(self):
-        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))
+        return self.proto_version > (1, 0) and self.proto_version <= (1, 1)
 
     async def process_requests(self):
-        if self.upstream is not None:
-            self.upstream_client = await create_async_client(self.upstream)
-        else:
-            self.upstream_client = None
-
-        await super().process_requests()
+        async with self.db_engine.connect(self.logger) as db:
+            self.db = db
+            if self.upstream is not None:
+                self.upstream_client = await create_async_client(self.upstream)
+            else:
+                self.upstream_client = None
 
-        if self.upstream_client is not None:
-            await self.upstream_client.close()
+            try:
+                await super().process_requests()
+            finally:
+                if self.upstream_client is not None:
+                    await self.upstream_client.close()
 
     async def dispatch_message(self, msg):
         for k in self.handlers.keys():
             if k in msg:
-                self.logger.debug('Handling %s' % k)
-                if 'stream' in k:
+                self.logger.debug("Handling %s" % k)
+                if "stream" in k:
                     return await self.handlers[k](msg[k])
                 else:
-                    with self.request_stats.start_sample() as self.request_sample, \
-                            self.request_sample.measure():
+                    with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
                         return await self.handlers[k](msg[k])
 
         raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
 
     async def handle_get(self, request):
-        method = request['method']
-        taskhash = request['taskhash']
-        fetch_all = request.get('all', False)
+        method = request["method"]
+        taskhash = request["taskhash"]
+        fetch_all = request.get("all", False)
 
-        with closing(self.db.cursor()) as cursor:
-            return await self.get_unihash(cursor, method, taskhash, fetch_all)
+        return await self.get_unihash(method, taskhash, fetch_all)
 
-    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
+    async def get_unihash(self, method, taskhash, fetch_all=False):
         d = None
 
         if fetch_all:
-            cursor.execute(
-                '''
-                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
-                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
-                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
-                ORDER BY outhashes_v2.created ASC
-                LIMIT 1
-                ''',
-                {
-                    'method': method,
-                    'taskhash': taskhash,
-                }
-
-            )
-            row = cursor.fetchone()
-
+            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
             if row is not None:
                 d = {k: row[k] for k in row.keys()}
             elif self.upstream_client is not None:
                 d = await self.upstream_client.get_taskhash(method, taskhash, True)
-                self.update_unified(cursor, d)
-                self.db.commit()
+                await self.update_unified(d)
         else:
-            row = self.query_equivalent(cursor, method, taskhash)
+            row = await self.db.get_equivalent(method, taskhash)
 
             if row is not None:
                 d = {k: row[k] for k in row.keys()}
             elif self.upstream_client is not None:
                 d = await self.upstream_client.get_taskhash(method, taskhash)
-                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
-                insert_unihash(cursor, d, Resolve.IGNORE)
-                self.db.commit()
+                await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
 
         return d
 
     async def handle_get_outhash(self, request):
-        method = request['method']
-        outhash = request['outhash']
-        taskhash = request['taskhash']
+        method = request["method"]
+        outhash = request["outhash"]
+        taskhash = request["taskhash"]
         with_unihash = request.get("with_unihash", True)
 
-        with closing(self.db.cursor()) as cursor:
-            return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
+        return await self.get_outhash(method, outhash, taskhash, with_unihash)
 
-    async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
+    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
         d = None
         if with_unihash:
-            cursor.execute(
-                '''
-                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
-                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
-                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
-                ORDER BY outhashes_v2.created ASC
-                LIMIT 1
-                ''',
-                {
-                    'method': method,
-                    'outhash': outhash,
-                }
-            )
+            row = await self.db.get_unihash_by_outhash(method, outhash)
         else:
-            cursor.execute(
-                """
-                SELECT * FROM outhashes_v2
-                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
-                ORDER BY outhashes_v2.created ASC
-                LIMIT 1
-                """,
-                {
-                    'method': method,
-                    'outhash': outhash,
-                }
-            )
-        row = cursor.fetchone()
+            row = await self.db.get_outhash(method, outhash)
 
         if row is not None:
             d = {k: row[k] for k in row.keys()}
         elif self.upstream_client is not None:
             d = await self.upstream_client.get_outhash(method, outhash, taskhash)
-            self.update_unified(cursor, d)
-            self.db.commit()
+            await self.update_unified(d)
 
         return d
 
-    def update_unified(self, cursor, data):
+    async def update_unified(self, data):
         if data is None:
             return
 
-        insert_unihash(
-            cursor,
-            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
-            Resolve.IGNORE
-        )
-        insert_outhash(
-            cursor,
-            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
-            Resolve.IGNORE
-        )
+        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
+        await self.db.insert_outhash(data)
 
     async def handle_get_stream(self, request):
         await self.socket.send_message("ok")
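With this hunk, handlers no longer juggle cursors, commits, or the removed `Resolve` enum; every database access is an awaitable method on `self.db`. A hypothetical extra handler in the same style (not part of this commit) would look like:

```python
# Hypothetical handler sketch: get_equivalent() is one of the awaitable
# backend methods the diff introduces; no cursor or commit bookkeeping needed.
async def handle_get_unihash_only(self, request):
    row = await self.db.get_equivalent(request["method"], request["taskhash"])
    return {"unihash": row["unihash"]} if row is not None else None
```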
@@ -347,20 +252,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 request_measure = self.request_sample.measure()
                 request_measure.start()
 
-                if l == 'END':
+                if l == "END":
                     break
 
                 (method, taskhash) = l.split()
-                #self.logger.debug('Looking up %s %s' % (method, taskhash))
-                cursor = self.db.cursor()
-                try:
-                    row = self.query_equivalent(cursor, method, taskhash)
-                finally:
-                    cursor.close()
+                # self.logger.debug('Looking up %s %s' % (method, taskhash))
+                row = await self.db.get_equivalent(method, taskhash)
 
                 if row is not None:
-                    msg = row['unihash']
-                    #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
+                    msg = row["unihash"]
+                    # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                 elif self.upstream_client is not None:
                     upstream = await self.upstream_client.get_unihash(method, taskhash)
                     if upstream:
@@ -384,118 +285,81 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 return self.NO_RESPONSE
 
     async def handle_report(self, data):
-        with closing(self.db.cursor()) as cursor:
-            outhash_data = {
-                'method': data['method'],
-                'outhash': data['outhash'],
-                'taskhash': data['taskhash'],
-                'created': datetime.now()
-            }
+        outhash_data = {
+            "method": data["method"],
+            "outhash": data["outhash"],
+            "taskhash": data["taskhash"],
+            "created": datetime.now(),
+        }
 
-            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                if k in data:
-                    outhash_data[k] = data[k]
-
-            # Insert the new entry, unless it already exists
-            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
-
-            if inserted:
-                # If this row is new, check if it is equivalent to another
-                # output hash
-                cursor.execute(
-                    '''
-                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
-                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
-                    -- Select any matching output hash except the one we just inserted
-                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
-                    -- Pick the oldest hash
-                    ORDER BY outhashes_v2.created ASC
-                    LIMIT 1
-                    ''',
-                    {
-                        'method': data['method'],
-                        'outhash': data['outhash'],
-                        'taskhash': data['taskhash'],
-                    }
-                )
-                row = cursor.fetchone()
+        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
+            if k in data:
+                outhash_data[k] = data[k]
 
-                if row is not None:
-                    # A matching output hash was found. Set our taskhash to the
-                    # same unihash since they are equivalent
-                    unihash = row['unihash']
-                    resolve = Resolve.IGNORE
-                else:
-                    # No matching output hash was found. This is probably the
-                    # first outhash to be added.
-                    unihash = data['unihash']
-                    resolve = Resolve.IGNORE
-
-                    # Query upstream to see if it has a unihash we can use
-                    if self.upstream_client is not None:
-                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
-                        if upstream_data is not None:
-                            unihash = upstream_data['unihash']
-
-
-                insert_unihash(
-                    cursor,
-                    {
-                        'method': data['method'],
-                        'taskhash': data['taskhash'],
-                        'unihash': unihash,
-                    },
-                    resolve
-                )
-
-            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
-            if unihash_data is not None:
-                unihash = unihash_data['unihash']
-            else:
-                unihash = data['unihash']
-
-            self.db.commit()
+        # Insert the new entry, unless it already exists
+        if await self.db.insert_outhash(outhash_data):
+            # If this row is new, check if it is equivalent to another
+            # output hash
+            row = await self.db.get_equivalent_for_outhash(
+                data["method"], data["outhash"], data["taskhash"]
+            )
 
-        d = {
-            'taskhash': data['taskhash'],
-            'method': data['method'],
-            'unihash': unihash,
-        }
+            if row is not None:
+                # A matching output hash was found. Set our taskhash to the
+                # same unihash since they are equivalent
+                unihash = row["unihash"]
+            else:
+                # No matching output hash was found. This is probably the
+                # first outhash to be added.
+                unihash = data["unihash"]
+
+                # Query upstream to see if it has a unihash we can use
+                if self.upstream_client is not None:
+                    upstream_data = await self.upstream_client.get_outhash(
+                        data["method"], data["outhash"], data["taskhash"]
+                    )
+                    if upstream_data is not None:
+                        unihash = upstream_data["unihash"]
+
+            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)
+
+        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
+        if unihash_data is not None:
+            unihash = unihash_data["unihash"]
+        else:
+            unihash = data["unihash"]
 
-        return d
+        return {
+            "taskhash": data["taskhash"],
+            "method": data["method"],
+            "unihash": unihash,
+        }
 
     async def handle_equivreport(self, data):
-        with closing(self.db.cursor()) as cursor:
-            insert_data = {
-                'method': data['method'],
-                'taskhash': data['taskhash'],
-                'unihash': data['unihash'],
-            }
-            insert_unihash(cursor, insert_data, Resolve.IGNORE)
-            self.db.commit()
-
-            # Fetch the unihash that will be reported for the taskhash. If the
-            # unihash matches, it means this row was inserted (or the mapping
-            # was already valid)
-            row = self.query_equivalent(cursor, data['method'], data['taskhash'])
-
-            if row['unihash'] == data['unihash']:
-                self.logger.info('Adding taskhash equivalence for %s with unihash %s',
-                                data['taskhash'], row['unihash'])
-
-            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
-
-        return d
+        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
+
+        # Fetch the unihash that will be reported for the taskhash. If the
+        # unihash matches, it means this row was inserted (or the mapping
+        # was already valid)
+        row = await self.db.get_equivalent(data["method"], data["taskhash"])
+
+        if row["unihash"] == data["unihash"]:
+            self.logger.info(
+                "Adding taskhash equivalence for %s with unihash %s",
+                data["taskhash"],
+                row["unihash"],
+            )
 
+        return {k: row[k] for k in ("taskhash", "method", "unihash")}
 
     async def handle_get_stats(self, request):
         return {
-            'requests': self.request_stats.todict(),
+            "requests": self.request_stats.todict(),
         }
 
     async def handle_reset_stats(self, request):
         d = {
-            'requests': self.request_stats.todict(),
+            "requests": self.request_stats.todict(),
         }
 
         self.request_stats.reset()
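As the `handle_report` rewrite above shows, the server's reply carries the canonical unihash, which may differ from the one reported when an equivalent outhash already existed. Seen from the existing synchronous client, whose API is outside this diff and is therefore an assumption here, a report round-trip looks roughly like:

```python
# Hedged sketch: assumes hashserv.create_client() (from __init__.py above) and
# a Client.report_unihash(taskhash, method, outhash, unihash) method on the
# existing client, which this commit does not touch.
import hashserv

def report(addr, taskhash, method, outhash, unihash):
    client = hashserv.create_client(addr)
    try:
        result = client.report_unihash(taskhash, method, outhash, unihash)
        # The server may answer with a different unihash than the one sent.
        return result["unihash"]
    finally:
        client.close()
```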
@@ -503,7 +367,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
     async def handle_backfill_wait(self, request):
         d = {
-            'tasks': self.backfill_queue.qsize(),
+            "tasks": self.backfill_queue.qsize(),
         }
         await self.backfill_queue.join()
         return d
@@ -513,92 +377,63 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         if not isinstance(condition, dict):
             raise TypeError("Bad condition type %s" % type(condition))
 
-        def do_remove(columns, table_name, cursor):
-            nonlocal condition
-            where = {}
-            for c in columns:
-                if c in condition and condition[c] is not None:
-                    where[c] = condition[c]
-
-            if where:
-                query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys())
-                cursor.execute(query, where)
-                return cursor.rowcount
-
-            return 0
-
-        count = 0
-        with closing(self.db.cursor()) as cursor:
-            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
-            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
-            self.db.commit()
-
-        return {"count": count}
+        return {"count": await self.db.remove(condition)}
 
     async def handle_clean_unused(self, request):
         max_age = request["max_age_seconds"]
-        with closing(self.db.cursor()) as cursor:
-            cursor.execute(
-                """
-                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
-                    SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1
-                )
-                """,
-                {
-                    "oldest": datetime.now() - timedelta(seconds=-max_age)
-                }
-            )
-            count = cursor.rowcount
-
-        return {"count": count}
-
-    def query_equivalent(self, cursor, method, taskhash):
-        # This is part of the inner loop and must be as fast as possible
-        cursor.execute(
-            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
-            {
-                'method': method,
-                'taskhash': taskhash,
-            }
-        )
-        return cursor.fetchone()
+        oldest = datetime.now() - timedelta(seconds=-max_age)
+        return {"count": await self.db.clean_unused(oldest)}
 
 
 class Server(bb.asyncrpc.AsyncServer):
-    def __init__(self, db, upstream=None, read_only=False):
+    def __init__(self, db_engine, upstream=None, read_only=False):
         if upstream and read_only:
-            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")
+            raise bb.asyncrpc.ServerError(
+                "Read-only hashserv cannot pull from an upstream server"
+            )
 
         super().__init__(logger)
 
         self.request_stats = Stats()
-        self.db = db
+        self.db_engine = db_engine
         self.upstream = upstream
         self.read_only = read_only
         self.backfill_queue = None
 
     def accept_client(self, socket):
-        return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
+        return ServerClient(
+            socket,
+            self.db_engine,
+            self.request_stats,
+            self.backfill_queue,
+            self.upstream,
+            self.read_only,
+        )
 
     async def backfill_worker_task(self):
-        client = await create_async_client(self.upstream)
-        try:
+        async with await create_async_client(
+            self.upstream
+        ) as client, self.db_engine.connect(logger) as db:
             while True:
                 item = await self.backfill_queue.get()
                 if item is None:
                     self.backfill_queue.task_done()
                     break
+
                 method, taskhash = item
-                await copy_unihash_from_upstream(client, self.db, method, taskhash)
+                d = await client.get_taskhash(method, taskhash)
+                if d is not None:
+                    await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
                 self.backfill_queue.task_done()
-        finally:
-            await client.close()
 
     def start(self):
         tasks = super().start()
         if self.upstream:
             self.backfill_queue = asyncio.Queue()
             tasks += [self.backfill_worker_task()]
+
+        self.loop.run_until_complete(self.db_engine.create())
+
         return tasks
 
     async def stop(self):
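Taken together, the server now codes against a small engine contract: a `create()` coroutine called once from `Server.start()`, and a `connect(logger)` factory returning an async-context connection with awaitable query methods. A sketch of the minimum an alternative backend would need to provide, with method names taken from the calls in this diff (the class below is hypothetical):

```python
# Hypothetical no-op backend illustrating the engine/connection contract the
# refactored server depends on; hashserv.sqlite.DatabaseEngine is the real one.
class NullDatabaseEngine:
    async def create(self):
        # Called once from Server.start() before serving.
        pass

    def connect(self, logger):
        # One connection per client and per backfill worker.
        return NullDatabase(logger)


class NullDatabase:
    def __init__(self, logger):
        self.logger = logger

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def get_equivalent(self, method, taskhash):
        return None  # answer "not found" for every lookup
```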
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
new file mode 100644
index 0000000000..6809c53706
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -0,0 +1,259 @@
+#! /usr/bin/env python3
+#
+# Copyright (C) 2023 Garmin Ltd.
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+import sqlite3
+import logging
+from contextlib import closing
+
+logger = logging.getLogger("hashserv.sqlite")
+
+UNIHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("unihash", "TEXT NOT NULL", ""),
+)
+
+UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
+
+OUTHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("outhash", "TEXT NOT NULL", "UNIQUE"),
+    ("created", "DATETIME", ""),
+    # Optional fields
+    ("owner", "TEXT", ""),
+    ("PN", "TEXT", ""),
+    ("PV", "TEXT", ""),
+    ("PR", "TEXT", ""),
+    ("task", "TEXT", ""),
+    ("outhash_siginfo", "TEXT", ""),
+)
+
+OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
+
+
+def _make_table(cursor, name, definition):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS {name} (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            {fields}
+            UNIQUE({unique})
+            )
+        """.format(
+            name=name,
+            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
+            unique=", ".join(
+                name for name, _, flags in definition if "UNIQUE" in flags
+            ),
+        )
+    )
+
+
+class DatabaseEngine(object):
+    def __init__(self, dbname, sync):
+        self.dbname = dbname
+        self.logger = logger
+        self.sync = sync
+
+    async def create(self):
+        db = sqlite3.connect(self.dbname)
+        db.row_factory = sqlite3.Row
+
+        with closing(db.cursor()) as cursor:
+            _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
+            _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
+
+            cursor.execute("PRAGMA journal_mode = WAL")
+            cursor.execute(
+                "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
+            )
+
+            # Drop old indexes
+            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
+            cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
+            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
+            cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
+
+            # TODO: Upgrade from tasks_v2?
+            cursor.execute("DROP TABLE IF EXISTS tasks_v2")
+
+            # Create new indexes
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)"
+            )
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
+            )
+
+    def connect(self, logger):
+        return Database(logger, self.dbname)
+
+
+class Database(object):
+    def __init__(self, logger, dbname, sync=True):
+        self.dbname = dbname
+        self.logger = logger
+
+        self.db = sqlite3.connect(self.dbname)
+        self.db.row_factory = sqlite3.Row
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.close()
+
+    async def close(self):
+        self.db.close()
+
+    async def get_unihash_by_taskhash_full(self, method, taskhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "taskhash": taskhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_unihash_by_outhash(self, method, outhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "outhash": outhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_outhash(self, method, outhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT * FROM outhashes_v2
+                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "outhash": outhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                -- Select any matching output hash except the one we just inserted
+                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
+                -- Pick the oldest hash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "outhash": outhash,
+                    "taskhash": taskhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_equivalent(self, method, taskhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash",
+                {
+                    "method": method,
+                    "taskhash": taskhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def remove(self, condition):
+        def do_remove(columns, table_name, cursor):
+            where = {}
+            for c in columns:
+                if c in condition and condition[c] is not None:
+                    where[c] = condition[c]
+
+            if where:
+                query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join(
+                    "%s=:%s" % (k, k) for k in where.keys()
+                )
+                cursor.execute(query, where)
+                return cursor.rowcount
+
+            return 0
+
+        count = 0
+        with closing(self.db.cursor()) as cursor:
+            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
+            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
+            self.db.commit()
+
+        return count
+
+    async def clean_unused(self, oldest):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
+                    SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1
+                )
+                """,
+                {
+                    "oldest": oldest,
+                },
+            )
+            return cursor.rowcount
+
+    async def insert_unihash(self, method, taskhash, unihash):
+        with closing(self.db.cursor()) as cursor:
+            prevrowid = cursor.lastrowid
+            cursor.execute(
+                """
+                INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash)
+                """,
+                {
+                    "method": method,
+                    "taskhash": taskhash,
+                    "unihash": unihash,
+                },
+            )
+            self.db.commit()
+            return cursor.lastrowid != prevrowid
+
+    async def insert_outhash(self, data):
+        data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
+        keys = sorted(data.keys())
+        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
+            fields=", ".join(keys),
+            values=", ".join(":" + k for k in keys),
+        )
+        with closing(self.db.cursor()) as cursor:
+            prevrowid = cursor.lastrowid
+            cursor.execute(query, data)
+            self.db.commit()
+            return cursor.lastrowid != prevrowid
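The new module replaces the old `Resolve` enum with hard-coded "INSERT OR IGNORE" statements, and both insert methods signal whether a fresh row was created by comparing `cursor.lastrowid` before and after the statement, as seen above. A hypothetical standalone use of the backend ("./hashes.db" is an example path; the server normally drives these steps itself from `Server.start()` and `ServerClient.process_requests()`):

```python
# Hedged sketch of driving the new backend directly, outside the server.
import asyncio
import logging
from hashserv.sqlite import DatabaseEngine

async def main():
    engine = DatabaseEngine("./hashes.db", sync=True)
    await engine.create()  # tables, pragmas, indexes

    # connect() returns a Database usable as an async context manager.
    async with engine.connect(logging.getLogger("hashserv.sqlite")) as db:
        inserted = await db.insert_unihash("method", "taskhash", "unihash")
        row = await db.get_equivalent("method", "taskhash")
        print(inserted, dict(row) if row is not None else None)

asyncio.run(main())
```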