diff options
Diffstat (limited to 'bitbake/lib/hashserv/sqlalchemy.py')
| -rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 226 |
1 files changed, 188 insertions, 38 deletions
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index cee04bffb0..89a6b86d9d 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py | |||
| @@ -28,6 +28,7 @@ from sqlalchemy import ( | |||
| 28 | delete, | 28 | delete, |
| 29 | update, | 29 | update, |
| 30 | func, | 30 | func, |
| 31 | inspect, | ||
| 31 | ) | 32 | ) |
| 32 | import sqlalchemy.engine | 33 | import sqlalchemy.engine |
| 33 | from sqlalchemy.orm import declarative_base | 34 | from sqlalchemy.orm import declarative_base |
| @@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError | |||
| 36 | Base = declarative_base() | 37 | Base = declarative_base() |
| 37 | 38 | ||
| 38 | 39 | ||
| 39 | class UnihashesV2(Base): | 40 | class UnihashesV3(Base): |
| 40 | __tablename__ = "unihashes_v2" | 41 | __tablename__ = "unihashes_v3" |
| 41 | id = Column(Integer, primary_key=True, autoincrement=True) | 42 | id = Column(Integer, primary_key=True, autoincrement=True) |
| 42 | method = Column(Text, nullable=False) | 43 | method = Column(Text, nullable=False) |
| 43 | taskhash = Column(Text, nullable=False) | 44 | taskhash = Column(Text, nullable=False) |
| 44 | unihash = Column(Text, nullable=False) | 45 | unihash = Column(Text, nullable=False) |
| 46 | gc_mark = Column(Text, nullable=False) | ||
| 45 | 47 | ||
| 46 | __table_args__ = ( | 48 | __table_args__ = ( |
| 47 | UniqueConstraint("method", "taskhash"), | 49 | UniqueConstraint("method", "taskhash"), |
| 48 | Index("taskhash_lookup_v3", "method", "taskhash"), | 50 | Index("taskhash_lookup_v4", "method", "taskhash"), |
| 49 | ) | 51 | ) |
| 50 | 52 | ||
| 51 | 53 | ||
| @@ -79,6 +81,36 @@ class Users(Base): | |||
| 79 | __table_args__ = (UniqueConstraint("username"),) | 81 | __table_args__ = (UniqueConstraint("username"),) |
| 80 | 82 | ||
| 81 | 83 | ||
| 84 | class Config(Base): | ||
| 85 | __tablename__ = "config" | ||
| 86 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
| 87 | name = Column(Text, nullable=False) | ||
| 88 | value = Column(Text) | ||
| 89 | __table_args__ = ( | ||
| 90 | UniqueConstraint("name"), | ||
| 91 | Index("config_lookup", "name"), | ||
| 92 | ) | ||
| 93 | |||
| 94 | |||
| 95 | # | ||
| 96 | # Old table versions | ||
| 97 | # | ||
| 98 | DeprecatedBase = declarative_base() | ||
| 99 | |||
| 100 | |||
| 101 | class UnihashesV2(DeprecatedBase): | ||
| 102 | __tablename__ = "unihashes_v2" | ||
| 103 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
| 104 | method = Column(Text, nullable=False) | ||
| 105 | taskhash = Column(Text, nullable=False) | ||
| 106 | unihash = Column(Text, nullable=False) | ||
| 107 | |||
| 108 | __table_args__ = ( | ||
| 109 | UniqueConstraint("method", "taskhash"), | ||
| 110 | Index("taskhash_lookup_v3", "method", "taskhash"), | ||
| 111 | ) | ||
| 112 | |||
| 113 | |||
| 82 | class DatabaseEngine(object): | 114 | class DatabaseEngine(object): |
| 83 | def __init__(self, url, username=None, password=None): | 115 | def __init__(self, url, username=None, password=None): |
| 84 | self.logger = logging.getLogger("hashserv.sqlalchemy") | 116 | self.logger = logging.getLogger("hashserv.sqlalchemy") |
| @@ -91,6 +123,9 @@ class DatabaseEngine(object): | |||
| 91 | self.url = self.url.set(password=password) | 123 | self.url = self.url.set(password=password) |
| 92 | 124 | ||
| 93 | async def create(self): | 125 | async def create(self): |
| 126 | def check_table_exists(conn, name): | ||
| 127 | return inspect(conn).has_table(name) | ||
| 128 | |||
| 94 | self.logger.info("Using database %s", self.url) | 129 | self.logger.info("Using database %s", self.url) |
| 95 | self.engine = create_async_engine(self.url, poolclass=NullPool) | 130 | self.engine = create_async_engine(self.url, poolclass=NullPool) |
| 96 | 131 | ||
| @@ -99,6 +134,24 @@ class DatabaseEngine(object): | |||
| 99 | self.logger.info("Creating tables...") | 134 | self.logger.info("Creating tables...") |
| 100 | await conn.run_sync(Base.metadata.create_all) | 135 | await conn.run_sync(Base.metadata.create_all) |
| 101 | 136 | ||
| 137 | if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__): | ||
| 138 | self.logger.info("Upgrading Unihashes V2 -> V3...") | ||
| 139 | statement = insert(UnihashesV3).from_select( | ||
| 140 | ["id", "method", "unihash", "taskhash", "gc_mark"], | ||
| 141 | select( | ||
| 142 | UnihashesV2.id, | ||
| 143 | UnihashesV2.method, | ||
| 144 | UnihashesV2.unihash, | ||
| 145 | UnihashesV2.taskhash, | ||
| 146 | literal("").label("gc_mark"), | ||
| 147 | ), | ||
| 148 | ) | ||
| 149 | self.logger.debug("%s", statement) | ||
| 150 | await conn.execute(statement) | ||
| 151 | |||
| 152 | await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__]) | ||
| 153 | self.logger.info("Upgrade complete") | ||
| 154 | |||
| 102 | def connect(self, logger): | 155 | def connect(self, logger): |
| 103 | return Database(self.engine, logger) | 156 | return Database(self.engine, logger) |
| 104 | 157 | ||
| @@ -118,6 +171,15 @@ def map_user(row): | |||
| 118 | ) | 171 | ) |
| 119 | 172 | ||
| 120 | 173 | ||
| 174 | def _make_condition_statement(table, condition): | ||
| 175 | where = {} | ||
| 176 | for c in table.__table__.columns: | ||
| 177 | if c.key in condition and condition[c.key] is not None: | ||
| 178 | where[c] = condition[c.key] | ||
| 179 | |||
| 180 | return [(k == v) for k, v in where.items()] | ||
| 181 | |||
| 182 | |||
| 121 | class Database(object): | 183 | class Database(object): |
| 122 | def __init__(self, engine, logger): | 184 | def __init__(self, engine, logger): |
| 123 | self.engine = engine | 185 | self.engine = engine |
| @@ -135,17 +197,52 @@ class Database(object): | |||
| 135 | await self.db.close() | 197 | await self.db.close() |
| 136 | self.db = None | 198 | self.db = None |
| 137 | 199 | ||
| 200 | async def _execute(self, statement): | ||
| 201 | self.logger.debug("%s", statement) | ||
| 202 | return await self.db.execute(statement) | ||
| 203 | |||
| 204 | async def _set_config(self, name, value): | ||
| 205 | while True: | ||
| 206 | result = await self._execute( | ||
| 207 | update(Config).where(Config.name == name).values(value=value) | ||
| 208 | ) | ||
| 209 | |||
| 210 | if result.rowcount == 0: | ||
| 211 | self.logger.debug("Config '%s' not found. Adding it", name) | ||
| 212 | try: | ||
| 213 | await self._execute(insert(Config).values(name=name, value=value)) | ||
| 214 | except IntegrityError: | ||
| 215 | # Race. Try again | ||
| 216 | continue | ||
| 217 | |||
| 218 | break | ||
| 219 | |||
| 220 | def _get_config_subquery(self, name, default=None): | ||
| 221 | if default is not None: | ||
| 222 | return func.coalesce( | ||
| 223 | select(Config.value).where(Config.name == name).scalar_subquery(), | ||
| 224 | default, | ||
| 225 | ) | ||
| 226 | return select(Config.value).where(Config.name == name).scalar_subquery() | ||
| 227 | |||
| 228 | async def _get_config(self, name): | ||
| 229 | result = await self._execute(select(Config.value).where(Config.name == name)) | ||
| 230 | row = result.first() | ||
| 231 | if row is None: | ||
| 232 | return None | ||
| 233 | return row.value | ||
| 234 | |||
| 138 | async def get_unihash_by_taskhash_full(self, method, taskhash): | 235 | async def get_unihash_by_taskhash_full(self, method, taskhash): |
| 139 | statement = ( | 236 | statement = ( |
| 140 | select( | 237 | select( |
| 141 | OuthashesV2, | 238 | OuthashesV2, |
| 142 | UnihashesV2.unihash.label("unihash"), | 239 | UnihashesV3.unihash.label("unihash"), |
| 143 | ) | 240 | ) |
| 144 | .join( | 241 | .join( |
| 145 | UnihashesV2, | 242 | UnihashesV3, |
| 146 | and_( | 243 | and_( |
| 147 | UnihashesV2.method == OuthashesV2.method, | 244 | UnihashesV3.method == OuthashesV2.method, |
| 148 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 245 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 149 | ), | 246 | ), |
| 150 | ) | 247 | ) |
| 151 | .where( | 248 | .where( |
| @@ -164,12 +261,12 @@ class Database(object): | |||
| 164 | 261 | ||
| 165 | async def get_unihash_by_outhash(self, method, outhash): | 262 | async def get_unihash_by_outhash(self, method, outhash): |
| 166 | statement = ( | 263 | statement = ( |
| 167 | select(OuthashesV2, UnihashesV2.unihash.label("unihash")) | 264 | select(OuthashesV2, UnihashesV3.unihash.label("unihash")) |
| 168 | .join( | 265 | .join( |
| 169 | UnihashesV2, | 266 | UnihashesV3, |
| 170 | and_( | 267 | and_( |
| 171 | UnihashesV2.method == OuthashesV2.method, | 268 | UnihashesV3.method == OuthashesV2.method, |
| 172 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 269 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 173 | ), | 270 | ), |
| 174 | ) | 271 | ) |
| 175 | .where( | 272 | .where( |
| @@ -208,13 +305,13 @@ class Database(object): | |||
| 208 | statement = ( | 305 | statement = ( |
| 209 | select( | 306 | select( |
| 210 | OuthashesV2.taskhash.label("taskhash"), | 307 | OuthashesV2.taskhash.label("taskhash"), |
| 211 | UnihashesV2.unihash.label("unihash"), | 308 | UnihashesV3.unihash.label("unihash"), |
| 212 | ) | 309 | ) |
| 213 | .join( | 310 | .join( |
| 214 | UnihashesV2, | 311 | UnihashesV3, |
| 215 | and_( | 312 | and_( |
| 216 | UnihashesV2.method == OuthashesV2.method, | 313 | UnihashesV3.method == OuthashesV2.method, |
| 217 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 314 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 218 | ), | 315 | ), |
| 219 | ) | 316 | ) |
| 220 | .where( | 317 | .where( |
| @@ -234,12 +331,12 @@ class Database(object): | |||
| 234 | 331 | ||
| 235 | async def get_equivalent(self, method, taskhash): | 332 | async def get_equivalent(self, method, taskhash): |
| 236 | statement = select( | 333 | statement = select( |
| 237 | UnihashesV2.unihash, | 334 | UnihashesV3.unihash, |
| 238 | UnihashesV2.method, | 335 | UnihashesV3.method, |
| 239 | UnihashesV2.taskhash, | 336 | UnihashesV3.taskhash, |
| 240 | ).where( | 337 | ).where( |
| 241 | UnihashesV2.method == method, | 338 | UnihashesV3.method == method, |
| 242 | UnihashesV2.taskhash == taskhash, | 339 | UnihashesV3.taskhash == taskhash, |
| 243 | ) | 340 | ) |
| 244 | self.logger.debug("%s", statement) | 341 | self.logger.debug("%s", statement) |
| 245 | async with self.db.begin(): | 342 | async with self.db.begin(): |
| @@ -248,13 +345,9 @@ class Database(object): | |||
| 248 | 345 | ||
| 249 | async def remove(self, condition): | 346 | async def remove(self, condition): |
| 250 | async def do_remove(table): | 347 | async def do_remove(table): |
| 251 | where = {} | 348 | where = _make_condition_statement(table, condition) |
| 252 | for c in table.__table__.columns: | ||
| 253 | if c.key in condition and condition[c.key] is not None: | ||
| 254 | where[c] = condition[c.key] | ||
| 255 | |||
| 256 | if where: | 349 | if where: |
| 257 | statement = delete(table).where(*[(k == v) for k, v in where.items()]) | 350 | statement = delete(table).where(*where) |
| 258 | self.logger.debug("%s", statement) | 351 | self.logger.debug("%s", statement) |
| 259 | async with self.db.begin(): | 352 | async with self.db.begin(): |
| 260 | result = await self.db.execute(statement) | 353 | result = await self.db.execute(statement) |
| @@ -263,19 +356,74 @@ class Database(object): | |||
| 263 | return 0 | 356 | return 0 |
| 264 | 357 | ||
| 265 | count = 0 | 358 | count = 0 |
| 266 | count += await do_remove(UnihashesV2) | 359 | count += await do_remove(UnihashesV3) |
| 267 | count += await do_remove(OuthashesV2) | 360 | count += await do_remove(OuthashesV2) |
| 268 | 361 | ||
| 269 | return count | 362 | return count |
| 270 | 363 | ||
| 364 | async def get_current_gc_mark(self): | ||
| 365 | async with self.db.begin(): | ||
| 366 | return await self._get_config("gc-mark") | ||
| 367 | |||
| 368 | async def gc_status(self): | ||
| 369 | async with self.db.begin(): | ||
| 370 | gc_mark_subquery = self._get_config_subquery("gc-mark", "") | ||
| 371 | |||
| 372 | result = await self._execute( | ||
| 373 | select(func.count()) | ||
| 374 | .select_from(UnihashesV3) | ||
| 375 | .where(UnihashesV3.gc_mark == gc_mark_subquery) | ||
| 376 | ) | ||
| 377 | keep_rows = result.scalar() | ||
| 378 | |||
| 379 | result = await self._execute( | ||
| 380 | select(func.count()) | ||
| 381 | .select_from(UnihashesV3) | ||
| 382 | .where(UnihashesV3.gc_mark != gc_mark_subquery) | ||
| 383 | ) | ||
| 384 | remove_rows = result.scalar() | ||
| 385 | |||
| 386 | return (keep_rows, remove_rows, await self._get_config("gc-mark")) | ||
| 387 | |||
| 388 | async def gc_mark(self, mark, condition): | ||
| 389 | async with self.db.begin(): | ||
| 390 | await self._set_config("gc-mark", mark) | ||
| 391 | |||
| 392 | where = _make_condition_statement(UnihashesV3, condition) | ||
| 393 | if not where: | ||
| 394 | return 0 | ||
| 395 | |||
| 396 | result = await self._execute( | ||
| 397 | update(UnihashesV3) | ||
| 398 | .values(gc_mark=self._get_config_subquery("gc-mark", "")) | ||
| 399 | .where(*where) | ||
| 400 | ) | ||
| 401 | return result.rowcount | ||
| 402 | |||
| 403 | async def gc_sweep(self): | ||
| 404 | async with self.db.begin(): | ||
| 405 | result = await self._execute( | ||
| 406 | delete(UnihashesV3).where( | ||
| 407 | # A sneaky conditional that provides some errant use | ||
| 408 | # protection: If the config mark is NULL, this will not | ||
| 409 | # match any rows because No default is specified in the | ||
| 410 | # select statement | ||
| 411 | UnihashesV3.gc_mark | ||
| 412 | != self._get_config_subquery("gc-mark") | ||
| 413 | ) | ||
| 414 | ) | ||
| 415 | await self._set_config("gc-mark", None) | ||
| 416 | |||
| 417 | return result.rowcount | ||
| 418 | |||
| 271 | async def clean_unused(self, oldest): | 419 | async def clean_unused(self, oldest): |
| 272 | statement = delete(OuthashesV2).where( | 420 | statement = delete(OuthashesV2).where( |
| 273 | OuthashesV2.created < oldest, | 421 | OuthashesV2.created < oldest, |
| 274 | ~( | 422 | ~( |
| 275 | select(UnihashesV2.id) | 423 | select(UnihashesV3.id) |
| 276 | .where( | 424 | .where( |
| 277 | UnihashesV2.method == OuthashesV2.method, | 425 | UnihashesV3.method == OuthashesV2.method, |
| 278 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 426 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 279 | ) | 427 | ) |
| 280 | .limit(1) | 428 | .limit(1) |
| 281 | .exists() | 429 | .exists() |
| @@ -287,15 +435,17 @@ class Database(object): | |||
| 287 | return result.rowcount | 435 | return result.rowcount |
| 288 | 436 | ||
| 289 | async def insert_unihash(self, method, taskhash, unihash): | 437 | async def insert_unihash(self, method, taskhash, unihash): |
| 290 | statement = insert(UnihashesV2).values( | ||
| 291 | method=method, | ||
| 292 | taskhash=taskhash, | ||
| 293 | unihash=unihash, | ||
| 294 | ) | ||
| 295 | self.logger.debug("%s", statement) | ||
| 296 | try: | 438 | try: |
| 297 | async with self.db.begin(): | 439 | async with self.db.begin(): |
| 298 | await self.db.execute(statement) | 440 | await self._execute( |
| 441 | insert(UnihashesV3).values( | ||
| 442 | method=method, | ||
| 443 | taskhash=taskhash, | ||
| 444 | unihash=unihash, | ||
| 445 | gc_mark=self._get_config_subquery("gc-mark", ""), | ||
| 446 | ) | ||
| 447 | ) | ||
| 448 | |||
| 299 | return True | 449 | return True |
| 300 | except IntegrityError: | 450 | except IntegrityError: |
| 301 | self.logger.debug( | 451 | self.logger.debug( |
| @@ -418,7 +568,7 @@ class Database(object): | |||
| 418 | 568 | ||
| 419 | async def get_query_columns(self): | 569 | async def get_query_columns(self): |
| 420 | columns = set() | 570 | columns = set() |
| 421 | for table in (UnihashesV2, OuthashesV2): | 571 | for table in (UnihashesV3, OuthashesV2): |
| 422 | for c in table.__table__.columns: | 572 | for c in table.__table__.columns: |
| 423 | if not isinstance(c.type, Text): | 573 | if not isinstance(c.type, Text): |
| 424 | continue | 574 | continue |
