author    Joshua Watt <JPEWhacker@gmail.com>  2024-02-18 15:59:46 -0700
committer Richard Purdie <richard.purdie@linuxfoundation.org>  2024-02-19 11:58:12 +0000
commit    1effd1014d9140905093efe25eeefedb28a10875 (patch)
tree      b34fb1d26f020b361d22904695cab6b9a7c1ea50
parent    324c9fd666117afb0dd689eaa8551bb02d6a042b (diff)
download  poky-1effd1014d9140905093efe25eeefedb28a10875.tar.gz
bitbake: hashserv: Add Unihash Garbage Collection
Adds support for removing unused unihashes from the database. This is done
using a "mark and sweep" style of garbage collection, where a collection is
started by marking which unihashes should be kept in the database, then
performing a sweep to remove any unmarked hashes.

(Bitbake rev: 433d4a075a1acfbd2a2913061739353a84bb01ed)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
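As context for the change, here is a minimal sketch of driving one full
mark-and-sweep cycle from Python, assuming an unauthenticated server reachable
at a hypothetical unix:///tmp/hashserv.sock address and the existing
hashserv.create_client() helper; the same flow is exposed on the command line
through the new gc-mark, gc-status and gc-sweep subcommands added to
bitbake-hashclient in this patch.

    import hashserv

    # Hypothetical server address; any address form accepted by
    # bitbake-hashclient works the same way. The connecting user must hold
    # the @db-admin permission for the gc operations.
    ADDRESS = "unix:///tmp/hashserv.sock"

    client = hashserv.create_client(ADDRESS)
    try:
        # Start (or continue) the collection identified by "ABC", keeping
        # every unihash row matching the where clause (placeholder values).
        marked = client.gc_mark("ABC", {"method": "TestMethod", "unihash": "0123abcd"})
        print("New hashes marked: %d" % marked["count"])

        # How many rows carry the current mark, and how many a sweep would delete.
        status = client.gc_status()
        print("mark=%s keep=%d remove=%d" % (status["mark"], status["keep"], status["remove"]))

        # Finish the cycle: delete every unihash row not marked with "ABC".
        swept = client.gc_sweep("ABC")
        print("Removed %d rows" % swept["count"])
    finally:
        client.close()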
-rwxr-xr-x  bitbake/bin/bitbake-hashclient      |  35
-rw-r--r--  bitbake/lib/hashserv/client.py      |  31
-rw-r--r--  bitbake/lib/hashserv/server.py      | 105
-rw-r--r--  bitbake/lib/hashserv/sqlalchemy.py  | 226
-rw-r--r--  bitbake/lib/hashserv/sqlite.py      | 205
-rw-r--r--  bitbake/lib/hashserv/tests.py       | 198
6 files changed, 684 insertions, 116 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient
index 2cb6338666..f71b87404a 100755
--- a/bitbake/bin/bitbake-hashclient
+++ b/bitbake/bin/bitbake-hashclient
@@ -195,6 +195,28 @@ def main():
195 columns = client.get_db_query_columns() 195 columns = client.get_db_query_columns()
196 print("\n".join(sorted(columns))) 196 print("\n".join(sorted(columns)))
197 197
198 def handle_gc_status(args, client):
199 result = client.gc_status()
200 if not result["mark"]:
201 print("No Garbage collection in progress")
202 return 0
203
204 print("Current Mark: %s" % result["mark"])
205 print("Total hashes to keep: %d" % result["keep"])
206 print("Total hashes to remove: %s" % result["remove"])
207 return 0
208
209 def handle_gc_mark(args, client):
210 where = {k: v for k, v in args.where}
211 result = client.gc_mark(args.mark, where)
212 print("New hashes marked: %d" % result["count"])
213 return 0
214
215 def handle_gc_sweep(args, client):
216 result = client.gc_sweep(args.mark)
217 print("Removed %d rows" % result["count"])
218 return 0
219
198 parser = argparse.ArgumentParser(description='Hash Equivalence Client') 220 parser = argparse.ArgumentParser(description='Hash Equivalence Client')
199 parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') 221 parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")')
200 parser.add_argument('--log', default='WARNING', help='Set logging level') 222 parser.add_argument('--log', default='WARNING', help='Set logging level')
@@ -274,6 +296,19 @@ def main():
274 db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries") 296 db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries")
275 db_query_columns_parser.set_defaults(func=handle_get_db_query_columns) 297 db_query_columns_parser.set_defaults(func=handle_get_db_query_columns)
276 298
299 gc_status_parser = subparsers.add_parser("gc-status", help="Show garbage collection status")
300 gc_status_parser.set_defaults(func=handle_gc_status)
301
302 gc_mark_parser = subparsers.add_parser('gc-mark', help="Mark hashes to be kept for garbage collection")
303 gc_mark_parser.add_argument("mark", help="Mark for this garbage collection operation")
304 gc_mark_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[],
305 help="Keep entries in table where KEY == VALUE")
306 gc_mark_parser.set_defaults(func=handle_gc_mark)
307
308 gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked")
309 gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation")
310 gc_sweep_parser.set_defaults(func=handle_gc_sweep)
311
277 args = parser.parse_args() 312 args = parser.parse_args()
278 313
279 logger = logging.getLogger('hashserv') 314 logger = logging.getLogger('hashserv')
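The --where option above collects KEY VALUE pairs with nargs=2 and
action="append", so handle_gc_mark() receives a list of two-element lists and
folds it into the dict sent to the server. A standalone sketch of that
behaviour (the argument values are illustrative only, not from the patch):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("mark")
    parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2,
                        action="append", default=[])

    args = parser.parse_args(["ABC", "--where", "method", "TestMethod",
                              "--where", "unihash", "0123abcd"])

    # args.where is [["method", "TestMethod"], ["unihash", "0123abcd"]];
    # the handler turns it into the "where" dict passed to client.gc_mark().
    where = {k: v for k, v in args.where}
    assert where == {"method": "TestMethod", "unihash": "0123abcd"}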
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 35a97687fb..e6dc417912 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -194,6 +194,34 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
194 await self._set_mode(self.MODE_NORMAL) 194 await self._set_mode(self.MODE_NORMAL)
195 return (await self.invoke({"get-db-query-columns": {}}))["columns"] 195 return (await self.invoke({"get-db-query-columns": {}}))["columns"]
196 196
197 async def gc_status(self):
198 await self._set_mode(self.MODE_NORMAL)
199 return await self.invoke({"gc-status": {}})
200
201 async def gc_mark(self, mark, where):
202 """
203 Starts a new garbage collection operation identified by "mark". If
204 garbage collection is already in progress with "mark", the collection
205 is continued.
206
207 All unihash entries that match the "where" clause are marked to be
208 kept. In addition, any new entries added to the database after this
209 command will be automatically marked with "mark"
210 """
211 await self._set_mode(self.MODE_NORMAL)
212 return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
213
214 async def gc_sweep(self, mark):
215 """
216 Finishes garbage collection for "mark". All unihash entries that have
217 not been marked will be deleted.
218
219 It is recommended to clean unused outhash entries after running this to
220 clean up any dangling outhashes
221 """
222 await self._set_mode(self.MODE_NORMAL)
223 return await self.invoke({"gc-sweep": {"mark": mark}})
224
197 225
198class Client(bb.asyncrpc.Client): 226class Client(bb.asyncrpc.Client):
199 def __init__(self, username=None, password=None): 227 def __init__(self, username=None, password=None):
@@ -224,6 +252,9 @@ class Client(bb.asyncrpc.Client):
224 "become_user", 252 "become_user",
225 "get_db_usage", 253 "get_db_usage",
226 "get_db_query_columns", 254 "get_db_query_columns",
255 "gc_status",
256 "gc_mark",
257 "gc_sweep",
227 ) 258 )
228 259
229 def _get_async_client(self): 260 def _get_async_client(self):
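The synchronous Client simply proxies the coroutines listed above; used
directly, the async side might look like the following sketch (the address,
mark and where clause are placeholders, and create_async_client() and close()
are assumed to behave as in existing hashserv/asyncrpc code):

    import asyncio
    import hashserv

    async def collect_garbage(address, mark, where):
        # create_async_client() is the async counterpart of create_client(),
        # as used by the server's upstream handling.
        client = await hashserv.create_async_client(address)
        try:
            await client.gc_mark(mark, where)
            status = await client.gc_status()
            print("keep=%d remove=%d" % (status["keep"], status["remove"]))
            return await client.gc_sweep(mark)
        finally:
            await client.close()

    asyncio.run(collect_garbage("unix:///tmp/hashserv.sock", "ABC",
                                {"method": "TestMethod"}))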
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index a86507830e..5ed852d1f3 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False):
199 if not self.user_has_permissions(*permissions, allow_anon=allow_anon): 199 if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
200 if not self.user: 200 if not self.user:
201 username = "Anonymous user" 201 username = "Anonymous user"
202 user_perms = self.anon_perms 202 user_perms = self.server.anon_perms
203 else: 203 else:
204 username = self.user.username 204 username = self.user.username
205 user_perms = self.user.permissions 205 user_perms = self.user.permissions
@@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False):
223 223
224 224
225class ServerClient(bb.asyncrpc.AsyncServerConnection): 225class ServerClient(bb.asyncrpc.AsyncServerConnection):
226 def __init__( 226 def __init__(self, socket, server):
227 self, 227 super().__init__(socket, "OEHASHEQUIV", server.logger)
228 socket, 228 self.server = server
229 db_engine,
230 request_stats,
231 backfill_queue,
232 upstream,
233 read_only,
234 anon_perms,
235 ):
236 super().__init__(socket, "OEHASHEQUIV", logger)
237 self.db_engine = db_engine
238 self.request_stats = request_stats
239 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK 229 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
240 self.backfill_queue = backfill_queue
241 self.upstream = upstream
242 self.read_only = read_only
243 self.user = None 230 self.user = None
244 self.anon_perms = anon_perms
245 231
246 self.handlers.update( 232 self.handlers.update(
247 { 233 {
@@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
261 } 247 }
262 ) 248 )
263 249
264 if not read_only: 250 if not self.server.read_only:
265 self.handlers.update( 251 self.handlers.update(
266 { 252 {
267 "report-equiv": self.handle_equivreport, 253 "report-equiv": self.handle_equivreport,
268 "reset-stats": self.handle_reset_stats, 254 "reset-stats": self.handle_reset_stats,
269 "backfill-wait": self.handle_backfill_wait, 255 "backfill-wait": self.handle_backfill_wait,
270 "remove": self.handle_remove, 256 "remove": self.handle_remove,
257 "gc-mark": self.handle_gc_mark,
258 "gc-sweep": self.handle_gc_sweep,
259 "gc-status": self.handle_gc_status,
271 "clean-unused": self.handle_clean_unused, 260 "clean-unused": self.handle_clean_unused,
272 "refresh-token": self.handle_refresh_token, 261 "refresh-token": self.handle_refresh_token,
273 "set-user-perms": self.handle_set_perms, 262 "set-user-perms": self.handle_set_perms,
@@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
282 def user_has_permissions(self, *permissions, allow_anon=True): 271 def user_has_permissions(self, *permissions, allow_anon=True):
283 permissions = set(permissions) 272 permissions = set(permissions)
284 if allow_anon: 273 if allow_anon:
285 if ALL_PERM in self.anon_perms: 274 if ALL_PERM in self.server.anon_perms:
286 return True 275 return True
287 276
288 if not permissions - self.anon_perms: 277 if not permissions - self.server.anon_perms:
289 return True 278 return True
290 279
291 if self.user is None: 280 if self.user is None:
@@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
303 return self.proto_version > (1, 0) and self.proto_version <= (1, 1) 292 return self.proto_version > (1, 0) and self.proto_version <= (1, 1)
304 293
305 async def process_requests(self): 294 async def process_requests(self):
306 async with self.db_engine.connect(self.logger) as db: 295 async with self.server.db_engine.connect(self.logger) as db:
307 self.db = db 296 self.db = db
308 if self.upstream is not None: 297 if self.server.upstream is not None:
309 self.upstream_client = await create_async_client(self.upstream) 298 self.upstream_client = await create_async_client(self.server.upstream)
310 else: 299 else:
311 self.upstream_client = None 300 self.upstream_client = None
312 301
@@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
323 if "stream" in k: 312 if "stream" in k:
324 return await self.handlers[k](msg[k]) 313 return await self.handlers[k](msg[k])
325 else: 314 else:
326 with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): 315 with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
327 return await self.handlers[k](msg[k]) 316 return await self.handlers[k](msg[k])
328 317
329 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 318 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
@@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
404 # possible (which is why the request sample is handled manually 393 # possible (which is why the request sample is handled manually
405 # instead of using 'with', and also why logging statements are 394 # instead of using 'with', and also why logging statements are
406 # commented out. 395 # commented out.
407 self.request_sample = self.request_stats.start_sample() 396 self.request_sample = self.server.request_stats.start_sample()
408 request_measure = self.request_sample.measure() 397 request_measure = self.request_sample.measure()
409 request_measure.start() 398 request_measure.start()
410 399
@@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
435 # Post to the backfill queue after writing the result to minimize 424 # Post to the backfill queue after writing the result to minimize
436 # the turn around time on a request 425 # the turn around time on a request
437 if upstream is not None: 426 if upstream is not None:
438 await self.backfill_queue.put((method, taskhash)) 427 await self.server.backfill_queue.put((method, taskhash))
439 428
440 await self.socket.send("ok") 429 await self.socket.send("ok")
441 return self.NO_RESPONSE 430 return self.NO_RESPONSE
@@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
461 # report is made inside the function 450 # report is made inside the function
462 @permissions(READ_PERM) 451 @permissions(READ_PERM)
463 async def handle_report(self, data): 452 async def handle_report(self, data):
464 if self.read_only or not self.user_has_permissions(REPORT_PERM): 453 if self.server.read_only or not self.user_has_permissions(REPORT_PERM):
465 return await self.report_readonly(data) 454 return await self.report_readonly(data)
466 455
467 outhash_data = { 456 outhash_data = {
@@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
538 @permissions(READ_PERM) 527 @permissions(READ_PERM)
539 async def handle_get_stats(self, request): 528 async def handle_get_stats(self, request):
540 return { 529 return {
541 "requests": self.request_stats.todict(), 530 "requests": self.server.request_stats.todict(),
542 } 531 }
543 532
544 @permissions(DB_ADMIN_PERM) 533 @permissions(DB_ADMIN_PERM)
545 async def handle_reset_stats(self, request): 534 async def handle_reset_stats(self, request):
546 d = { 535 d = {
547 "requests": self.request_stats.todict(), 536 "requests": self.server.request_stats.todict(),
548 } 537 }
549 538
550 self.request_stats.reset() 539 self.server.request_stats.reset()
551 return d 540 return d
552 541
553 @permissions(READ_PERM) 542 @permissions(READ_PERM)
554 async def handle_backfill_wait(self, request): 543 async def handle_backfill_wait(self, request):
555 d = { 544 d = {
556 "tasks": self.backfill_queue.qsize(), 545 "tasks": self.server.backfill_queue.qsize(),
557 } 546 }
558 await self.backfill_queue.join() 547 await self.server.backfill_queue.join()
559 return d 548 return d
560 549
561 @permissions(DB_ADMIN_PERM) 550 @permissions(DB_ADMIN_PERM)
@@ -567,6 +556,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
567 return {"count": await self.db.remove(condition)} 556 return {"count": await self.db.remove(condition)}
568 557
569 @permissions(DB_ADMIN_PERM) 558 @permissions(DB_ADMIN_PERM)
559 async def handle_gc_mark(self, request):
560 condition = request["where"]
561 mark = request["mark"]
562
563 if not isinstance(condition, dict):
564 raise TypeError("Bad condition type %s" % type(condition))
565
566 if not isinstance(mark, str):
567 raise TypeError("Bad mark type %s" % type(mark))
568
569 return {"count": await self.db.gc_mark(mark, condition)}
570
571 @permissions(DB_ADMIN_PERM)
572 async def handle_gc_sweep(self, request):
573 mark = request["mark"]
574
575 if not isinstance(mark, str):
576 raise TypeError("Bad mark type %s" % type(mark))
577
578 current_mark = await self.db.get_current_gc_mark()
579
580 if not current_mark or mark != current_mark:
581 raise bb.asyncrpc.InvokeError(
582 f"'{mark}' is not the current mark. Refusing to sweep"
583 )
584
585 count = await self.db.gc_sweep()
586
587 return {"count": count}
588
589 @permissions(DB_ADMIN_PERM)
590 async def handle_gc_status(self, request):
591 (keep_rows, remove_rows, current_mark) = await self.db.gc_status()
592 return {
593 "keep": keep_rows,
594 "remove": remove_rows,
595 "mark": current_mark,
596 }
597
598 @permissions(DB_ADMIN_PERM)
570 async def handle_clean_unused(self, request): 599 async def handle_clean_unused(self, request):
571 max_age = request["max_age_seconds"] 600 max_age = request["max_age_seconds"]
572 oldest = datetime.now() - timedelta(seconds=-max_age) 601 oldest = datetime.now() - timedelta(seconds=-max_age)
@@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer):
779 ) 808 )
780 809
781 def accept_client(self, socket): 810 def accept_client(self, socket):
782 return ServerClient( 811 return ServerClient(socket, self)
783 socket,
784 self.db_engine,
785 self.request_stats,
786 self.backfill_queue,
787 self.upstream,
788 self.read_only,
789 self.anon_perms,
790 )
791 812
792 async def create_admin_user(self): 813 async def create_admin_user(self):
793 admin_permissions = (ALL_PERM,) 814 admin_permissions = (ALL_PERM,)
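For reference, the three new handlers receive single-key messages whose
payloads match the dicts built by the client code earlier in this patch; the
shapes below use illustrative values, not output captured from a real session:

    # "gc-mark": start or continue a collection. The where clause accepts the
    # same column names as the existing "remove" handler.
    request = {"gc-mark": {"mark": "ABC", "where": {"method": "TestMethod"}}}
    response = {"count": 1}

    # "gc-status": rows carrying the current mark vs. rows a sweep would
    # delete, plus the mark itself (None when no collection is in progress).
    request = {"gc-status": {}}
    response = {"mark": "ABC", "keep": 1, "remove": 1}

    # "gc-sweep": only accepted for the current mark; a stale mark surfaces
    # as bb.asyncrpc.InvokeError on the client side.
    request = {"gc-sweep": {"mark": "ABC"}}
    response = {"count": 1}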
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
index cee04bffb0..89a6b86d9d 100644
--- a/bitbake/lib/hashserv/sqlalchemy.py
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -28,6 +28,7 @@ from sqlalchemy import (
28 delete, 28 delete,
29 update, 29 update,
30 func, 30 func,
31 inspect,
31) 32)
32import sqlalchemy.engine 33import sqlalchemy.engine
33from sqlalchemy.orm import declarative_base 34from sqlalchemy.orm import declarative_base
@@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError
36Base = declarative_base() 37Base = declarative_base()
37 38
38 39
39class UnihashesV2(Base): 40class UnihashesV3(Base):
40 __tablename__ = "unihashes_v2" 41 __tablename__ = "unihashes_v3"
41 id = Column(Integer, primary_key=True, autoincrement=True) 42 id = Column(Integer, primary_key=True, autoincrement=True)
42 method = Column(Text, nullable=False) 43 method = Column(Text, nullable=False)
43 taskhash = Column(Text, nullable=False) 44 taskhash = Column(Text, nullable=False)
44 unihash = Column(Text, nullable=False) 45 unihash = Column(Text, nullable=False)
46 gc_mark = Column(Text, nullable=False)
45 47
46 __table_args__ = ( 48 __table_args__ = (
47 UniqueConstraint("method", "taskhash"), 49 UniqueConstraint("method", "taskhash"),
48 Index("taskhash_lookup_v3", "method", "taskhash"), 50 Index("taskhash_lookup_v4", "method", "taskhash"),
49 ) 51 )
50 52
51 53
@@ -79,6 +81,36 @@ class Users(Base):
79 __table_args__ = (UniqueConstraint("username"),) 81 __table_args__ = (UniqueConstraint("username"),)
80 82
81 83
84class Config(Base):
85 __tablename__ = "config"
86 id = Column(Integer, primary_key=True, autoincrement=True)
87 name = Column(Text, nullable=False)
88 value = Column(Text)
89 __table_args__ = (
90 UniqueConstraint("name"),
91 Index("config_lookup", "name"),
92 )
93
94
95#
96# Old table versions
97#
98DeprecatedBase = declarative_base()
99
100
101class UnihashesV2(DeprecatedBase):
102 __tablename__ = "unihashes_v2"
103 id = Column(Integer, primary_key=True, autoincrement=True)
104 method = Column(Text, nullable=False)
105 taskhash = Column(Text, nullable=False)
106 unihash = Column(Text, nullable=False)
107
108 __table_args__ = (
109 UniqueConstraint("method", "taskhash"),
110 Index("taskhash_lookup_v3", "method", "taskhash"),
111 )
112
113
82class DatabaseEngine(object): 114class DatabaseEngine(object):
83 def __init__(self, url, username=None, password=None): 115 def __init__(self, url, username=None, password=None):
84 self.logger = logging.getLogger("hashserv.sqlalchemy") 116 self.logger = logging.getLogger("hashserv.sqlalchemy")
@@ -91,6 +123,9 @@ class DatabaseEngine(object):
91 self.url = self.url.set(password=password) 123 self.url = self.url.set(password=password)
92 124
93 async def create(self): 125 async def create(self):
126 def check_table_exists(conn, name):
127 return inspect(conn).has_table(name)
128
94 self.logger.info("Using database %s", self.url) 129 self.logger.info("Using database %s", self.url)
95 self.engine = create_async_engine(self.url, poolclass=NullPool) 130 self.engine = create_async_engine(self.url, poolclass=NullPool)
96 131
@@ -99,6 +134,24 @@ class DatabaseEngine(object):
99 self.logger.info("Creating tables...") 134 self.logger.info("Creating tables...")
100 await conn.run_sync(Base.metadata.create_all) 135 await conn.run_sync(Base.metadata.create_all)
101 136
137 if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__):
138 self.logger.info("Upgrading Unihashes V2 -> V3...")
139 statement = insert(UnihashesV3).from_select(
140 ["id", "method", "unihash", "taskhash", "gc_mark"],
141 select(
142 UnihashesV2.id,
143 UnihashesV2.method,
144 UnihashesV2.unihash,
145 UnihashesV2.taskhash,
146 literal("").label("gc_mark"),
147 ),
148 )
149 self.logger.debug("%s", statement)
150 await conn.execute(statement)
151
152 await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__])
153 self.logger.info("Upgrade complete")
154
102 def connect(self, logger): 155 def connect(self, logger):
103 return Database(self.engine, logger) 156 return Database(self.engine, logger)
104 157
@@ -118,6 +171,15 @@ def map_user(row):
118 ) 171 )
119 172
120 173
174def _make_condition_statement(table, condition):
175 where = {}
176 for c in table.__table__.columns:
177 if c.key in condition and condition[c.key] is not None:
178 where[c] = condition[c.key]
179
180 return [(k == v) for k, v in where.items()]
181
182
121class Database(object): 183class Database(object):
122 def __init__(self, engine, logger): 184 def __init__(self, engine, logger):
123 self.engine = engine 185 self.engine = engine
@@ -135,17 +197,52 @@ class Database(object):
135 await self.db.close() 197 await self.db.close()
136 self.db = None 198 self.db = None
137 199
200 async def _execute(self, statement):
201 self.logger.debug("%s", statement)
202 return await self.db.execute(statement)
203
204 async def _set_config(self, name, value):
205 while True:
206 result = await self._execute(
207 update(Config).where(Config.name == name).values(value=value)
208 )
209
210 if result.rowcount == 0:
211 self.logger.debug("Config '%s' not found. Adding it", name)
212 try:
213 await self._execute(insert(Config).values(name=name, value=value))
214 except IntegrityError:
215 # Race. Try again
216 continue
217
218 break
219
220 def _get_config_subquery(self, name, default=None):
221 if default is not None:
222 return func.coalesce(
223 select(Config.value).where(Config.name == name).scalar_subquery(),
224 default,
225 )
226 return select(Config.value).where(Config.name == name).scalar_subquery()
227
228 async def _get_config(self, name):
229 result = await self._execute(select(Config.value).where(Config.name == name))
230 row = result.first()
231 if row is None:
232 return None
233 return row.value
234
138 async def get_unihash_by_taskhash_full(self, method, taskhash): 235 async def get_unihash_by_taskhash_full(self, method, taskhash):
139 statement = ( 236 statement = (
140 select( 237 select(
141 OuthashesV2, 238 OuthashesV2,
142 UnihashesV2.unihash.label("unihash"), 239 UnihashesV3.unihash.label("unihash"),
143 ) 240 )
144 .join( 241 .join(
145 UnihashesV2, 242 UnihashesV3,
146 and_( 243 and_(
147 UnihashesV2.method == OuthashesV2.method, 244 UnihashesV3.method == OuthashesV2.method,
148 UnihashesV2.taskhash == OuthashesV2.taskhash, 245 UnihashesV3.taskhash == OuthashesV2.taskhash,
149 ), 246 ),
150 ) 247 )
151 .where( 248 .where(
@@ -164,12 +261,12 @@ class Database(object):
164 261
165 async def get_unihash_by_outhash(self, method, outhash): 262 async def get_unihash_by_outhash(self, method, outhash):
166 statement = ( 263 statement = (
167 select(OuthashesV2, UnihashesV2.unihash.label("unihash")) 264 select(OuthashesV2, UnihashesV3.unihash.label("unihash"))
168 .join( 265 .join(
169 UnihashesV2, 266 UnihashesV3,
170 and_( 267 and_(
171 UnihashesV2.method == OuthashesV2.method, 268 UnihashesV3.method == OuthashesV2.method,
172 UnihashesV2.taskhash == OuthashesV2.taskhash, 269 UnihashesV3.taskhash == OuthashesV2.taskhash,
173 ), 270 ),
174 ) 271 )
175 .where( 272 .where(
@@ -208,13 +305,13 @@ class Database(object):
208 statement = ( 305 statement = (
209 select( 306 select(
210 OuthashesV2.taskhash.label("taskhash"), 307 OuthashesV2.taskhash.label("taskhash"),
211 UnihashesV2.unihash.label("unihash"), 308 UnihashesV3.unihash.label("unihash"),
212 ) 309 )
213 .join( 310 .join(
214 UnihashesV2, 311 UnihashesV3,
215 and_( 312 and_(
216 UnihashesV2.method == OuthashesV2.method, 313 UnihashesV3.method == OuthashesV2.method,
217 UnihashesV2.taskhash == OuthashesV2.taskhash, 314 UnihashesV3.taskhash == OuthashesV2.taskhash,
218 ), 315 ),
219 ) 316 )
220 .where( 317 .where(
@@ -234,12 +331,12 @@ class Database(object):
234 331
235 async def get_equivalent(self, method, taskhash): 332 async def get_equivalent(self, method, taskhash):
236 statement = select( 333 statement = select(
237 UnihashesV2.unihash, 334 UnihashesV3.unihash,
238 UnihashesV2.method, 335 UnihashesV3.method,
239 UnihashesV2.taskhash, 336 UnihashesV3.taskhash,
240 ).where( 337 ).where(
241 UnihashesV2.method == method, 338 UnihashesV3.method == method,
242 UnihashesV2.taskhash == taskhash, 339 UnihashesV3.taskhash == taskhash,
243 ) 340 )
244 self.logger.debug("%s", statement) 341 self.logger.debug("%s", statement)
245 async with self.db.begin(): 342 async with self.db.begin():
@@ -248,13 +345,9 @@ class Database(object):
248 345
249 async def remove(self, condition): 346 async def remove(self, condition):
250 async def do_remove(table): 347 async def do_remove(table):
251 where = {} 348 where = _make_condition_statement(table, condition)
252 for c in table.__table__.columns:
253 if c.key in condition and condition[c.key] is not None:
254 where[c] = condition[c.key]
255
256 if where: 349 if where:
257 statement = delete(table).where(*[(k == v) for k, v in where.items()]) 350 statement = delete(table).where(*where)
258 self.logger.debug("%s", statement) 351 self.logger.debug("%s", statement)
259 async with self.db.begin(): 352 async with self.db.begin():
260 result = await self.db.execute(statement) 353 result = await self.db.execute(statement)
@@ -263,19 +356,74 @@ class Database(object):
263 return 0 356 return 0
264 357
265 count = 0 358 count = 0
266 count += await do_remove(UnihashesV2) 359 count += await do_remove(UnihashesV3)
267 count += await do_remove(OuthashesV2) 360 count += await do_remove(OuthashesV2)
268 361
269 return count 362 return count
270 363
364 async def get_current_gc_mark(self):
365 async with self.db.begin():
366 return await self._get_config("gc-mark")
367
368 async def gc_status(self):
369 async with self.db.begin():
370 gc_mark_subquery = self._get_config_subquery("gc-mark", "")
371
372 result = await self._execute(
373 select(func.count())
374 .select_from(UnihashesV3)
375 .where(UnihashesV3.gc_mark == gc_mark_subquery)
376 )
377 keep_rows = result.scalar()
378
379 result = await self._execute(
380 select(func.count())
381 .select_from(UnihashesV3)
382 .where(UnihashesV3.gc_mark != gc_mark_subquery)
383 )
384 remove_rows = result.scalar()
385
386 return (keep_rows, remove_rows, await self._get_config("gc-mark"))
387
388 async def gc_mark(self, mark, condition):
389 async with self.db.begin():
390 await self._set_config("gc-mark", mark)
391
392 where = _make_condition_statement(UnihashesV3, condition)
393 if not where:
394 return 0
395
396 result = await self._execute(
397 update(UnihashesV3)
398 .values(gc_mark=self._get_config_subquery("gc-mark", ""))
399 .where(*where)
400 )
401 return result.rowcount
402
403 async def gc_sweep(self):
404 async with self.db.begin():
405 result = await self._execute(
406 delete(UnihashesV3).where(
407 # A sneaky conditional that provides some protection against
408 # errant use: if the config mark is NULL, this will not match
409 # any rows, because no default is specified in the select
410 # statement
411 UnihashesV3.gc_mark
412 != self._get_config_subquery("gc-mark")
413 )
414 )
415 await self._set_config("gc-mark", None)
416
417 return result.rowcount
418
271 async def clean_unused(self, oldest): 419 async def clean_unused(self, oldest):
272 statement = delete(OuthashesV2).where( 420 statement = delete(OuthashesV2).where(
273 OuthashesV2.created < oldest, 421 OuthashesV2.created < oldest,
274 ~( 422 ~(
275 select(UnihashesV2.id) 423 select(UnihashesV3.id)
276 .where( 424 .where(
277 UnihashesV2.method == OuthashesV2.method, 425 UnihashesV3.method == OuthashesV2.method,
278 UnihashesV2.taskhash == OuthashesV2.taskhash, 426 UnihashesV3.taskhash == OuthashesV2.taskhash,
279 ) 427 )
280 .limit(1) 428 .limit(1)
281 .exists() 429 .exists()
@@ -287,15 +435,17 @@ class Database(object):
287 return result.rowcount 435 return result.rowcount
288 436
289 async def insert_unihash(self, method, taskhash, unihash): 437 async def insert_unihash(self, method, taskhash, unihash):
290 statement = insert(UnihashesV2).values(
291 method=method,
292 taskhash=taskhash,
293 unihash=unihash,
294 )
295 self.logger.debug("%s", statement)
296 try: 438 try:
297 async with self.db.begin(): 439 async with self.db.begin():
298 await self.db.execute(statement) 440 await self._execute(
441 insert(UnihashesV3).values(
442 method=method,
443 taskhash=taskhash,
444 unihash=unihash,
445 gc_mark=self._get_config_subquery("gc-mark", ""),
446 )
447 )
448
299 return True 449 return True
300 except IntegrityError: 450 except IntegrityError:
301 self.logger.debug( 451 self.logger.debug(
@@ -418,7 +568,7 @@ class Database(object):
418 568
419 async def get_query_columns(self): 569 async def get_query_columns(self):
420 columns = set() 570 columns = set()
421 for table in (UnihashesV2, OuthashesV2): 571 for table in (UnihashesV3, OuthashesV2):
422 for c in table.__table__.columns: 572 for c in table.__table__.columns:
423 if not isinstance(c.type, Text): 573 if not isinstance(c.type, Text):
424 continue 574 continue
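Both backends implement the same contract. The toy model below (plain Python
dicts, no SQL, names invented for illustration) only demonstrates the
invariant: a sweep deletes exactly the rows whose gc_mark differs from the
current mark, while newly inserted unihashes inherit the current mark and
therefore survive an in-progress collection.

    # Toy in-memory model of the unihashes_v3 mark-and-sweep contract.
    class ToyUnihashDB:
        def __init__(self):
            self.rows = {}          # (method, taskhash) -> {"unihash": ..., "gc_mark": ...}
            self.current_mark = ""  # mirrors the "gc-mark" row of the config table

        def insert_unihash(self, method, taskhash, unihash):
            # New rows are stamped with the current mark, so the next sweep keeps them.
            self.rows.setdefault((method, taskhash),
                                 {"unihash": unihash, "gc_mark": self.current_mark})

        def gc_mark(self, mark, keep):
            self.current_mark = mark
            for key in keep:
                if key in self.rows:
                    self.rows[key]["gc_mark"] = mark

        def gc_sweep(self):
            doomed = [k for k, v in self.rows.items() if v["gc_mark"] != self.current_mark]
            for k in doomed:
                del self.rows[k]
            return len(doomed)

    db = ToyUnihashDB()
    db.insert_unihash("m", "t1", "u1")
    db.insert_unihash("m", "t2", "u2")
    db.gc_mark("ABC", keep=[("m", "t1")])
    assert db.gc_sweep() == 1 and ("m", "t1") in db.rows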
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
index f93cb2c1dd..608490730d 100644
--- a/bitbake/lib/hashserv/sqlite.py
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -15,6 +15,7 @@ UNIHASH_TABLE_DEFINITION = (
15 ("method", "TEXT NOT NULL", "UNIQUE"), 15 ("method", "TEXT NOT NULL", "UNIQUE"),
16 ("taskhash", "TEXT NOT NULL", "UNIQUE"), 16 ("taskhash", "TEXT NOT NULL", "UNIQUE"),
17 ("unihash", "TEXT NOT NULL", ""), 17 ("unihash", "TEXT NOT NULL", ""),
18 ("gc_mark", "TEXT NOT NULL", ""),
18) 19)
19 20
20UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) 21UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
@@ -44,6 +45,14 @@ USERS_TABLE_DEFINITION = (
44USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) 45USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION)
45 46
46 47
48CONFIG_TABLE_DEFINITION = (
49 ("name", "TEXT NOT NULL", "UNIQUE"),
50 ("value", "TEXT", ""),
51)
52
53CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION)
54
55
47def _make_table(cursor, name, definition): 56def _make_table(cursor, name, definition):
48 cursor.execute( 57 cursor.execute(
49 """ 58 """
@@ -71,6 +80,35 @@ def map_user(row):
71 ) 80 )
72 81
73 82
83def _make_condition_statement(columns, condition):
84 where = {}
85 for c in columns:
86 if c in condition and condition[c] is not None:
87 where[c] = condition[c]
88
89 return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys())
90
91
92def _get_sqlite_version(cursor):
93 cursor.execute("SELECT sqlite_version()")
94
95 version = []
96 for v in cursor.fetchone()[0].split("."):
97 try:
98 version.append(int(v))
99 except ValueError:
100 version.append(v)
101
102 return tuple(version)
103
104
105def _schema_table_name(version):
106 if version >= (3, 33):
107 return "sqlite_schema"
108
109 return "sqlite_master"
110
111
74class DatabaseEngine(object): 112class DatabaseEngine(object):
75 def __init__(self, dbname, sync): 113 def __init__(self, dbname, sync):
76 self.dbname = dbname 114 self.dbname = dbname
@@ -82,9 +120,10 @@ class DatabaseEngine(object):
82 db.row_factory = sqlite3.Row 120 db.row_factory = sqlite3.Row
83 121
84 with closing(db.cursor()) as cursor: 122 with closing(db.cursor()) as cursor:
85 _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) 123 _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION)
86 _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) 124 _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
87 _make_table(cursor, "users", USERS_TABLE_DEFINITION) 125 _make_table(cursor, "users", USERS_TABLE_DEFINITION)
126 _make_table(cursor, "config", CONFIG_TABLE_DEFINITION)
88 127
89 cursor.execute("PRAGMA journal_mode = WAL") 128 cursor.execute("PRAGMA journal_mode = WAL")
90 cursor.execute( 129 cursor.execute(
@@ -96,17 +135,38 @@ class DatabaseEngine(object):
96 cursor.execute("DROP INDEX IF EXISTS outhash_lookup") 135 cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
97 cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") 136 cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
98 cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") 137 cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
138 cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3")
99 139
100 # TODO: Upgrade from tasks_v2? 140 # TODO: Upgrade from tasks_v2?
101 cursor.execute("DROP TABLE IF EXISTS tasks_v2") 141 cursor.execute("DROP TABLE IF EXISTS tasks_v2")
102 142
103 # Create new indexes 143 # Create new indexes
104 cursor.execute( 144 cursor.execute(
105 "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" 145 "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
106 ) 146 )
107 cursor.execute( 147 cursor.execute(
108 "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" 148 "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
109 ) 149 )
150 cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)")
151
152 sqlite_version = _get_sqlite_version(cursor)
153
154 cursor.execute(
155 f"""
156 SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2'
157 """
158 )
159 if cursor.fetchone():
160 self.logger.info("Upgrading Unihashes V2 -> V3...")
161 cursor.execute(
162 """
163 INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark)
164 SELECT id, method, unihash, taskhash, '' FROM unihashes_v2
165 """
166 )
167 cursor.execute("DROP TABLE unihashes_v2")
168 db.commit()
169 self.logger.info("Upgrade complete")
110 170
111 def connect(self, logger): 171 def connect(self, logger):
112 return Database(logger, self.dbname, self.sync) 172 return Database(logger, self.dbname, self.sync)
@@ -126,16 +186,7 @@ class Database(object):
126 "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") 186 "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF")
127 ) 187 )
128 188
129 cursor.execute("SELECT sqlite_version()") 189 self.sqlite_version = _get_sqlite_version(cursor)
130
131 version = []
132 for v in cursor.fetchone()[0].split("."):
133 try:
134 version.append(int(v))
135 except ValueError:
136 version.append(v)
137
138 self.sqlite_version = tuple(version)
139 190
140 async def __aenter__(self): 191 async def __aenter__(self):
141 return self 192 return self
@@ -143,6 +194,30 @@ class Database(object):
143 async def __aexit__(self, exc_type, exc_value, traceback): 194 async def __aexit__(self, exc_type, exc_value, traceback):
144 await self.close() 195 await self.close()
145 196
197 async def _set_config(self, cursor, name, value):
198 cursor.execute(
199 """
200 INSERT OR REPLACE INTO config (id, name, value) VALUES
201 ((SELECT id FROM config WHERE name=:name), :name, :value)
202 """,
203 {
204 "name": name,
205 "value": value,
206 },
207 )
208
209 async def _get_config(self, cursor, name):
210 cursor.execute(
211 "SELECT value FROM config WHERE name=:name",
212 {
213 "name": name,
214 },
215 )
216 row = cursor.fetchone()
217 if row is None:
218 return None
219 return row["value"]
220
146 async def close(self): 221 async def close(self):
147 self.db.close() 222 self.db.close()
148 223
@@ -150,8 +225,8 @@ class Database(object):
150 with closing(self.db.cursor()) as cursor: 225 with closing(self.db.cursor()) as cursor:
151 cursor.execute( 226 cursor.execute(
152 """ 227 """
153 SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 228 SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
154 INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash 229 INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
155 WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash 230 WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
156 ORDER BY outhashes_v2.created ASC 231 ORDER BY outhashes_v2.created ASC
157 LIMIT 1 232 LIMIT 1
@@ -167,8 +242,8 @@ class Database(object):
167 with closing(self.db.cursor()) as cursor: 242 with closing(self.db.cursor()) as cursor:
168 cursor.execute( 243 cursor.execute(
169 """ 244 """
170 SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 245 SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
171 INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash 246 INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
172 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash 247 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
173 ORDER BY outhashes_v2.created ASC 248 ORDER BY outhashes_v2.created ASC
174 LIMIT 1 249 LIMIT 1
@@ -200,8 +275,8 @@ class Database(object):
200 with closing(self.db.cursor()) as cursor: 275 with closing(self.db.cursor()) as cursor:
201 cursor.execute( 276 cursor.execute(
202 """ 277 """
203 SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 278 SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2
204 INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash 279 INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
205 -- Select any matching output hash except the one we just inserted 280 -- Select any matching output hash except the one we just inserted
206 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash 281 WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
207 -- Pick the oldest hash 282 -- Pick the oldest hash
@@ -219,7 +294,7 @@ class Database(object):
219 async def get_equivalent(self, method, taskhash): 294 async def get_equivalent(self, method, taskhash):
220 with closing(self.db.cursor()) as cursor: 295 with closing(self.db.cursor()) as cursor:
221 cursor.execute( 296 cursor.execute(
222 "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", 297 "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash",
223 { 298 {
224 "method": method, 299 "method": method,
225 "taskhash": taskhash, 300 "taskhash": taskhash,
@@ -229,15 +304,9 @@ class Database(object):
229 304
230 async def remove(self, condition): 305 async def remove(self, condition):
231 def do_remove(columns, table_name, cursor): 306 def do_remove(columns, table_name, cursor):
232 where = {} 307 where, clause = _make_condition_statement(columns, condition)
233 for c in columns:
234 if c in condition and condition[c] is not None:
235 where[c] = condition[c]
236
237 if where: 308 if where:
238 query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( 309 query = f"DELETE FROM {table_name} WHERE {clause}"
239 "%s=:%s" % (k, k) for k in where.keys()
240 )
241 cursor.execute(query, where) 310 cursor.execute(query, where)
242 return cursor.rowcount 311 return cursor.rowcount
243 312
@@ -246,17 +315,80 @@ class Database(object):
246 count = 0 315 count = 0
247 with closing(self.db.cursor()) as cursor: 316 with closing(self.db.cursor()) as cursor:
248 count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) 317 count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
249 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) 318 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor)
250 self.db.commit() 319 self.db.commit()
251 320
252 return count 321 return count
253 322
323 async def get_current_gc_mark(self):
324 with closing(self.db.cursor()) as cursor:
325 return await self._get_config(cursor, "gc-mark")
326
327 async def gc_status(self):
328 with closing(self.db.cursor()) as cursor:
329 cursor.execute(
330 """
331 SELECT COUNT() FROM unihashes_v3 WHERE
332 gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
333 """
334 )
335 keep_rows = cursor.fetchone()[0]
336
337 cursor.execute(
338 """
339 SELECT COUNT() FROM unihashes_v3 WHERE
340 gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
341 """
342 )
343 remove_rows = cursor.fetchone()[0]
344
345 current_mark = await self._get_config(cursor, "gc-mark")
346
347 return (keep_rows, remove_rows, current_mark)
348
349 async def gc_mark(self, mark, condition):
350 with closing(self.db.cursor()) as cursor:
351 await self._set_config(cursor, "gc-mark", mark)
352
353 where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition)
354
355 new_rows = 0
356 if where:
357 cursor.execute(
358 f"""
359 UPDATE unihashes_v3 SET
360 gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
361 WHERE {clause}
362 """,
363 where,
364 )
365 new_rows = cursor.rowcount
366
367 self.db.commit()
368 return new_rows
369
370 async def gc_sweep(self):
371 with closing(self.db.cursor()) as cursor:
372 # NOTE: COALESCE is not used in this query so that if the current
373 # mark is NULL, nothing will happen
374 cursor.execute(
375 """
376 DELETE FROM unihashes_v3 WHERE
377 gc_mark!=(SELECT value FROM config WHERE name='gc-mark')
378 """
379 )
380 count = cursor.rowcount
381 await self._set_config(cursor, "gc-mark", None)
382
383 self.db.commit()
384 return count
385
254 async def clean_unused(self, oldest): 386 async def clean_unused(self, oldest):
255 with closing(self.db.cursor()) as cursor: 387 with closing(self.db.cursor()) as cursor:
256 cursor.execute( 388 cursor.execute(
257 """ 389 """
258 DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( 390 DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
259 SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 391 SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1
260 ) 392 )
261 """, 393 """,
262 { 394 {
@@ -271,7 +403,13 @@ class Database(object):
271 prevrowid = cursor.lastrowid 403 prevrowid = cursor.lastrowid
272 cursor.execute( 404 cursor.execute(
273 """ 405 """
274 INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) 406 INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES
407 (
408 :method,
409 :taskhash,
410 :unihash,
411 COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
412 )
275 """, 413 """,
276 { 414 {
277 "method": method, 415 "method": method,
@@ -383,14 +521,9 @@ class Database(object):
383 async def get_usage(self): 521 async def get_usage(self):
384 usage = {} 522 usage = {}
385 with closing(self.db.cursor()) as cursor: 523 with closing(self.db.cursor()) as cursor:
386 if self.sqlite_version >= (3, 33):
387 table_name = "sqlite_schema"
388 else:
389 table_name = "sqlite_master"
390
391 cursor.execute( 524 cursor.execute(
392 f""" 525 f"""
393 SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' 526 SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
394 """ 527 """
395 ) 528 )
396 for row in cursor.fetchall(): 529 for row in cursor.fetchall():
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 869f7636c5..aeedab3575 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -810,6 +810,27 @@ class HashEquivalenceCommonTests(object):
810 with self.auth_perms("@user-admin") as client: 810 with self.auth_perms("@user-admin") as client:
811 become = client.become_user(client.username) 811 become = client.become_user(client.username)
812 812
813 def test_auth_gc(self):
814 admin_client = self.start_auth_server()
815
816 with self.auth_perms() as client, self.assertRaises(InvokeError):
817 client.gc_mark("ABC", {"unihash": "123"})
818
819 with self.auth_perms() as client, self.assertRaises(InvokeError):
820 client.gc_status()
821
822 with self.auth_perms() as client, self.assertRaises(InvokeError):
823 client.gc_sweep("ABC")
824
825 with self.auth_perms("@db-admin") as client:
826 client.gc_mark("ABC", {"unihash": "123"})
827
828 with self.auth_perms("@db-admin") as client:
829 client.gc_status()
830
831 with self.auth_perms("@db-admin") as client:
832 client.gc_sweep("ABC")
833
813 def test_get_db_usage(self): 834 def test_get_db_usage(self):
814 usage = self.client.get_db_usage() 835 usage = self.client.get_db_usage()
815 836
@@ -837,6 +858,147 @@ class HashEquivalenceCommonTests(object):
837 data = client.get_taskhash(self.METHOD, taskhash, True) 858 data = client.get_taskhash(self.METHOD, taskhash, True)
838 self.assertEqual(data["owner"], user["username"]) 859 self.assertEqual(data["owner"], user["username"])
839 860
861 def test_gc(self):
862 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
863 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
864 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
865
866 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
867 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
868
869 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
870 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
871 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
872
873 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
874 self.assertClientGetHash(self.client, taskhash2, unihash2)
875
876 # Mark the first unihash to be kept
877 ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD})
878 self.assertEqual(ret, {"count": 1})
879
880 ret = self.client.gc_status()
881 self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1})
882
883 # Second hash is still there; mark doesn't delete hashes
884 self.assertClientGetHash(self.client, taskhash2, unihash2)
885
886 ret = self.client.gc_sweep("ABC")
887 self.assertEqual(ret, {"count": 1})
888
889 # Hash is gone. Taskhash is returned for second hash
890 self.assertClientGetHash(self.client, taskhash2, None)
891 # First hash is still present
892 self.assertClientGetHash(self.client, taskhash, unihash)
893
894 def test_gc_switch_mark(self):
895 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
896 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
897 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
898
899 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
900 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
901
902 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
903 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
904 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
905
906 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
907 self.assertClientGetHash(self.client, taskhash2, unihash2)
908
909 # Mark the first unihash to be kept
910 ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD})
911 self.assertEqual(ret, {"count": 1})
912
913 ret = self.client.gc_status()
914 self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1})
915
916 # Second hash is still there; mark doesn't delete hashes
917 self.assertClientGetHash(self.client, taskhash2, unihash2)
918
919 # Switch to a different mark and mark the second hash. This will start
920 # a new collection cycle
921 ret = self.client.gc_mark("DEF", {"unihash": unihash2, "method": self.METHOD})
922 self.assertEqual(ret, {"count": 1})
923
924 ret = self.client.gc_status()
925 self.assertEqual(ret, {"mark": "DEF", "keep": 1, "remove": 1})
926
927 # Both hashes are still present
928 self.assertClientGetHash(self.client, taskhash2, unihash2)
929 self.assertClientGetHash(self.client, taskhash, unihash)
930
931 # Sweep with the new mark
932 ret = self.client.gc_sweep("DEF")
933 self.assertEqual(ret, {"count": 1})
934
935 # First hash is gone, second is kept
936 self.assertClientGetHash(self.client, taskhash2, unihash2)
937 self.assertClientGetHash(self.client, taskhash, None)
938
939 def test_gc_switch_sweep_mark(self):
940 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
941 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
942 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
943
944 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
945 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
946
947 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
948 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
949 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
950
951 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
952 self.assertClientGetHash(self.client, taskhash2, unihash2)
953
954 # Mark the first unihash to be kept
955 ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD})
956 self.assertEqual(ret, {"count": 1})
957
958 ret = self.client.gc_status()
959 self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1})
960
961 # Sweeping with a different mark raises an error
962 with self.assertRaises(InvokeError):
963 self.client.gc_sweep("DEF")
964
965 # Both hashes are present
966 self.assertClientGetHash(self.client, taskhash2, unihash2)
967 self.assertClientGetHash(self.client, taskhash, unihash)
968
969 def test_gc_new_hashes(self):
970 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
971 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
972 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
973
974 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
975 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
976
977 # Start a new garbage collection
978 ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD})
979 self.assertEqual(ret, {"count": 1})
980
981 ret = self.client.gc_status()
982 self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 0})
983
984 # Add second hash. It should inherit the mark from the current garbage
985 # collection operation
986
987 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
988 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
989 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
990
991 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
992 self.assertClientGetHash(self.client, taskhash2, unihash2)
993
994 # Sweep should remove nothing
995 ret = self.client.gc_sweep("ABC")
996 self.assertEqual(ret, {"count": 0})
997
998 # Both hashes are present
999 self.assertClientGetHash(self.client, taskhash2, unihash2)
1000 self.assertClientGetHash(self.client, taskhash, unihash)
1001
840 1002
841class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): 1003class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
842 def get_server_addr(self, server_idx): 1004 def get_server_addr(self, server_idx):
@@ -1086,6 +1248,42 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
1086 "get-db-query-columns", 1248 "get-db-query-columns",
1087 ], check=True) 1249 ], check=True)
1088 1250
1251 def test_gc(self):
1252 taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4'
1253 outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8'
1254 unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646'
1255
1256 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
1257 self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
1258
1259 taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4'
1260 outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4'
1261 unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b'
1262
1263 result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
1264 self.assertClientGetHash(self.client, taskhash2, unihash2)
1265
1266 # Mark the first unihash to be kept
1267 self.run_hashclient([
1268 "--address", self.server_address,
1269 "gc-mark", "ABC",
1270 "--where", "unihash", unihash,
1271 "--where", "method", self.METHOD
1272 ], check=True)
1273
1274 # Second hash is still there; mark doesn't delete hashes
1275 self.assertClientGetHash(self.client, taskhash2, unihash2)
1276
1277 self.run_hashclient([
1278 "--address", self.server_address,
1279 "gc-sweep", "ABC",
1280 ], check=True)
1281
1282 # Hash is gone. Taskhash is returned for second hash
1283 self.assertClientGetHash(self.client, taskhash2, None)
1284 # First hash is still present
1285 self.assertClientGetHash(self.client, taskhash, unihash)
1286
1089 1287
1090class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): 1288class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
1091 def get_server_addr(self, server_idx): 1289 def get_server_addr(self, server_idx):