#! /usr/bin/env python3 # # Copyright (C) 2023 Garmin Ltd. # # SPDX-License-Identifier: GPL-2.0-only # import sqlite3 import logging from contextlib import closing from . import User logger = logging.getLogger("hashserv.sqlite") UNIHASH_TABLE_DEFINITION = ( ("method", "TEXT NOT NULL", "UNIQUE"), ("taskhash", "TEXT NOT NULL", "UNIQUE"), ("unihash", "TEXT NOT NULL", ""), ) UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) OUTHASH_TABLE_DEFINITION = ( ("method", "TEXT NOT NULL", "UNIQUE"), ("taskhash", "TEXT NOT NULL", "UNIQUE"), ("outhash", "TEXT NOT NULL", "UNIQUE"), ("created", "DATETIME", ""), # Optional fields ("owner", "TEXT", ""), ("PN", "TEXT", ""), ("PV", "TEXT", ""), ("PR", "TEXT", ""), ("task", "TEXT", ""), ("outhash_siginfo", "TEXT", ""), ) OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION) USERS_TABLE_DEFINITION = ( ("username", "TEXT NOT NULL", "UNIQUE"), ("token", "TEXT NOT NULL", ""), ("permissions", "TEXT NOT NULL", ""), ) USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) def _make_table(cursor, name, definition): cursor.execute( """ CREATE TABLE IF NOT EXISTS {name} ( id INTEGER PRIMARY KEY AUTOINCREMENT, {fields} UNIQUE({unique}) ) """.format( name=name, fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition), unique=", ".join( name for name, _, flags in definition if "UNIQUE" in flags ), ) ) def map_user(row): if row is None: return None return User( username=row["username"], permissions=set(row["permissions"].split()), ) class DatabaseEngine(object): def __init__(self, dbname, sync): self.dbname = dbname self.logger = logger self.sync = sync async def create(self): db = sqlite3.connect(self.dbname) db.row_factory = sqlite3.Row with closing(db.cursor()) as cursor: _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) _make_table(cursor, "users", USERS_TABLE_DEFINITION) cursor.execute("PRAGMA journal_mode = WAL") cursor.execute( "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF") ) # Drop old indexes cursor.execute("DROP INDEX IF EXISTS taskhash_lookup") cursor.execute("DROP INDEX IF EXISTS outhash_lookup") cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") # TODO: Upgrade from tasks_v2? cursor.execute("DROP TABLE IF EXISTS tasks_v2") # Create new indexes cursor.execute( "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" ) cursor.execute( "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" ) def connect(self, logger): return Database(logger, self.dbname, self.sync) class Database(object): def __init__(self, logger, dbname, sync): self.dbname = dbname self.logger = logger self.db = sqlite3.connect(self.dbname) self.db.row_factory = sqlite3.Row with closing(self.db.cursor()) as cursor: cursor.execute("PRAGMA journal_mode = WAL") cursor.execute( "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") ) cursor.execute("SELECT sqlite_version()") version = [] for v in cursor.fetchone()[0].split("."): try: version.append(int(v)) except ValueError: version.append(v) self.sqlite_version = tuple(version) async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): await self.close() async def close(self): self.db.close() async def get_unihash_by_taskhash_full(self, method, taskhash): with closing(self.db.cursor()) as cursor: cursor.execute( """ SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash ORDER BY outhashes_v2.created ASC LIMIT 1 """, { "method": method, "taskhash": taskhash, }, ) return cursor.fetchone() async def get_unihash_by_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( """ SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash ORDER BY outhashes_v2.created ASC LIMIT 1 """, { "method": method, "outhash": outhash, }, ) return cursor.fetchone() async def get_outhash(self, method, outhash): with closing(self.db.cursor()) as cursor: cursor.execute( """ SELECT * FROM outhashes_v2 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash ORDER BY outhashes_v2.created ASC LIMIT 1 """, { "method": method, "outhash": outhash, }, ) return cursor.fetchone() async def get_equivalent_for_outhash(self, method, outhash, taskhash): with closing(self.db.cursor()) as cursor: cursor.execute( """ SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash -- Select any matching output hash except the one we just inserted WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash -- Pick the oldest hash ORDER BY outhashes_v2.created ASC LIMIT 1 """, { "method": method, "outhash": outhash, "taskhash": taskhash, }, ) return cursor.fetchone() async def get_equivalent(self, method, taskhash): with closing(self.db.cursor()) as cursor: cursor.execute( "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", { "method": method, "taskhash": taskhash, }, ) return cursor.fetchone() async def remove(self, condition): def do_remove(columns, table_name, cursor): where = {} for c in columns: if c in condition and condition[c] is not None: where[c] = condition[c] if where: query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( "%s=:%s" % (k, k) for k in where.keys() ) cursor.execute(query, where) return cursor.rowcount return 0 count = 0 with closing(self.db.cursor()) as cursor: count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) self.db.commit() return count async def clean_unused(self, oldest): with closing(self.db.cursor()) as cursor: cursor.execute( """ DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 ) """, { "oldest": oldest, }, ) self.db.commit() return cursor.rowcount async def insert_unihash(self, method, taskhash, unihash): with closing(self.db.cursor()) as cursor: prevrowid = cursor.lastrowid cursor.execute( """ INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) """, { "method": method, "taskhash": taskhash, "unihash": unihash, }, ) self.db.commit() return cursor.lastrowid != prevrowid async def insert_outhash(self, data): data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS} keys = sorted(data.keys()) query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format( fields=", ".join(keys), values=", ".join(":" + k for k in keys), ) with closing(self.db.cursor()) as cursor: prevrowid = cursor.lastrowid cursor.execute(query, data) self.db.commit() return cursor.lastrowid != prevrowid def _get_user(self, username): with closing(self.db.cursor()) as cursor: cursor.execute( """ SELECT username, permissions, token FROM users WHERE username=:username """, { "username": username, }, ) return cursor.fetchone() async def lookup_user_token(self, username): row = self._get_user(username) if row is None: return None, None return map_user(row), row["token"] async def lookup_user(self, username): return map_user(self._get_user(username)) async def set_user_token(self, username, token): with closing(self.db.cursor()) as cursor: cursor.execute( """ UPDATE users SET token=:token WHERE username=:username """, { "username": username, "token": token, }, ) self.db.commit() return cursor.rowcount != 0 async def set_user_perms(self, username, permissions): with closing(self.db.cursor()) as cursor: cursor.execute( """ UPDATE users SET permissions=:permissions WHERE username=:username """, { "username": username, "permissions": " ".join(permissions), }, ) self.db.commit() return cursor.rowcount != 0 async def get_all_users(self): with closing(self.db.cursor()) as cursor: cursor.execute("SELECT username, permissions FROM users") return [map_user(r) for r in cursor.fetchall()] async def new_user(self, username, permissions, token): with closing(self.db.cursor()) as cursor: try: cursor.execute( """ INSERT INTO users (username, token, permissions) VALUES (:username, :token, :permissions) """, { "username": username, "token": token, "permissions": " ".join(permissions), }, ) self.db.commit() return True except sqlite3.IntegrityError: return False async def delete_user(self, username): with closing(self.db.cursor()) as cursor: cursor.execute( """ DELETE FROM users WHERE username=:username """, { "username": username, }, ) self.db.commit() return cursor.rowcount != 0 async def get_usage(self): usage = {} with closing(self.db.cursor()) as cursor: if self.sqlite_version >= (3, 33): table_name = "sqlite_schema" else: table_name = "sqlite_master" cursor.execute( f""" SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' """ ) for row in cursor.fetchall(): cursor.execute( """ SELECT COUNT() FROM %s """ % row["name"], ) usage[row["name"]] = { "rows": cursor.fetchone()[0], } return usage async def get_query_columns(self): columns = set() for name, typ, _ in UNIHASH_TABLE_DEFINITION + OUTHASH_TABLE_DEFINITION: if typ.startswith("TEXT"): columns.add(name) return list(columns)