summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/sqlite.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/sqlite.py')
-rw-r--r--bitbake/lib/hashserv/sqlite.py562
1 files changed, 562 insertions, 0 deletions
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
new file mode 100644
index 0000000000..da2e844a03
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -0,0 +1,562 @@
1#! /usr/bin/env python3
2#
3# Copyright (C) 2023 Garmin Ltd.
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7import sqlite3
8import logging
9from contextlib import closing
10from . import User
11
12logger = logging.getLogger("hashserv.sqlite")
13
14UNIHASH_TABLE_DEFINITION = (
15 ("method", "TEXT NOT NULL", "UNIQUE"),
16 ("taskhash", "TEXT NOT NULL", "UNIQUE"),
17 ("unihash", "TEXT NOT NULL", ""),
18 ("gc_mark", "TEXT NOT NULL", ""),
19)
20
21UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
22
23OUTHASH_TABLE_DEFINITION = (
24 ("method", "TEXT NOT NULL", "UNIQUE"),
25 ("taskhash", "TEXT NOT NULL", "UNIQUE"),
26 ("outhash", "TEXT NOT NULL", "UNIQUE"),
27 ("created", "DATETIME", ""),
28 # Optional fields
29 ("owner", "TEXT", ""),
30 ("PN", "TEXT", ""),
31 ("PV", "TEXT", ""),
32 ("PR", "TEXT", ""),
33 ("task", "TEXT", ""),
34 ("outhash_siginfo", "TEXT", ""),
35)
36
37OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
38
39USERS_TABLE_DEFINITION = (
40 ("username", "TEXT NOT NULL", "UNIQUE"),
41 ("token", "TEXT NOT NULL", ""),
42 ("permissions", "TEXT NOT NULL", ""),
43)
44
45USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION)
46
47
48CONFIG_TABLE_DEFINITION = (
49 ("name", "TEXT NOT NULL", "UNIQUE"),
50 ("value", "TEXT", ""),
51)
52
53CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION)
54
55
56def _make_table(cursor, name, definition):
57 cursor.execute(
58 """
59 CREATE TABLE IF NOT EXISTS {name} (
60 id INTEGER PRIMARY KEY AUTOINCREMENT,
61 {fields}
62 UNIQUE({unique})
63 )
64 """.format(
65 name=name,
66 fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
67 unique=", ".join(
68 name for name, _, flags in definition if "UNIQUE" in flags
69 ),
70 )
71 )
72
73
74def map_user(row):
75 if row is None:
76 return None
77 return User(
78 username=row["username"],
79 permissions=set(row["permissions"].split()),
80 )
81
82
83def _make_condition_statement(columns, condition):
84 where = {}
85 for c in columns:
86 if c in condition and condition[c] is not None:
87 where[c] = condition[c]
88
89 return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys())
90
91
92def _get_sqlite_version(cursor):
93 cursor.execute("SELECT sqlite_version()")
94
95 version = []
96 for v in cursor.fetchone()[0].split("."):
97 try:
98 version.append(int(v))
99 except ValueError:
100 version.append(v)
101
102 return tuple(version)
103
104
105def _schema_table_name(version):
106 if version >= (3, 33):
107 return "sqlite_schema"
108
109 return "sqlite_master"
110
111
112class DatabaseEngine(object):
113 def __init__(self, dbname, sync):
114 self.dbname = dbname
115 self.logger = logger
116 self.sync = sync
117
118 async def create(self):
119 db = sqlite3.connect(self.dbname)
120 db.row_factory = sqlite3.Row
121
122 with closing(db.cursor()) as cursor:
123 _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION)
124 _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
125 _make_table(cursor, "users", USERS_TABLE_DEFINITION)
126 _make_table(cursor, "config", CONFIG_TABLE_DEFINITION)
127
128 cursor.execute("PRAGMA journal_mode = WAL")
129 cursor.execute(
130 "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
131 )
132
133 # Drop old indexes
134 cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
135 cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
136 cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
137 cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
138 cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3")
139
140 # TODO: Upgrade from tasks_v2?
141 cursor.execute("DROP TABLE IF EXISTS tasks_v2")
142
143 # Create new indexes
144 cursor.execute(
145 "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
146 )
147 cursor.execute(
148 "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
149 )
150 cursor.execute(
151 "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
152 )
153 cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)")
154
155 sqlite_version = _get_sqlite_version(cursor)
156
157 cursor.execute(
158 f"""
159 SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2'
160 """
161 )
162 if cursor.fetchone():
163 self.logger.info("Upgrading Unihashes V2 -> V3...")
164 cursor.execute(
165 """
166 INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark)
167 SELECT id, method, unihash, taskhash, '' FROM unihashes_v2
168 """
169 )
170 cursor.execute("DROP TABLE unihashes_v2")
171 db.commit()
172 self.logger.info("Upgrade complete")
173
174 def connect(self, logger):
175 return Database(logger, self.dbname, self.sync)
176
177
178class Database(object):
179 def __init__(self, logger, dbname, sync):
180 self.dbname = dbname
181 self.logger = logger
182
183 self.db = sqlite3.connect(self.dbname)
184 self.db.row_factory = sqlite3.Row
185
186 with closing(self.db.cursor()) as cursor:
187 cursor.execute("PRAGMA journal_mode = WAL")
188 cursor.execute(
189 "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF")
190 )
191
192 self.sqlite_version = _get_sqlite_version(cursor)
193
194 async def __aenter__(self):
195 return self
196
197 async def __aexit__(self, exc_type, exc_value, traceback):
198 await self.close()
199
200 async def _set_config(self, cursor, name, value):
201 cursor.execute(
202 """
203 INSERT OR REPLACE INTO config (id, name, value) VALUES
204 ((SELECT id FROM config WHERE name=:name), :name, :value)
205 """,
206 {
207 "name": name,
208 "value": value,
209 },
210 )
211
212 async def _get_config(self, cursor, name):
213 cursor.execute(
214 "SELECT value FROM config WHERE name=:name",
215 {
216 "name": name,
217 },
218 )
219 row = cursor.fetchone()
220 if row is None:
221 return None
222 return row["value"]
223
224 async def close(self):
225 self.db.close()
226
227 async def get_unihash_by_taskhash_full(self, method, taskhash):
228 with closing(self.db.cursor()) as cursor:
229 cursor.execute(
230 """
231 SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
232 INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
233 WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
234 ORDER BY outhashes_v2.created ASC
235 LIMIT 1
236 """,
237 {
238 "method": method,
239 "taskhash": taskhash,
240 },
241 )
242 return cursor.fetchone()
243
244 async def get_unihash_by_outhash(self, method, outhash):
245 with closing(self.db.cursor()) as cursor:
246 cursor.execute(
247 """
248 SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
249 INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
250 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
251 ORDER BY outhashes_v2.created ASC
252 LIMIT 1
253 """,
254 {
255 "method": method,
256 "outhash": outhash,
257 },
258 )
259 return cursor.fetchone()
260
261 async def unihash_exists(self, unihash):
262 with closing(self.db.cursor()) as cursor:
263 cursor.execute(
264 """
265 SELECT * FROM unihashes_v3 WHERE unihash=:unihash
266 LIMIT 1
267 """,
268 {
269 "unihash": unihash,
270 },
271 )
272 return cursor.fetchone() is not None
273
274 async def get_outhash(self, method, outhash):
275 with closing(self.db.cursor()) as cursor:
276 cursor.execute(
277 """
278 SELECT * FROM outhashes_v2
279 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
280 ORDER BY outhashes_v2.created ASC
281 LIMIT 1
282 """,
283 {
284 "method": method,
285 "outhash": outhash,
286 },
287 )
288 return cursor.fetchone()
289
290 async def get_equivalent_for_outhash(self, method, outhash, taskhash):
291 with closing(self.db.cursor()) as cursor:
292 cursor.execute(
293 """
294 SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2
295 INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
296 -- Select any matching output hash except the one we just inserted
297 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
298 -- Pick the oldest hash
299 ORDER BY outhashes_v2.created ASC
300 LIMIT 1
301 """,
302 {
303 "method": method,
304 "outhash": outhash,
305 "taskhash": taskhash,
306 },
307 )
308 return cursor.fetchone()
309
310 async def get_equivalent(self, method, taskhash):
311 with closing(self.db.cursor()) as cursor:
312 cursor.execute(
313 "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash",
314 {
315 "method": method,
316 "taskhash": taskhash,
317 },
318 )
319 return cursor.fetchone()
320
321 async def remove(self, condition):
322 def do_remove(columns, table_name, cursor):
323 where, clause = _make_condition_statement(columns, condition)
324 if where:
325 query = f"DELETE FROM {table_name} WHERE {clause}"
326 cursor.execute(query, where)
327 return cursor.rowcount
328
329 return 0
330
331 count = 0
332 with closing(self.db.cursor()) as cursor:
333 count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
334 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor)
335 self.db.commit()
336
337 return count
338
339 async def get_current_gc_mark(self):
340 with closing(self.db.cursor()) as cursor:
341 return await self._get_config(cursor, "gc-mark")
342
343 async def gc_status(self):
344 with closing(self.db.cursor()) as cursor:
345 cursor.execute(
346 """
347 SELECT COUNT() FROM unihashes_v3 WHERE
348 gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
349 """
350 )
351 keep_rows = cursor.fetchone()[0]
352
353 cursor.execute(
354 """
355 SELECT COUNT() FROM unihashes_v3 WHERE
356 gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
357 """
358 )
359 remove_rows = cursor.fetchone()[0]
360
361 current_mark = await self._get_config(cursor, "gc-mark")
362
363 return (keep_rows, remove_rows, current_mark)
364
365 async def gc_mark(self, mark, condition):
366 with closing(self.db.cursor()) as cursor:
367 await self._set_config(cursor, "gc-mark", mark)
368
369 where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition)
370
371 new_rows = 0
372 if where:
373 cursor.execute(
374 f"""
375 UPDATE unihashes_v3 SET
376 gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
377 WHERE {clause}
378 """,
379 where,
380 )
381 new_rows = cursor.rowcount
382
383 self.db.commit()
384 return new_rows
385
386 async def gc_sweep(self):
387 with closing(self.db.cursor()) as cursor:
388 # NOTE: COALESCE is not used in this query so that if the current
389 # mark is NULL, nothing will happen
390 cursor.execute(
391 """
392 DELETE FROM unihashes_v3 WHERE
393 gc_mark!=(SELECT value FROM config WHERE name='gc-mark')
394 """
395 )
396 count = cursor.rowcount
397 await self._set_config(cursor, "gc-mark", None)
398
399 self.db.commit()
400 return count
401
402 async def clean_unused(self, oldest):
403 with closing(self.db.cursor()) as cursor:
404 cursor.execute(
405 """
406 DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
407 SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1
408 )
409 """,
410 {
411 "oldest": oldest,
412 },
413 )
414 self.db.commit()
415 return cursor.rowcount
416
417 async def insert_unihash(self, method, taskhash, unihash):
418 with closing(self.db.cursor()) as cursor:
419 prevrowid = cursor.lastrowid
420 cursor.execute(
421 """
422 INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES
423 (
424 :method,
425 :taskhash,
426 :unihash,
427 COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
428 )
429 """,
430 {
431 "method": method,
432 "taskhash": taskhash,
433 "unihash": unihash,
434 },
435 )
436 self.db.commit()
437 return cursor.lastrowid != prevrowid
438
439 async def insert_outhash(self, data):
440 data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
441 keys = sorted(data.keys())
442 query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
443 fields=", ".join(keys),
444 values=", ".join(":" + k for k in keys),
445 )
446 with closing(self.db.cursor()) as cursor:
447 prevrowid = cursor.lastrowid
448 cursor.execute(query, data)
449 self.db.commit()
450 return cursor.lastrowid != prevrowid
451
452 def _get_user(self, username):
453 with closing(self.db.cursor()) as cursor:
454 cursor.execute(
455 """
456 SELECT username, permissions, token FROM users WHERE username=:username
457 """,
458 {
459 "username": username,
460 },
461 )
462 return cursor.fetchone()
463
464 async def lookup_user_token(self, username):
465 row = self._get_user(username)
466 if row is None:
467 return None, None
468 return map_user(row), row["token"]
469
470 async def lookup_user(self, username):
471 return map_user(self._get_user(username))
472
473 async def set_user_token(self, username, token):
474 with closing(self.db.cursor()) as cursor:
475 cursor.execute(
476 """
477 UPDATE users SET token=:token WHERE username=:username
478 """,
479 {
480 "username": username,
481 "token": token,
482 },
483 )
484 self.db.commit()
485 return cursor.rowcount != 0
486
487 async def set_user_perms(self, username, permissions):
488 with closing(self.db.cursor()) as cursor:
489 cursor.execute(
490 """
491 UPDATE users SET permissions=:permissions WHERE username=:username
492 """,
493 {
494 "username": username,
495 "permissions": " ".join(permissions),
496 },
497 )
498 self.db.commit()
499 return cursor.rowcount != 0
500
501 async def get_all_users(self):
502 with closing(self.db.cursor()) as cursor:
503 cursor.execute("SELECT username, permissions FROM users")
504 return [map_user(r) for r in cursor.fetchall()]
505
506 async def new_user(self, username, permissions, token):
507 with closing(self.db.cursor()) as cursor:
508 try:
509 cursor.execute(
510 """
511 INSERT INTO users (username, token, permissions) VALUES (:username, :token, :permissions)
512 """,
513 {
514 "username": username,
515 "token": token,
516 "permissions": " ".join(permissions),
517 },
518 )
519 self.db.commit()
520 return True
521 except sqlite3.IntegrityError:
522 return False
523
524 async def delete_user(self, username):
525 with closing(self.db.cursor()) as cursor:
526 cursor.execute(
527 """
528 DELETE FROM users WHERE username=:username
529 """,
530 {
531 "username": username,
532 },
533 )
534 self.db.commit()
535 return cursor.rowcount != 0
536
537 async def get_usage(self):
538 usage = {}
539 with closing(self.db.cursor()) as cursor:
540 cursor.execute(
541 f"""
542 SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
543 """
544 )
545 for row in cursor.fetchall():
546 cursor.execute(
547 """
548 SELECT COUNT() FROM %s
549 """
550 % row["name"],
551 )
552 usage[row["name"]] = {
553 "rows": cursor.fetchone()[0],
554 }
555 return usage
556
557 async def get_query_columns(self):
558 columns = set()
559 for name, typ, _ in UNIHASH_TABLE_DEFINITION + OUTHASH_TABLE_DEFINITION:
560 if typ.startswith("TEXT"):
561 columns.add(name)
562 return list(columns)