summary | refs | log | tree | commit | diff | stats
path: root/bitbake/lib/hashserv/sqlalchemy.py
diff options
context:
space:
mode:
author: Joshua Watt <JPEWhacker@gmail.com>, 2024-02-18 15:59:46 -0700
committer: Richard Purdie <richard.purdie@linuxfoundation.org>, 2024-02-19 11:58:12 +0000
commit: 1effd1014d9140905093efe25eeefedb28a10875 (patch)
tree: b34fb1d26f020b361d22904695cab6b9a7c1ea50 /bitbake/lib/hashserv/sqlalchemy.py
parent: 324c9fd666117afb0dd689eaa8551bb02d6a042b (diff)
download: poky-1effd1014d9140905093efe25eeefedb28a10875.tar.gz
bitbake: hashserv: Add Unihash Garbage Collection
Adds support for removing unused unihashes from the database. This is done using a "mark and sweep" style of garbage collection where a collection is started by marking which unihashes should be kept in the database, then performing a sweep to remove any unmarked hashes. (Bitbake rev: 433d4a075a1acfbd2a2913061739353a84bb01ed) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/sqlalchemy.py')
-rw-r--r-- bitbake/lib/hashserv/sqlalchemy.py 226
1 files changed, 188 insertions, 38 deletions
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
index cee04bffb0..89a6b86d9d 100644
--- a/bitbake/lib/hashserv/sqlalchemy.py
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -28,6 +28,7 @@ from sqlalchemy import (
28 delete, 28 delete,
29 update, 29 update,
30 func, 30 func,
31 inspect,
31) 32)
32import sqlalchemy.engine 33import sqlalchemy.engine
33from sqlalchemy.orm import declarative_base 34from sqlalchemy.orm import declarative_base
@@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError
36Base = declarative_base() 37Base = declarative_base()
37 38
38 39
39class UnihashesV2(Base): 40class UnihashesV3(Base):
40 __tablename__ = "unihashes_v2" 41 __tablename__ = "unihashes_v3"
41 id = Column(Integer, primary_key=True, autoincrement=True) 42 id = Column(Integer, primary_key=True, autoincrement=True)
42 method = Column(Text, nullable=False) 43 method = Column(Text, nullable=False)
43 taskhash = Column(Text, nullable=False) 44 taskhash = Column(Text, nullable=False)
44 unihash = Column(Text, nullable=False) 45 unihash = Column(Text, nullable=False)
46 gc_mark = Column(Text, nullable=False)
45 47
46 __table_args__ = ( 48 __table_args__ = (
47 UniqueConstraint("method", "taskhash"), 49 UniqueConstraint("method", "taskhash"),
48 Index("taskhash_lookup_v3", "method", "taskhash"), 50 Index("taskhash_lookup_v4", "method", "taskhash"),
49 ) 51 )
50 52
51 53
@@ -79,6 +81,36 @@ class Users(Base):
79 __table_args__ = (UniqueConstraint("username"),) 81 __table_args__ = (UniqueConstraint("username"),)
80 82
81 83
84class Config(Base):
85 __tablename__ = "config"
86 id = Column(Integer, primary_key=True, autoincrement=True)
87 name = Column(Text, nullable=False)
88 value = Column(Text)
89 __table_args__ = (
90 UniqueConstraint("name"),
91 Index("config_lookup", "name"),
92 )
93
94
95#
96# Old table versions
97#
98DeprecatedBase = declarative_base()
99
100
101class UnihashesV2(DeprecatedBase):
102 __tablename__ = "unihashes_v2"
103 id = Column(Integer, primary_key=True, autoincrement=True)
104 method = Column(Text, nullable=False)
105 taskhash = Column(Text, nullable=False)
106 unihash = Column(Text, nullable=False)
107
108 __table_args__ = (
109 UniqueConstraint("method", "taskhash"),
110 Index("taskhash_lookup_v3", "method", "taskhash"),
111 )
112
113
82class DatabaseEngine(object): 114class DatabaseEngine(object):
83 def __init__(self, url, username=None, password=None): 115 def __init__(self, url, username=None, password=None):
84 self.logger = logging.getLogger("hashserv.sqlalchemy") 116 self.logger = logging.getLogger("hashserv.sqlalchemy")
@@ -91,6 +123,9 @@ class DatabaseEngine(object):
91 self.url = self.url.set(password=password) 123 self.url = self.url.set(password=password)
92 124
93 async def create(self): 125 async def create(self):
126 def check_table_exists(conn, name):
127 return inspect(conn).has_table(name)
128
94 self.logger.info("Using database %s", self.url) 129 self.logger.info("Using database %s", self.url)
95 self.engine = create_async_engine(self.url, poolclass=NullPool) 130 self.engine = create_async_engine(self.url, poolclass=NullPool)
96 131
@@ -99,6 +134,24 @@ class DatabaseEngine(object):
99 self.logger.info("Creating tables...") 134 self.logger.info("Creating tables...")
100 await conn.run_sync(Base.metadata.create_all) 135 await conn.run_sync(Base.metadata.create_all)
101 136
137 if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__):
138 self.logger.info("Upgrading Unihashes V2 -> V3...")
139 statement = insert(UnihashesV3).from_select(
140 ["id", "method", "unihash", "taskhash", "gc_mark"],
141 select(
142 UnihashesV2.id,
143 UnihashesV2.method,
144 UnihashesV2.unihash,
145 UnihashesV2.taskhash,
146 literal("").label("gc_mark"),
147 ),
148 )
149 self.logger.debug("%s", statement)
150 await conn.execute(statement)
151
152 await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__])
153 self.logger.info("Upgrade complete")
154
102 def connect(self, logger): 155 def connect(self, logger):
103 return Database(self.engine, logger) 156 return Database(self.engine, logger)
104 157
@@ -118,6 +171,15 @@ def map_user(row):
118 ) 171 )
119 172
120 173
174def _make_condition_statement(table, condition):
175 where = {}
176 for c in table.__table__.columns:
177 if c.key in condition and condition[c.key] is not None:
178 where[c] = condition[c.key]
179
180 return [(k == v) for k, v in where.items()]
181
182
121class Database(object): 183class Database(object):
122 def __init__(self, engine, logger): 184 def __init__(self, engine, logger):
123 self.engine = engine 185 self.engine = engine
@@ -135,17 +197,52 @@ class Database(object):
135 await self.db.close() 197 await self.db.close()
136 self.db = None 198 self.db = None
137 199
200 async def _execute(self, statement):
201 self.logger.debug("%s", statement)
202 return await self.db.execute(statement)
203
204 async def _set_config(self, name, value):
205 while True:
206 result = await self._execute(
207 update(Config).where(Config.name == name).values(value=value)
208 )
209
210 if result.rowcount == 0:
211 self.logger.debug("Config '%s' not found. Adding it", name)
212 try:
213 await self._execute(insert(Config).values(name=name, value=value))
214 except IntegrityError:
215 # Race. Try again
216 continue
217
218 break
219
220 def _get_config_subquery(self, name, default=None):
221 if default is not None:
222 return func.coalesce(
223 select(Config.value).where(Config.name == name).scalar_subquery(),
224 default,
225 )
226 return select(Config.value).where(Config.name == name).scalar_subquery()
227
228 async def _get_config(self, name):
229 result = await self._execute(select(Config.value).where(Config.name == name))
230 row = result.first()
231 if row is None:
232 return None
233 return row.value
234
138 async def get_unihash_by_taskhash_full(self, method, taskhash): 235 async def get_unihash_by_taskhash_full(self, method, taskhash):
139 statement = ( 236 statement = (
140 select( 237 select(
141 OuthashesV2, 238 OuthashesV2,
142 UnihashesV2.unihash.label("unihash"), 239 UnihashesV3.unihash.label("unihash"),
143 ) 240 )
144 .join( 241 .join(
145 UnihashesV2, 242 UnihashesV3,
146 and_( 243 and_(
147 UnihashesV2.method == OuthashesV2.method, 244 UnihashesV3.method == OuthashesV2.method,
148 UnihashesV2.taskhash == OuthashesV2.taskhash, 245 UnihashesV3.taskhash == OuthashesV2.taskhash,
149 ), 246 ),
150 ) 247 )
151 .where( 248 .where(
@@ -164,12 +261,12 @@ class Database(object):
164 261
165 async def get_unihash_by_outhash(self, method, outhash): 262 async def get_unihash_by_outhash(self, method, outhash):
166 statement = ( 263 statement = (
167 select(OuthashesV2, UnihashesV2.unihash.label("unihash")) 264 select(OuthashesV2, UnihashesV3.unihash.label("unihash"))
168 .join( 265 .join(
169 UnihashesV2, 266 UnihashesV3,
170 and_( 267 and_(
171 UnihashesV2.method == OuthashesV2.method, 268 UnihashesV3.method == OuthashesV2.method,
172 UnihashesV2.taskhash == OuthashesV2.taskhash, 269 UnihashesV3.taskhash == OuthashesV2.taskhash,
173 ), 270 ),
174 ) 271 )
175 .where( 272 .where(
@@ -208,13 +305,13 @@ class Database(object):
208 statement = ( 305 statement = (
209 select( 306 select(
210 OuthashesV2.taskhash.label("taskhash"), 307 OuthashesV2.taskhash.label("taskhash"),
211 UnihashesV2.unihash.label("unihash"), 308 UnihashesV3.unihash.label("unihash"),
212 ) 309 )
213 .join( 310 .join(
214 UnihashesV2, 311 UnihashesV3,
215 and_( 312 and_(
216 UnihashesV2.method == OuthashesV2.method, 313 UnihashesV3.method == OuthashesV2.method,
217 UnihashesV2.taskhash == OuthashesV2.taskhash, 314 UnihashesV3.taskhash == OuthashesV2.taskhash,
218 ), 315 ),
219 ) 316 )
220 .where( 317 .where(
@@ -234,12 +331,12 @@ class Database(object):
234 331
235 async def get_equivalent(self, method, taskhash): 332 async def get_equivalent(self, method, taskhash):
236 statement = select( 333 statement = select(
237 UnihashesV2.unihash, 334 UnihashesV3.unihash,
238 UnihashesV2.method, 335 UnihashesV3.method,
239 UnihashesV2.taskhash, 336 UnihashesV3.taskhash,
240 ).where( 337 ).where(
241 UnihashesV2.method == method, 338 UnihashesV3.method == method,
242 UnihashesV2.taskhash == taskhash, 339 UnihashesV3.taskhash == taskhash,
243 ) 340 )
244 self.logger.debug("%s", statement) 341 self.logger.debug("%s", statement)
245 async with self.db.begin(): 342 async with self.db.begin():
@@ -248,13 +345,9 @@ class Database(object):
248 345
249 async def remove(self, condition): 346 async def remove(self, condition):
250 async def do_remove(table): 347 async def do_remove(table):
251 where = {} 348 where = _make_condition_statement(table, condition)
252 for c in table.__table__.columns:
253 if c.key in condition and condition[c.key] is not None:
254 where[c] = condition[c.key]
255
256 if where: 349 if where:
257 statement = delete(table).where(*[(k == v) for k, v in where.items()]) 350 statement = delete(table).where(*where)
258 self.logger.debug("%s", statement) 351 self.logger.debug("%s", statement)
259 async with self.db.begin(): 352 async with self.db.begin():
260 result = await self.db.execute(statement) 353 result = await self.db.execute(statement)
@@ -263,19 +356,74 @@ class Database(object):
263 return 0 356 return 0
264 357
265 count = 0 358 count = 0
266 count += await do_remove(UnihashesV2) 359 count += await do_remove(UnihashesV3)
267 count += await do_remove(OuthashesV2) 360 count += await do_remove(OuthashesV2)
268 361
269 return count 362 return count
270 363
364 async def get_current_gc_mark(self):
365 async with self.db.begin():
366 return await self._get_config("gc-mark")
367
368 async def gc_status(self):
369 async with self.db.begin():
370 gc_mark_subquery = self._get_config_subquery("gc-mark", "")
371
372 result = await self._execute(
373 select(func.count())
374 .select_from(UnihashesV3)
375 .where(UnihashesV3.gc_mark == gc_mark_subquery)
376 )
377 keep_rows = result.scalar()
378
379 result = await self._execute(
380 select(func.count())
381 .select_from(UnihashesV3)
382 .where(UnihashesV3.gc_mark != gc_mark_subquery)
383 )
384 remove_rows = result.scalar()
385
386 return (keep_rows, remove_rows, await self._get_config("gc-mark"))
387
388 async def gc_mark(self, mark, condition):
389 async with self.db.begin():
390 await self._set_config("gc-mark", mark)
391
392 where = _make_condition_statement(UnihashesV3, condition)
393 if not where:
394 return 0
395
396 result = await self._execute(
397 update(UnihashesV3)
398 .values(gc_mark=self._get_config_subquery("gc-mark", ""))
399 .where(*where)
400 )
401 return result.rowcount
402
403 async def gc_sweep(self):
404 async with self.db.begin():
405 result = await self._execute(
406 delete(UnihashesV3).where(
407 # A sneaky conditional that provides some errant use
408 # protection: If the config mark is NULL, this will not
409 # match any rows because No default is specified in the
410 # select statement
411 UnihashesV3.gc_mark
412 != self._get_config_subquery("gc-mark")
413 )
414 )
415 await self._set_config("gc-mark", None)
416
417 return result.rowcount
418
271 async def clean_unused(self, oldest): 419 async def clean_unused(self, oldest):
272 statement = delete(OuthashesV2).where( 420 statement = delete(OuthashesV2).where(
273 OuthashesV2.created < oldest, 421 OuthashesV2.created < oldest,
274 ~( 422 ~(
275 select(UnihashesV2.id) 423 select(UnihashesV3.id)
276 .where( 424 .where(
277 UnihashesV2.method == OuthashesV2.method, 425 UnihashesV3.method == OuthashesV2.method,
278 UnihashesV2.taskhash == OuthashesV2.taskhash, 426 UnihashesV3.taskhash == OuthashesV2.taskhash,
279 ) 427 )
280 .limit(1) 428 .limit(1)
281 .exists() 429 .exists()
@@ -287,15 +435,17 @@ class Database(object):
287 return result.rowcount 435 return result.rowcount
288 436
289 async def insert_unihash(self, method, taskhash, unihash): 437 async def insert_unihash(self, method, taskhash, unihash):
290 statement = insert(UnihashesV2).values(
291 method=method,
292 taskhash=taskhash,
293 unihash=unihash,
294 )
295 self.logger.debug("%s", statement)
296 try: 438 try:
297 async with self.db.begin(): 439 async with self.db.begin():
298 await self.db.execute(statement) 440 await self._execute(
441 insert(UnihashesV3).values(
442 method=method,
443 taskhash=taskhash,
444 unihash=unihash,
445 gc_mark=self._get_config_subquery("gc-mark", ""),
446 )
447 )
448
299 return True 449 return True
300 except IntegrityError: 450 except IntegrityError:
301 self.logger.debug( 451 self.logger.debug(
@@ -418,7 +568,7 @@ class Database(object):
418 568
419 async def get_query_columns(self): 569 async def get_query_columns(self):
420 columns = set() 570 columns = set()
421 for table in (UnihashesV2, OuthashesV2): 571 for table in (UnihashesV3, OuthashesV2):
422 for c in table.__table__.columns: 572 for c in table.__table__.columns:
423 if not isinstance(c.type, Text): 573 if not isinstance(c.type, Text):
424 continue 574 continue