diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-02-18 15:59:46 -0700 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-02-19 11:58:12 +0000 |
commit | 1effd1014d9140905093efe25eeefedb28a10875 (patch) | |
tree | b34fb1d26f020b361d22904695cab6b9a7c1ea50 /bitbake/lib/hashserv/sqlalchemy.py | |
parent | 324c9fd666117afb0dd689eaa8551bb02d6a042b (diff) | |
download | poky-1effd1014d9140905093efe25eeefedb28a10875.tar.gz |
bitbake: hashserv: Add Unihash Garbage Collection
Adds support for removing unused unihashes from the database. This is
done using a "mark and sweep" style of garbage collection where a
collection is started by marking which unihashes should be kept in the
database, then performing a sweep to remove any unmarked hashes.
(Bitbake rev: 433d4a075a1acfbd2a2913061739353a84bb01ed)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/sqlalchemy.py')
-rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 226 |
1 files changed, 188 insertions, 38 deletions
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index cee04bffb0..89a6b86d9d 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py | |||
@@ -28,6 +28,7 @@ from sqlalchemy import ( | |||
28 | delete, | 28 | delete, |
29 | update, | 29 | update, |
30 | func, | 30 | func, |
31 | inspect, | ||
31 | ) | 32 | ) |
32 | import sqlalchemy.engine | 33 | import sqlalchemy.engine |
33 | from sqlalchemy.orm import declarative_base | 34 | from sqlalchemy.orm import declarative_base |
@@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError | |||
36 | Base = declarative_base() | 37 | Base = declarative_base() |
37 | 38 | ||
38 | 39 | ||
39 | class UnihashesV2(Base): | 40 | class UnihashesV3(Base): |
40 | __tablename__ = "unihashes_v2" | 41 | __tablename__ = "unihashes_v3" |
41 | id = Column(Integer, primary_key=True, autoincrement=True) | 42 | id = Column(Integer, primary_key=True, autoincrement=True) |
42 | method = Column(Text, nullable=False) | 43 | method = Column(Text, nullable=False) |
43 | taskhash = Column(Text, nullable=False) | 44 | taskhash = Column(Text, nullable=False) |
44 | unihash = Column(Text, nullable=False) | 45 | unihash = Column(Text, nullable=False) |
46 | gc_mark = Column(Text, nullable=False) | ||
45 | 47 | ||
46 | __table_args__ = ( | 48 | __table_args__ = ( |
47 | UniqueConstraint("method", "taskhash"), | 49 | UniqueConstraint("method", "taskhash"), |
48 | Index("taskhash_lookup_v3", "method", "taskhash"), | 50 | Index("taskhash_lookup_v4", "method", "taskhash"), |
49 | ) | 51 | ) |
50 | 52 | ||
51 | 53 | ||
@@ -79,6 +81,36 @@ class Users(Base): | |||
79 | __table_args__ = (UniqueConstraint("username"),) | 81 | __table_args__ = (UniqueConstraint("username"),) |
80 | 82 | ||
81 | 83 | ||
84 | class Config(Base): | ||
85 | __tablename__ = "config" | ||
86 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
87 | name = Column(Text, nullable=False) | ||
88 | value = Column(Text) | ||
89 | __table_args__ = ( | ||
90 | UniqueConstraint("name"), | ||
91 | Index("config_lookup", "name"), | ||
92 | ) | ||
93 | |||
94 | |||
95 | # | ||
96 | # Old table versions | ||
97 | # | ||
98 | DeprecatedBase = declarative_base() | ||
99 | |||
100 | |||
101 | class UnihashesV2(DeprecatedBase): | ||
102 | __tablename__ = "unihashes_v2" | ||
103 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
104 | method = Column(Text, nullable=False) | ||
105 | taskhash = Column(Text, nullable=False) | ||
106 | unihash = Column(Text, nullable=False) | ||
107 | |||
108 | __table_args__ = ( | ||
109 | UniqueConstraint("method", "taskhash"), | ||
110 | Index("taskhash_lookup_v3", "method", "taskhash"), | ||
111 | ) | ||
112 | |||
113 | |||
82 | class DatabaseEngine(object): | 114 | class DatabaseEngine(object): |
83 | def __init__(self, url, username=None, password=None): | 115 | def __init__(self, url, username=None, password=None): |
84 | self.logger = logging.getLogger("hashserv.sqlalchemy") | 116 | self.logger = logging.getLogger("hashserv.sqlalchemy") |
@@ -91,6 +123,9 @@ class DatabaseEngine(object): | |||
91 | self.url = self.url.set(password=password) | 123 | self.url = self.url.set(password=password) |
92 | 124 | ||
93 | async def create(self): | 125 | async def create(self): |
126 | def check_table_exists(conn, name): | ||
127 | return inspect(conn).has_table(name) | ||
128 | |||
94 | self.logger.info("Using database %s", self.url) | 129 | self.logger.info("Using database %s", self.url) |
95 | self.engine = create_async_engine(self.url, poolclass=NullPool) | 130 | self.engine = create_async_engine(self.url, poolclass=NullPool) |
96 | 131 | ||
@@ -99,6 +134,24 @@ class DatabaseEngine(object): | |||
99 | self.logger.info("Creating tables...") | 134 | self.logger.info("Creating tables...") |
100 | await conn.run_sync(Base.metadata.create_all) | 135 | await conn.run_sync(Base.metadata.create_all) |
101 | 136 | ||
137 | if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__): | ||
138 | self.logger.info("Upgrading Unihashes V2 -> V3...") | ||
139 | statement = insert(UnihashesV3).from_select( | ||
140 | ["id", "method", "unihash", "taskhash", "gc_mark"], | ||
141 | select( | ||
142 | UnihashesV2.id, | ||
143 | UnihashesV2.method, | ||
144 | UnihashesV2.unihash, | ||
145 | UnihashesV2.taskhash, | ||
146 | literal("").label("gc_mark"), | ||
147 | ), | ||
148 | ) | ||
149 | self.logger.debug("%s", statement) | ||
150 | await conn.execute(statement) | ||
151 | |||
152 | await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__]) | ||
153 | self.logger.info("Upgrade complete") | ||
154 | |||
102 | def connect(self, logger): | 155 | def connect(self, logger): |
103 | return Database(self.engine, logger) | 156 | return Database(self.engine, logger) |
104 | 157 | ||
@@ -118,6 +171,15 @@ def map_user(row): | |||
118 | ) | 171 | ) |
119 | 172 | ||
120 | 173 | ||
174 | def _make_condition_statement(table, condition): | ||
175 | where = {} | ||
176 | for c in table.__table__.columns: | ||
177 | if c.key in condition and condition[c.key] is not None: | ||
178 | where[c] = condition[c.key] | ||
179 | |||
180 | return [(k == v) for k, v in where.items()] | ||
181 | |||
182 | |||
121 | class Database(object): | 183 | class Database(object): |
122 | def __init__(self, engine, logger): | 184 | def __init__(self, engine, logger): |
123 | self.engine = engine | 185 | self.engine = engine |
@@ -135,17 +197,52 @@ class Database(object): | |||
135 | await self.db.close() | 197 | await self.db.close() |
136 | self.db = None | 198 | self.db = None |
137 | 199 | ||
200 | async def _execute(self, statement): | ||
201 | self.logger.debug("%s", statement) | ||
202 | return await self.db.execute(statement) | ||
203 | |||
204 | async def _set_config(self, name, value): | ||
205 | while True: | ||
206 | result = await self._execute( | ||
207 | update(Config).where(Config.name == name).values(value=value) | ||
208 | ) | ||
209 | |||
210 | if result.rowcount == 0: | ||
211 | self.logger.debug("Config '%s' not found. Adding it", name) | ||
212 | try: | ||
213 | await self._execute(insert(Config).values(name=name, value=value)) | ||
214 | except IntegrityError: | ||
215 | # Race. Try again | ||
216 | continue | ||
217 | |||
218 | break | ||
219 | |||
220 | def _get_config_subquery(self, name, default=None): | ||
221 | if default is not None: | ||
222 | return func.coalesce( | ||
223 | select(Config.value).where(Config.name == name).scalar_subquery(), | ||
224 | default, | ||
225 | ) | ||
226 | return select(Config.value).where(Config.name == name).scalar_subquery() | ||
227 | |||
228 | async def _get_config(self, name): | ||
229 | result = await self._execute(select(Config.value).where(Config.name == name)) | ||
230 | row = result.first() | ||
231 | if row is None: | ||
232 | return None | ||
233 | return row.value | ||
234 | |||
138 | async def get_unihash_by_taskhash_full(self, method, taskhash): | 235 | async def get_unihash_by_taskhash_full(self, method, taskhash): |
139 | statement = ( | 236 | statement = ( |
140 | select( | 237 | select( |
141 | OuthashesV2, | 238 | OuthashesV2, |
142 | UnihashesV2.unihash.label("unihash"), | 239 | UnihashesV3.unihash.label("unihash"), |
143 | ) | 240 | ) |
144 | .join( | 241 | .join( |
145 | UnihashesV2, | 242 | UnihashesV3, |
146 | and_( | 243 | and_( |
147 | UnihashesV2.method == OuthashesV2.method, | 244 | UnihashesV3.method == OuthashesV2.method, |
148 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 245 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
149 | ), | 246 | ), |
150 | ) | 247 | ) |
151 | .where( | 248 | .where( |
@@ -164,12 +261,12 @@ class Database(object): | |||
164 | 261 | ||
165 | async def get_unihash_by_outhash(self, method, outhash): | 262 | async def get_unihash_by_outhash(self, method, outhash): |
166 | statement = ( | 263 | statement = ( |
167 | select(OuthashesV2, UnihashesV2.unihash.label("unihash")) | 264 | select(OuthashesV2, UnihashesV3.unihash.label("unihash")) |
168 | .join( | 265 | .join( |
169 | UnihashesV2, | 266 | UnihashesV3, |
170 | and_( | 267 | and_( |
171 | UnihashesV2.method == OuthashesV2.method, | 268 | UnihashesV3.method == OuthashesV2.method, |
172 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 269 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
173 | ), | 270 | ), |
174 | ) | 271 | ) |
175 | .where( | 272 | .where( |
@@ -208,13 +305,13 @@ class Database(object): | |||
208 | statement = ( | 305 | statement = ( |
209 | select( | 306 | select( |
210 | OuthashesV2.taskhash.label("taskhash"), | 307 | OuthashesV2.taskhash.label("taskhash"), |
211 | UnihashesV2.unihash.label("unihash"), | 308 | UnihashesV3.unihash.label("unihash"), |
212 | ) | 309 | ) |
213 | .join( | 310 | .join( |
214 | UnihashesV2, | 311 | UnihashesV3, |
215 | and_( | 312 | and_( |
216 | UnihashesV2.method == OuthashesV2.method, | 313 | UnihashesV3.method == OuthashesV2.method, |
217 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 314 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
218 | ), | 315 | ), |
219 | ) | 316 | ) |
220 | .where( | 317 | .where( |
@@ -234,12 +331,12 @@ class Database(object): | |||
234 | 331 | ||
235 | async def get_equivalent(self, method, taskhash): | 332 | async def get_equivalent(self, method, taskhash): |
236 | statement = select( | 333 | statement = select( |
237 | UnihashesV2.unihash, | 334 | UnihashesV3.unihash, |
238 | UnihashesV2.method, | 335 | UnihashesV3.method, |
239 | UnihashesV2.taskhash, | 336 | UnihashesV3.taskhash, |
240 | ).where( | 337 | ).where( |
241 | UnihashesV2.method == method, | 338 | UnihashesV3.method == method, |
242 | UnihashesV2.taskhash == taskhash, | 339 | UnihashesV3.taskhash == taskhash, |
243 | ) | 340 | ) |
244 | self.logger.debug("%s", statement) | 341 | self.logger.debug("%s", statement) |
245 | async with self.db.begin(): | 342 | async with self.db.begin(): |
@@ -248,13 +345,9 @@ class Database(object): | |||
248 | 345 | ||
249 | async def remove(self, condition): | 346 | async def remove(self, condition): |
250 | async def do_remove(table): | 347 | async def do_remove(table): |
251 | where = {} | 348 | where = _make_condition_statement(table, condition) |
252 | for c in table.__table__.columns: | ||
253 | if c.key in condition and condition[c.key] is not None: | ||
254 | where[c] = condition[c.key] | ||
255 | |||
256 | if where: | 349 | if where: |
257 | statement = delete(table).where(*[(k == v) for k, v in where.items()]) | 350 | statement = delete(table).where(*where) |
258 | self.logger.debug("%s", statement) | 351 | self.logger.debug("%s", statement) |
259 | async with self.db.begin(): | 352 | async with self.db.begin(): |
260 | result = await self.db.execute(statement) | 353 | result = await self.db.execute(statement) |
@@ -263,19 +356,74 @@ class Database(object): | |||
263 | return 0 | 356 | return 0 |
264 | 357 | ||
265 | count = 0 | 358 | count = 0 |
266 | count += await do_remove(UnihashesV2) | 359 | count += await do_remove(UnihashesV3) |
267 | count += await do_remove(OuthashesV2) | 360 | count += await do_remove(OuthashesV2) |
268 | 361 | ||
269 | return count | 362 | return count |
270 | 363 | ||
364 | async def get_current_gc_mark(self): | ||
365 | async with self.db.begin(): | ||
366 | return await self._get_config("gc-mark") | ||
367 | |||
368 | async def gc_status(self): | ||
369 | async with self.db.begin(): | ||
370 | gc_mark_subquery = self._get_config_subquery("gc-mark", "") | ||
371 | |||
372 | result = await self._execute( | ||
373 | select(func.count()) | ||
374 | .select_from(UnihashesV3) | ||
375 | .where(UnihashesV3.gc_mark == gc_mark_subquery) | ||
376 | ) | ||
377 | keep_rows = result.scalar() | ||
378 | |||
379 | result = await self._execute( | ||
380 | select(func.count()) | ||
381 | .select_from(UnihashesV3) | ||
382 | .where(UnihashesV3.gc_mark != gc_mark_subquery) | ||
383 | ) | ||
384 | remove_rows = result.scalar() | ||
385 | |||
386 | return (keep_rows, remove_rows, await self._get_config("gc-mark")) | ||
387 | |||
388 | async def gc_mark(self, mark, condition): | ||
389 | async with self.db.begin(): | ||
390 | await self._set_config("gc-mark", mark) | ||
391 | |||
392 | where = _make_condition_statement(UnihashesV3, condition) | ||
393 | if not where: | ||
394 | return 0 | ||
395 | |||
396 | result = await self._execute( | ||
397 | update(UnihashesV3) | ||
398 | .values(gc_mark=self._get_config_subquery("gc-mark", "")) | ||
399 | .where(*where) | ||
400 | ) | ||
401 | return result.rowcount | ||
402 | |||
403 | async def gc_sweep(self): | ||
404 | async with self.db.begin(): | ||
405 | result = await self._execute( | ||
406 | delete(UnihashesV3).where( | ||
407 | # A sneaky conditional that provides some errant use | ||
408 | # protection: If the config mark is NULL, this will not | ||
409 | # match any rows because no default is specified in the | ||
410 | # select statement | ||
411 | UnihashesV3.gc_mark | ||
412 | != self._get_config_subquery("gc-mark") | ||
413 | ) | ||
414 | ) | ||
415 | await self._set_config("gc-mark", None) | ||
416 | |||
417 | return result.rowcount | ||
418 | |||
271 | async def clean_unused(self, oldest): | 419 | async def clean_unused(self, oldest): |
272 | statement = delete(OuthashesV2).where( | 420 | statement = delete(OuthashesV2).where( |
273 | OuthashesV2.created < oldest, | 421 | OuthashesV2.created < oldest, |
274 | ~( | 422 | ~( |
275 | select(UnihashesV2.id) | 423 | select(UnihashesV3.id) |
276 | .where( | 424 | .where( |
277 | UnihashesV2.method == OuthashesV2.method, | 425 | UnihashesV3.method == OuthashesV2.method, |
278 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 426 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
279 | ) | 427 | ) |
280 | .limit(1) | 428 | .limit(1) |
281 | .exists() | 429 | .exists() |
@@ -287,15 +435,17 @@ class Database(object): | |||
287 | return result.rowcount | 435 | return result.rowcount |
288 | 436 | ||
289 | async def insert_unihash(self, method, taskhash, unihash): | 437 | async def insert_unihash(self, method, taskhash, unihash): |
290 | statement = insert(UnihashesV2).values( | ||
291 | method=method, | ||
292 | taskhash=taskhash, | ||
293 | unihash=unihash, | ||
294 | ) | ||
295 | self.logger.debug("%s", statement) | ||
296 | try: | 438 | try: |
297 | async with self.db.begin(): | 439 | async with self.db.begin(): |
298 | await self.db.execute(statement) | 440 | await self._execute( |
441 | insert(UnihashesV3).values( | ||
442 | method=method, | ||
443 | taskhash=taskhash, | ||
444 | unihash=unihash, | ||
445 | gc_mark=self._get_config_subquery("gc-mark", ""), | ||
446 | ) | ||
447 | ) | ||
448 | |||
299 | return True | 449 | return True |
300 | except IntegrityError: | 450 | except IntegrityError: |
301 | self.logger.debug( | 451 | self.logger.debug( |
@@ -418,7 +568,7 @@ class Database(object): | |||
418 | 568 | ||
419 | async def get_query_columns(self): | 569 | async def get_query_columns(self): |
420 | columns = set() | 570 | columns = set() |
421 | for table in (UnihashesV2, OuthashesV2): | 571 | for table in (UnihashesV3, OuthashesV2): |
422 | for c in table.__table__.columns: | 572 | for c in table.__table__.columns: |
423 | if not isinstance(c.type, Text): | 573 | if not isinstance(c.type, Text): |
424 | continue | 574 | continue |