diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-02-18 15:59:46 -0700 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-02-19 11:58:12 +0000 |
commit | 1effd1014d9140905093efe25eeefedb28a10875 (patch) | |
tree | b34fb1d26f020b361d22904695cab6b9a7c1ea50 | |
parent | 324c9fd666117afb0dd689eaa8551bb02d6a042b (diff) | |
download | poky-1effd1014d9140905093efe25eeefedb28a10875.tar.gz |
bitbake: hashserv: Add Unihash Garbage Collection
Adds support for removing unused unihashes from the database. This is
done using a "mark and sweep" style of garbage collection where a
collection is started by marking which unihashes should be kept in the
database, then performing a sweep to remove any unmarked hashes.
(Bitbake rev: 433d4a075a1acfbd2a2913061739353a84bb01ed)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rwxr-xr-x | bitbake/bin/bitbake-hashclient | 35 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 31 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 105 | ||||
-rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 226 | ||||
-rw-r--r-- | bitbake/lib/hashserv/sqlite.py | 205 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 198 |
6 files changed, 684 insertions, 116 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient index 2cb6338666..f71b87404a 100755 --- a/bitbake/bin/bitbake-hashclient +++ b/bitbake/bin/bitbake-hashclient | |||
@@ -195,6 +195,28 @@ def main(): | |||
195 | columns = client.get_db_query_columns() | 195 | columns = client.get_db_query_columns() |
196 | print("\n".join(sorted(columns))) | 196 | print("\n".join(sorted(columns))) |
197 | 197 | ||
198 | def handle_gc_status(args, client): | ||
199 | result = client.gc_status() | ||
200 | if not result["mark"]: | ||
201 | print("No Garbage collection in progress") | ||
202 | return 0 | ||
203 | |||
204 | print("Current Mark: %s" % result["mark"]) | ||
205 | print("Total hashes to keep: %d" % result["keep"]) | ||
206 | print("Total hashes to remove: %s" % result["remove"]) | ||
207 | return 0 | ||
208 | |||
209 | def handle_gc_mark(args, client): | ||
210 | where = {k: v for k, v in args.where} | ||
211 | result = client.gc_mark(args.mark, where) | ||
212 | print("New hashes marked: %d" % result["count"]) | ||
213 | return 0 | ||
214 | |||
215 | def handle_gc_sweep(args, client): | ||
216 | result = client.gc_sweep(args.mark) | ||
217 | print("Removed %d rows" % result["count"]) | ||
218 | return 0 | ||
219 | |||
198 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') | 220 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') |
199 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') | 221 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') |
200 | parser.add_argument('--log', default='WARNING', help='Set logging level') | 222 | parser.add_argument('--log', default='WARNING', help='Set logging level') |
@@ -274,6 +296,19 @@ def main(): | |||
274 | db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries") | 296 | db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries") |
275 | db_query_columns_parser.set_defaults(func=handle_get_db_query_columns) | 297 | db_query_columns_parser.set_defaults(func=handle_get_db_query_columns) |
276 | 298 | ||
299 | gc_status_parser = subparsers.add_parser("gc-status", help="Show garbage collection status") | ||
300 | gc_status_parser.set_defaults(func=handle_gc_status) | ||
301 | |||
302 | gc_mark_parser = subparsers.add_parser('gc-mark', help="Mark hashes to be kept for garbage collection") | ||
303 | gc_mark_parser.add_argument("mark", help="Mark for this garbage collection operation") | ||
304 | gc_mark_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[], | ||
305 | help="Keep entries in table where KEY == VALUE") | ||
306 | gc_mark_parser.set_defaults(func=handle_gc_mark) | ||
307 | |||
308 | gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") | ||
309 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") | ||
310 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) | ||
311 | |||
277 | args = parser.parse_args() | 312 | args = parser.parse_args() |
278 | 313 | ||
279 | logger = logging.getLogger('hashserv') | 314 | logger = logging.getLogger('hashserv') |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 35a97687fb..e6dc417912 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -194,6 +194,34 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
194 | await self._set_mode(self.MODE_NORMAL) | 194 | await self._set_mode(self.MODE_NORMAL) |
195 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] | 195 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] |
196 | 196 | ||
197 | async def gc_status(self): | ||
198 | await self._set_mode(self.MODE_NORMAL) | ||
199 | return await self.invoke({"gc-status": {}}) | ||
200 | |||
201 | async def gc_mark(self, mark, where): | ||
202 | """ | ||
203 | Starts a new garbage collection operation identified by "mark". If | ||
204 | garbage collection is already in progress with "mark", the collection | ||
205 | is continued. | ||
206 | |||
207 | All unihash entries that match the "where" clause are marked to be | ||
208 | kept. In addition, any new entries added to the database after this | ||
209 | command will be automatically marked with "mark". | ||
210 | """ | ||
211 | await self._set_mode(self.MODE_NORMAL) | ||
212 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | ||
213 | |||
214 | async def gc_sweep(self, mark): | ||
215 | """ | ||
216 | Finishes garbage collection for "mark". All unihash entries that have | ||
217 | not been marked will be deleted. | ||
218 | |||
219 | It is recommended to clean unused outhash entries after running this to | ||
220 | clean up any dangling outhashes | ||
221 | """ | ||
222 | await self._set_mode(self.MODE_NORMAL) | ||
223 | return await self.invoke({"gc-sweep": {"mark": mark}}) | ||
224 | |||
197 | 225 | ||
198 | class Client(bb.asyncrpc.Client): | 226 | class Client(bb.asyncrpc.Client): |
199 | def __init__(self, username=None, password=None): | 227 | def __init__(self, username=None, password=None): |
@@ -224,6 +252,9 @@ class Client(bb.asyncrpc.Client): | |||
224 | "become_user", | 252 | "become_user", |
225 | "get_db_usage", | 253 | "get_db_usage", |
226 | "get_db_query_columns", | 254 | "get_db_query_columns", |
255 | "gc_status", | ||
256 | "gc_mark", | ||
257 | "gc_sweep", | ||
227 | ) | 258 | ) |
228 | 259 | ||
229 | def _get_async_client(self): | 260 | def _get_async_client(self): |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index a86507830e..5ed852d1f3 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): | 199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): |
200 | if not self.user: | 200 | if not self.user: |
201 | username = "Anonymous user" | 201 | username = "Anonymous user" |
202 | user_perms = self.anon_perms | 202 | user_perms = self.server.anon_perms |
203 | else: | 203 | else: |
204 | username = self.user.username | 204 | username = self.user.username |
205 | user_perms = self.user.permissions | 205 | user_perms = self.user.permissions |
@@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
223 | 223 | ||
224 | 224 | ||
225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
226 | def __init__( | 226 | def __init__(self, socket, server): |
227 | self, | 227 | super().__init__(socket, "OEHASHEQUIV", server.logger) |
228 | socket, | 228 | self.server = server |
229 | db_engine, | ||
230 | request_stats, | ||
231 | backfill_queue, | ||
232 | upstream, | ||
233 | read_only, | ||
234 | anon_perms, | ||
235 | ): | ||
236 | super().__init__(socket, "OEHASHEQUIV", logger) | ||
237 | self.db_engine = db_engine | ||
238 | self.request_stats = request_stats | ||
239 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 229 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
240 | self.backfill_queue = backfill_queue | ||
241 | self.upstream = upstream | ||
242 | self.read_only = read_only | ||
243 | self.user = None | 230 | self.user = None |
244 | self.anon_perms = anon_perms | ||
245 | 231 | ||
246 | self.handlers.update( | 232 | self.handlers.update( |
247 | { | 233 | { |
@@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
261 | } | 247 | } |
262 | ) | 248 | ) |
263 | 249 | ||
264 | if not read_only: | 250 | if not self.server.read_only: |
265 | self.handlers.update( | 251 | self.handlers.update( |
266 | { | 252 | { |
267 | "report-equiv": self.handle_equivreport, | 253 | "report-equiv": self.handle_equivreport, |
268 | "reset-stats": self.handle_reset_stats, | 254 | "reset-stats": self.handle_reset_stats, |
269 | "backfill-wait": self.handle_backfill_wait, | 255 | "backfill-wait": self.handle_backfill_wait, |
270 | "remove": self.handle_remove, | 256 | "remove": self.handle_remove, |
257 | "gc-mark": self.handle_gc_mark, | ||
258 | "gc-sweep": self.handle_gc_sweep, | ||
259 | "gc-status": self.handle_gc_status, | ||
271 | "clean-unused": self.handle_clean_unused, | 260 | "clean-unused": self.handle_clean_unused, |
272 | "refresh-token": self.handle_refresh_token, | 261 | "refresh-token": self.handle_refresh_token, |
273 | "set-user-perms": self.handle_set_perms, | 262 | "set-user-perms": self.handle_set_perms, |
@@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
282 | def user_has_permissions(self, *permissions, allow_anon=True): | 271 | def user_has_permissions(self, *permissions, allow_anon=True): |
283 | permissions = set(permissions) | 272 | permissions = set(permissions) |
284 | if allow_anon: | 273 | if allow_anon: |
285 | if ALL_PERM in self.anon_perms: | 274 | if ALL_PERM in self.server.anon_perms: |
286 | return True | 275 | return True |
287 | 276 | ||
288 | if not permissions - self.anon_perms: | 277 | if not permissions - self.server.anon_perms: |
289 | return True | 278 | return True |
290 | 279 | ||
291 | if self.user is None: | 280 | if self.user is None: |
@@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
303 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) | 292 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
304 | 293 | ||
305 | async def process_requests(self): | 294 | async def process_requests(self): |
306 | async with self.db_engine.connect(self.logger) as db: | 295 | async with self.server.db_engine.connect(self.logger) as db: |
307 | self.db = db | 296 | self.db = db |
308 | if self.upstream is not None: | 297 | if self.server.upstream is not None: |
309 | self.upstream_client = await create_async_client(self.upstream) | 298 | self.upstream_client = await create_async_client(self.server.upstream) |
310 | else: | 299 | else: |
311 | self.upstream_client = None | 300 | self.upstream_client = None |
312 | 301 | ||
@@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
323 | if "stream" in k: | 312 | if "stream" in k: |
324 | return await self.handlers[k](msg[k]) | 313 | return await self.handlers[k](msg[k]) |
325 | else: | 314 | else: |
326 | with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): | 315 | with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): |
327 | return await self.handlers[k](msg[k]) | 316 | return await self.handlers[k](msg[k]) |
328 | 317 | ||
329 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 318 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
@@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
404 | # possible (which is why the request sample is handled manually | 393 | # possible (which is why the request sample is handled manually |
405 | # instead of using 'with', and also why logging statements are | 394 | # instead of using 'with', and also why logging statements are |
406 | # commented out. | 395 | # commented out. |
407 | self.request_sample = self.request_stats.start_sample() | 396 | self.request_sample = self.server.request_stats.start_sample() |
408 | request_measure = self.request_sample.measure() | 397 | request_measure = self.request_sample.measure() |
409 | request_measure.start() | 398 | request_measure.start() |
410 | 399 | ||
@@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
435 | # Post to the backfill queue after writing the result to minimize | 424 | # Post to the backfill queue after writing the result to minimize |
436 | # the turn around time on a request | 425 | # the turn around time on a request |
437 | if upstream is not None: | 426 | if upstream is not None: |
438 | await self.backfill_queue.put((method, taskhash)) | 427 | await self.server.backfill_queue.put((method, taskhash)) |
439 | 428 | ||
440 | await self.socket.send("ok") | 429 | await self.socket.send("ok") |
441 | return self.NO_RESPONSE | 430 | return self.NO_RESPONSE |
@@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
461 | # report is made inside the function | 450 | # report is made inside the function |
462 | @permissions(READ_PERM) | 451 | @permissions(READ_PERM) |
463 | async def handle_report(self, data): | 452 | async def handle_report(self, data): |
464 | if self.read_only or not self.user_has_permissions(REPORT_PERM): | 453 | if self.server.read_only or not self.user_has_permissions(REPORT_PERM): |
465 | return await self.report_readonly(data) | 454 | return await self.report_readonly(data) |
466 | 455 | ||
467 | outhash_data = { | 456 | outhash_data = { |
@@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
538 | @permissions(READ_PERM) | 527 | @permissions(READ_PERM) |
539 | async def handle_get_stats(self, request): | 528 | async def handle_get_stats(self, request): |
540 | return { | 529 | return { |
541 | "requests": self.request_stats.todict(), | 530 | "requests": self.server.request_stats.todict(), |
542 | } | 531 | } |
543 | 532 | ||
544 | @permissions(DB_ADMIN_PERM) | 533 | @permissions(DB_ADMIN_PERM) |
545 | async def handle_reset_stats(self, request): | 534 | async def handle_reset_stats(self, request): |
546 | d = { | 535 | d = { |
547 | "requests": self.request_stats.todict(), | 536 | "requests": self.server.request_stats.todict(), |
548 | } | 537 | } |
549 | 538 | ||
550 | self.request_stats.reset() | 539 | self.server.request_stats.reset() |
551 | return d | 540 | return d |
552 | 541 | ||
553 | @permissions(READ_PERM) | 542 | @permissions(READ_PERM) |
554 | async def handle_backfill_wait(self, request): | 543 | async def handle_backfill_wait(self, request): |
555 | d = { | 544 | d = { |
556 | "tasks": self.backfill_queue.qsize(), | 545 | "tasks": self.server.backfill_queue.qsize(), |
557 | } | 546 | } |
558 | await self.backfill_queue.join() | 547 | await self.server.backfill_queue.join() |
559 | return d | 548 | return d |
560 | 549 | ||
561 | @permissions(DB_ADMIN_PERM) | 550 | @permissions(DB_ADMIN_PERM) |
@@ -567,6 +556,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
567 | return {"count": await self.db.remove(condition)} | 556 | return {"count": await self.db.remove(condition)} |
568 | 557 | ||
569 | @permissions(DB_ADMIN_PERM) | 558 | @permissions(DB_ADMIN_PERM) |
559 | async def handle_gc_mark(self, request): | ||
560 | condition = request["where"] | ||
561 | mark = request["mark"] | ||
562 | |||
563 | if not isinstance(condition, dict): | ||
564 | raise TypeError("Bad condition type %s" % type(condition)) | ||
565 | |||
566 | if not isinstance(mark, str): | ||
567 | raise TypeError("Bad mark type %s" % type(mark)) | ||
568 | |||
569 | return {"count": await self.db.gc_mark(mark, condition)} | ||
570 | |||
571 | @permissions(DB_ADMIN_PERM) | ||
572 | async def handle_gc_sweep(self, request): | ||
573 | mark = request["mark"] | ||
574 | |||
575 | if not isinstance(mark, str): | ||
576 | raise TypeError("Bad mark type %s" % type(mark)) | ||
577 | |||
578 | current_mark = await self.db.get_current_gc_mark() | ||
579 | |||
580 | if not current_mark or mark != current_mark: | ||
581 | raise bb.asyncrpc.InvokeError( | ||
582 | f"'{mark}' is not the current mark. Refusing to sweep" | ||
583 | ) | ||
584 | |||
585 | count = await self.db.gc_sweep() | ||
586 | |||
587 | return {"count": count} | ||
588 | |||
589 | @permissions(DB_ADMIN_PERM) | ||
590 | async def handle_gc_status(self, request): | ||
591 | (keep_rows, remove_rows, current_mark) = await self.db.gc_status() | ||
592 | return { | ||
593 | "keep": keep_rows, | ||
594 | "remove": remove_rows, | ||
595 | "mark": current_mark, | ||
596 | } | ||
597 | |||
598 | @permissions(DB_ADMIN_PERM) | ||
570 | async def handle_clean_unused(self, request): | 599 | async def handle_clean_unused(self, request): |
571 | max_age = request["max_age_seconds"] | 600 | max_age = request["max_age_seconds"] |
572 | oldest = datetime.now() - timedelta(seconds=-max_age) | 601 | oldest = datetime.now() - timedelta(seconds=-max_age) |
@@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer): | |||
779 | ) | 808 | ) |
780 | 809 | ||
781 | def accept_client(self, socket): | 810 | def accept_client(self, socket): |
782 | return ServerClient( | 811 | return ServerClient(socket, self) |
783 | socket, | ||
784 | self.db_engine, | ||
785 | self.request_stats, | ||
786 | self.backfill_queue, | ||
787 | self.upstream, | ||
788 | self.read_only, | ||
789 | self.anon_perms, | ||
790 | ) | ||
791 | 812 | ||
792 | async def create_admin_user(self): | 813 | async def create_admin_user(self): |
793 | admin_permissions = (ALL_PERM,) | 814 | admin_permissions = (ALL_PERM,) |
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py index cee04bffb0..89a6b86d9d 100644 --- a/bitbake/lib/hashserv/sqlalchemy.py +++ b/bitbake/lib/hashserv/sqlalchemy.py | |||
@@ -28,6 +28,7 @@ from sqlalchemy import ( | |||
28 | delete, | 28 | delete, |
29 | update, | 29 | update, |
30 | func, | 30 | func, |
31 | inspect, | ||
31 | ) | 32 | ) |
32 | import sqlalchemy.engine | 33 | import sqlalchemy.engine |
33 | from sqlalchemy.orm import declarative_base | 34 | from sqlalchemy.orm import declarative_base |
@@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError | |||
36 | Base = declarative_base() | 37 | Base = declarative_base() |
37 | 38 | ||
38 | 39 | ||
39 | class UnihashesV2(Base): | 40 | class UnihashesV3(Base): |
40 | __tablename__ = "unihashes_v2" | 41 | __tablename__ = "unihashes_v3" |
41 | id = Column(Integer, primary_key=True, autoincrement=True) | 42 | id = Column(Integer, primary_key=True, autoincrement=True) |
42 | method = Column(Text, nullable=False) | 43 | method = Column(Text, nullable=False) |
43 | taskhash = Column(Text, nullable=False) | 44 | taskhash = Column(Text, nullable=False) |
44 | unihash = Column(Text, nullable=False) | 45 | unihash = Column(Text, nullable=False) |
46 | gc_mark = Column(Text, nullable=False) | ||
45 | 47 | ||
46 | __table_args__ = ( | 48 | __table_args__ = ( |
47 | UniqueConstraint("method", "taskhash"), | 49 | UniqueConstraint("method", "taskhash"), |
48 | Index("taskhash_lookup_v3", "method", "taskhash"), | 50 | Index("taskhash_lookup_v4", "method", "taskhash"), |
49 | ) | 51 | ) |
50 | 52 | ||
51 | 53 | ||
@@ -79,6 +81,36 @@ class Users(Base): | |||
79 | __table_args__ = (UniqueConstraint("username"),) | 81 | __table_args__ = (UniqueConstraint("username"),) |
80 | 82 | ||
81 | 83 | ||
84 | class Config(Base): | ||
85 | __tablename__ = "config" | ||
86 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
87 | name = Column(Text, nullable=False) | ||
88 | value = Column(Text) | ||
89 | __table_args__ = ( | ||
90 | UniqueConstraint("name"), | ||
91 | Index("config_lookup", "name"), | ||
92 | ) | ||
93 | |||
94 | |||
95 | # | ||
96 | # Old table versions | ||
97 | # | ||
98 | DeprecatedBase = declarative_base() | ||
99 | |||
100 | |||
101 | class UnihashesV2(DeprecatedBase): | ||
102 | __tablename__ = "unihashes_v2" | ||
103 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
104 | method = Column(Text, nullable=False) | ||
105 | taskhash = Column(Text, nullable=False) | ||
106 | unihash = Column(Text, nullable=False) | ||
107 | |||
108 | __table_args__ = ( | ||
109 | UniqueConstraint("method", "taskhash"), | ||
110 | Index("taskhash_lookup_v3", "method", "taskhash"), | ||
111 | ) | ||
112 | |||
113 | |||
82 | class DatabaseEngine(object): | 114 | class DatabaseEngine(object): |
83 | def __init__(self, url, username=None, password=None): | 115 | def __init__(self, url, username=None, password=None): |
84 | self.logger = logging.getLogger("hashserv.sqlalchemy") | 116 | self.logger = logging.getLogger("hashserv.sqlalchemy") |
@@ -91,6 +123,9 @@ class DatabaseEngine(object): | |||
91 | self.url = self.url.set(password=password) | 123 | self.url = self.url.set(password=password) |
92 | 124 | ||
93 | async def create(self): | 125 | async def create(self): |
126 | def check_table_exists(conn, name): | ||
127 | return inspect(conn).has_table(name) | ||
128 | |||
94 | self.logger.info("Using database %s", self.url) | 129 | self.logger.info("Using database %s", self.url) |
95 | self.engine = create_async_engine(self.url, poolclass=NullPool) | 130 | self.engine = create_async_engine(self.url, poolclass=NullPool) |
96 | 131 | ||
@@ -99,6 +134,24 @@ class DatabaseEngine(object): | |||
99 | self.logger.info("Creating tables...") | 134 | self.logger.info("Creating tables...") |
100 | await conn.run_sync(Base.metadata.create_all) | 135 | await conn.run_sync(Base.metadata.create_all) |
101 | 136 | ||
137 | if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__): | ||
138 | self.logger.info("Upgrading Unihashes V2 -> V3...") | ||
139 | statement = insert(UnihashesV3).from_select( | ||
140 | ["id", "method", "unihash", "taskhash", "gc_mark"], | ||
141 | select( | ||
142 | UnihashesV2.id, | ||
143 | UnihashesV2.method, | ||
144 | UnihashesV2.unihash, | ||
145 | UnihashesV2.taskhash, | ||
146 | literal("").label("gc_mark"), | ||
147 | ), | ||
148 | ) | ||
149 | self.logger.debug("%s", statement) | ||
150 | await conn.execute(statement) | ||
151 | |||
152 | await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__]) | ||
153 | self.logger.info("Upgrade complete") | ||
154 | |||
102 | def connect(self, logger): | 155 | def connect(self, logger): |
103 | return Database(self.engine, logger) | 156 | return Database(self.engine, logger) |
104 | 157 | ||
@@ -118,6 +171,15 @@ def map_user(row): | |||
118 | ) | 171 | ) |
119 | 172 | ||
120 | 173 | ||
174 | def _make_condition_statement(table, condition): | ||
175 | where = {} | ||
176 | for c in table.__table__.columns: | ||
177 | if c.key in condition and condition[c.key] is not None: | ||
178 | where[c] = condition[c.key] | ||
179 | |||
180 | return [(k == v) for k, v in where.items()] | ||
181 | |||
182 | |||
121 | class Database(object): | 183 | class Database(object): |
122 | def __init__(self, engine, logger): | 184 | def __init__(self, engine, logger): |
123 | self.engine = engine | 185 | self.engine = engine |
@@ -135,17 +197,52 @@ class Database(object): | |||
135 | await self.db.close() | 197 | await self.db.close() |
136 | self.db = None | 198 | self.db = None |
137 | 199 | ||
200 | async def _execute(self, statement): | ||
201 | self.logger.debug("%s", statement) | ||
202 | return await self.db.execute(statement) | ||
203 | |||
204 | async def _set_config(self, name, value): | ||
205 | while True: | ||
206 | result = await self._execute( | ||
207 | update(Config).where(Config.name == name).values(value=value) | ||
208 | ) | ||
209 | |||
210 | if result.rowcount == 0: | ||
211 | self.logger.debug("Config '%s' not found. Adding it", name) | ||
212 | try: | ||
213 | await self._execute(insert(Config).values(name=name, value=value)) | ||
214 | except IntegrityError: | ||
215 | # Race. Try again | ||
216 | continue | ||
217 | |||
218 | break | ||
219 | |||
220 | def _get_config_subquery(self, name, default=None): | ||
221 | if default is not None: | ||
222 | return func.coalesce( | ||
223 | select(Config.value).where(Config.name == name).scalar_subquery(), | ||
224 | default, | ||
225 | ) | ||
226 | return select(Config.value).where(Config.name == name).scalar_subquery() | ||
227 | |||
228 | async def _get_config(self, name): | ||
229 | result = await self._execute(select(Config.value).where(Config.name == name)) | ||
230 | row = result.first() | ||
231 | if row is None: | ||
232 | return None | ||
233 | return row.value | ||
234 | |||
138 | async def get_unihash_by_taskhash_full(self, method, taskhash): | 235 | async def get_unihash_by_taskhash_full(self, method, taskhash): |
139 | statement = ( | 236 | statement = ( |
140 | select( | 237 | select( |
141 | OuthashesV2, | 238 | OuthashesV2, |
142 | UnihashesV2.unihash.label("unihash"), | 239 | UnihashesV3.unihash.label("unihash"), |
143 | ) | 240 | ) |
144 | .join( | 241 | .join( |
145 | UnihashesV2, | 242 | UnihashesV3, |
146 | and_( | 243 | and_( |
147 | UnihashesV2.method == OuthashesV2.method, | 244 | UnihashesV3.method == OuthashesV2.method, |
148 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 245 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
149 | ), | 246 | ), |
150 | ) | 247 | ) |
151 | .where( | 248 | .where( |
@@ -164,12 +261,12 @@ class Database(object): | |||
164 | 261 | ||
165 | async def get_unihash_by_outhash(self, method, outhash): | 262 | async def get_unihash_by_outhash(self, method, outhash): |
166 | statement = ( | 263 | statement = ( |
167 | select(OuthashesV2, UnihashesV2.unihash.label("unihash")) | 264 | select(OuthashesV2, UnihashesV3.unihash.label("unihash")) |
168 | .join( | 265 | .join( |
169 | UnihashesV2, | 266 | UnihashesV3, |
170 | and_( | 267 | and_( |
171 | UnihashesV2.method == OuthashesV2.method, | 268 | UnihashesV3.method == OuthashesV2.method, |
172 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 269 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
173 | ), | 270 | ), |
174 | ) | 271 | ) |
175 | .where( | 272 | .where( |
@@ -208,13 +305,13 @@ class Database(object): | |||
208 | statement = ( | 305 | statement = ( |
209 | select( | 306 | select( |
210 | OuthashesV2.taskhash.label("taskhash"), | 307 | OuthashesV2.taskhash.label("taskhash"), |
211 | UnihashesV2.unihash.label("unihash"), | 308 | UnihashesV3.unihash.label("unihash"), |
212 | ) | 309 | ) |
213 | .join( | 310 | .join( |
214 | UnihashesV2, | 311 | UnihashesV3, |
215 | and_( | 312 | and_( |
216 | UnihashesV2.method == OuthashesV2.method, | 313 | UnihashesV3.method == OuthashesV2.method, |
217 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 314 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
218 | ), | 315 | ), |
219 | ) | 316 | ) |
220 | .where( | 317 | .where( |
@@ -234,12 +331,12 @@ class Database(object): | |||
234 | 331 | ||
235 | async def get_equivalent(self, method, taskhash): | 332 | async def get_equivalent(self, method, taskhash): |
236 | statement = select( | 333 | statement = select( |
237 | UnihashesV2.unihash, | 334 | UnihashesV3.unihash, |
238 | UnihashesV2.method, | 335 | UnihashesV3.method, |
239 | UnihashesV2.taskhash, | 336 | UnihashesV3.taskhash, |
240 | ).where( | 337 | ).where( |
241 | UnihashesV2.method == method, | 338 | UnihashesV3.method == method, |
242 | UnihashesV2.taskhash == taskhash, | 339 | UnihashesV3.taskhash == taskhash, |
243 | ) | 340 | ) |
244 | self.logger.debug("%s", statement) | 341 | self.logger.debug("%s", statement) |
245 | async with self.db.begin(): | 342 | async with self.db.begin(): |
@@ -248,13 +345,9 @@ class Database(object): | |||
248 | 345 | ||
249 | async def remove(self, condition): | 346 | async def remove(self, condition): |
250 | async def do_remove(table): | 347 | async def do_remove(table): |
251 | where = {} | 348 | where = _make_condition_statement(table, condition) |
252 | for c in table.__table__.columns: | ||
253 | if c.key in condition and condition[c.key] is not None: | ||
254 | where[c] = condition[c.key] | ||
255 | |||
256 | if where: | 349 | if where: |
257 | statement = delete(table).where(*[(k == v) for k, v in where.items()]) | 350 | statement = delete(table).where(*where) |
258 | self.logger.debug("%s", statement) | 351 | self.logger.debug("%s", statement) |
259 | async with self.db.begin(): | 352 | async with self.db.begin(): |
260 | result = await self.db.execute(statement) | 353 | result = await self.db.execute(statement) |
@@ -263,19 +356,74 @@ class Database(object): | |||
263 | return 0 | 356 | return 0 |
264 | 357 | ||
265 | count = 0 | 358 | count = 0 |
266 | count += await do_remove(UnihashesV2) | 359 | count += await do_remove(UnihashesV3) |
267 | count += await do_remove(OuthashesV2) | 360 | count += await do_remove(OuthashesV2) |
268 | 361 | ||
269 | return count | 362 | return count |
270 | 363 | ||
364 | async def get_current_gc_mark(self): | ||
365 | async with self.db.begin(): | ||
366 | return await self._get_config("gc-mark") | ||
367 | |||
368 | async def gc_status(self): | ||
369 | async with self.db.begin(): | ||
370 | gc_mark_subquery = self._get_config_subquery("gc-mark", "") | ||
371 | |||
372 | result = await self._execute( | ||
373 | select(func.count()) | ||
374 | .select_from(UnihashesV3) | ||
375 | .where(UnihashesV3.gc_mark == gc_mark_subquery) | ||
376 | ) | ||
377 | keep_rows = result.scalar() | ||
378 | |||
379 | result = await self._execute( | ||
380 | select(func.count()) | ||
381 | .select_from(UnihashesV3) | ||
382 | .where(UnihashesV3.gc_mark != gc_mark_subquery) | ||
383 | ) | ||
384 | remove_rows = result.scalar() | ||
385 | |||
386 | return (keep_rows, remove_rows, await self._get_config("gc-mark")) | ||
387 | |||
388 | async def gc_mark(self, mark, condition): | ||
389 | async with self.db.begin(): | ||
390 | await self._set_config("gc-mark", mark) | ||
391 | |||
392 | where = _make_condition_statement(UnihashesV3, condition) | ||
393 | if not where: | ||
394 | return 0 | ||
395 | |||
396 | result = await self._execute( | ||
397 | update(UnihashesV3) | ||
398 | .values(gc_mark=self._get_config_subquery("gc-mark", "")) | ||
399 | .where(*where) | ||
400 | ) | ||
401 | return result.rowcount | ||
402 | |||
403 | async def gc_sweep(self): | ||
404 | async with self.db.begin(): | ||
405 | result = await self._execute( | ||
406 | delete(UnihashesV3).where( | ||
407 | # A sneaky conditional that provides some errant use | ||
408 | # protection: If the config mark is NULL, this will not | ||
409 | # match any rows because No default is specified in the | ||
410 | # select statement | ||
411 | UnihashesV3.gc_mark | ||
412 | != self._get_config_subquery("gc-mark") | ||
413 | ) | ||
414 | ) | ||
415 | await self._set_config("gc-mark", None) | ||
416 | |||
417 | return result.rowcount | ||
418 | |||
271 | async def clean_unused(self, oldest): | 419 | async def clean_unused(self, oldest): |
272 | statement = delete(OuthashesV2).where( | 420 | statement = delete(OuthashesV2).where( |
273 | OuthashesV2.created < oldest, | 421 | OuthashesV2.created < oldest, |
274 | ~( | 422 | ~( |
275 | select(UnihashesV2.id) | 423 | select(UnihashesV3.id) |
276 | .where( | 424 | .where( |
277 | UnihashesV2.method == OuthashesV2.method, | 425 | UnihashesV3.method == OuthashesV2.method, |
278 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 426 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
279 | ) | 427 | ) |
280 | .limit(1) | 428 | .limit(1) |
281 | .exists() | 429 | .exists() |
@@ -287,15 +435,17 @@ class Database(object): | |||
287 | return result.rowcount | 435 | return result.rowcount |
288 | 436 | ||
289 | async def insert_unihash(self, method, taskhash, unihash): | 437 | async def insert_unihash(self, method, taskhash, unihash): |
290 | statement = insert(UnihashesV2).values( | ||
291 | method=method, | ||
292 | taskhash=taskhash, | ||
293 | unihash=unihash, | ||
294 | ) | ||
295 | self.logger.debug("%s", statement) | ||
296 | try: | 438 | try: |
297 | async with self.db.begin(): | 439 | async with self.db.begin(): |
298 | await self.db.execute(statement) | 440 | await self._execute( |
441 | insert(UnihashesV3).values( | ||
442 | method=method, | ||
443 | taskhash=taskhash, | ||
444 | unihash=unihash, | ||
445 | gc_mark=self._get_config_subquery("gc-mark", ""), | ||
446 | ) | ||
447 | ) | ||
448 | |||
299 | return True | 449 | return True |
300 | except IntegrityError: | 450 | except IntegrityError: |
301 | self.logger.debug( | 451 | self.logger.debug( |
@@ -418,7 +568,7 @@ class Database(object): | |||
418 | 568 | ||
419 | async def get_query_columns(self): | 569 | async def get_query_columns(self): |
420 | columns = set() | 570 | columns = set() |
421 | for table in (UnihashesV2, OuthashesV2): | 571 | for table in (UnihashesV3, OuthashesV2): |
422 | for c in table.__table__.columns: | 572 | for c in table.__table__.columns: |
423 | if not isinstance(c.type, Text): | 573 | if not isinstance(c.type, Text): |
424 | continue | 574 | continue |
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py index f93cb2c1dd..608490730d 100644 --- a/bitbake/lib/hashserv/sqlite.py +++ b/bitbake/lib/hashserv/sqlite.py | |||
@@ -15,6 +15,7 @@ UNIHASH_TABLE_DEFINITION = ( | |||
15 | ("method", "TEXT NOT NULL", "UNIQUE"), | 15 | ("method", "TEXT NOT NULL", "UNIQUE"), |
16 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | 16 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
17 | ("unihash", "TEXT NOT NULL", ""), | 17 | ("unihash", "TEXT NOT NULL", ""), |
18 | ("gc_mark", "TEXT NOT NULL", ""), | ||
18 | ) | 19 | ) |
19 | 20 | ||
20 | UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) | 21 | UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) |
@@ -44,6 +45,14 @@ USERS_TABLE_DEFINITION = ( | |||
44 | USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) | 45 | USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) |
45 | 46 | ||
46 | 47 | ||
48 | CONFIG_TABLE_DEFINITION = ( | ||
49 | ("name", "TEXT NOT NULL", "UNIQUE"), | ||
50 | ("value", "TEXT", ""), | ||
51 | ) | ||
52 | |||
53 | CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) | ||
54 | |||
55 | |||
47 | def _make_table(cursor, name, definition): | 56 | def _make_table(cursor, name, definition): |
48 | cursor.execute( | 57 | cursor.execute( |
49 | """ | 58 | """ |
@@ -71,6 +80,35 @@ def map_user(row): | |||
71 | ) | 80 | ) |
72 | 81 | ||
73 | 82 | ||
83 | def _make_condition_statement(columns, condition): | ||
84 | where = {} | ||
85 | for c in columns: | ||
86 | if c in condition and condition[c] is not None: | ||
87 | where[c] = condition[c] | ||
88 | |||
89 | return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys()) | ||
90 | |||
91 | |||
92 | def _get_sqlite_version(cursor): | ||
93 | cursor.execute("SELECT sqlite_version()") | ||
94 | |||
95 | version = [] | ||
96 | for v in cursor.fetchone()[0].split("."): | ||
97 | try: | ||
98 | version.append(int(v)) | ||
99 | except ValueError: | ||
100 | version.append(v) | ||
101 | |||
102 | return tuple(version) | ||
103 | |||
104 | |||
105 | def _schema_table_name(version): | ||
106 | if version >= (3, 33): | ||
107 | return "sqlite_schema" | ||
108 | |||
109 | return "sqlite_master" | ||
110 | |||
111 | |||
74 | class DatabaseEngine(object): | 112 | class DatabaseEngine(object): |
75 | def __init__(self, dbname, sync): | 113 | def __init__(self, dbname, sync): |
76 | self.dbname = dbname | 114 | self.dbname = dbname |
@@ -82,9 +120,10 @@ class DatabaseEngine(object): | |||
82 | db.row_factory = sqlite3.Row | 120 | db.row_factory = sqlite3.Row |
83 | 121 | ||
84 | with closing(db.cursor()) as cursor: | 122 | with closing(db.cursor()) as cursor: |
85 | _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) | 123 | _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION) |
86 | _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) | 124 | _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) |
87 | _make_table(cursor, "users", USERS_TABLE_DEFINITION) | 125 | _make_table(cursor, "users", USERS_TABLE_DEFINITION) |
126 | _make_table(cursor, "config", CONFIG_TABLE_DEFINITION) | ||
88 | 127 | ||
89 | cursor.execute("PRAGMA journal_mode = WAL") | 128 | cursor.execute("PRAGMA journal_mode = WAL") |
90 | cursor.execute( | 129 | cursor.execute( |
@@ -96,17 +135,38 @@ class DatabaseEngine(object): | |||
96 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup") | 135 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup") |
97 | cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") | 136 | cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") |
98 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") | 137 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") |
138 | cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3") | ||
99 | 139 | ||
100 | # TODO: Upgrade from tasks_v2? | 140 | # TODO: Upgrade from tasks_v2? |
101 | cursor.execute("DROP TABLE IF EXISTS tasks_v2") | 141 | cursor.execute("DROP TABLE IF EXISTS tasks_v2") |
102 | 142 | ||
103 | # Create new indexes | 143 | # Create new indexes |
104 | cursor.execute( | 144 | cursor.execute( |
105 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" | 145 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" |
106 | ) | 146 | ) |
107 | cursor.execute( | 147 | cursor.execute( |
108 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" | 148 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" |
109 | ) | 149 | ) |
150 | cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") | ||
151 | |||
152 | sqlite_version = _get_sqlite_version(cursor) | ||
153 | |||
154 | cursor.execute( | ||
155 | f""" | ||
156 | SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2' | ||
157 | """ | ||
158 | ) | ||
159 | if cursor.fetchone(): | ||
160 | self.logger.info("Upgrading Unihashes V2 -> V3...") | ||
161 | cursor.execute( | ||
162 | """ | ||
163 | INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark) | ||
164 | SELECT id, method, unihash, taskhash, '' FROM unihashes_v2 | ||
165 | """ | ||
166 | ) | ||
167 | cursor.execute("DROP TABLE unihashes_v2") | ||
168 | db.commit() | ||
169 | self.logger.info("Upgrade complete") | ||
110 | 170 | ||
111 | def connect(self, logger): | 171 | def connect(self, logger): |
112 | return Database(logger, self.dbname, self.sync) | 172 | return Database(logger, self.dbname, self.sync) |
@@ -126,16 +186,7 @@ class Database(object): | |||
126 | "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") | 186 | "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") |
127 | ) | 187 | ) |
128 | 188 | ||
129 | cursor.execute("SELECT sqlite_version()") | 189 | self.sqlite_version = _get_sqlite_version(cursor) |
130 | |||
131 | version = [] | ||
132 | for v in cursor.fetchone()[0].split("."): | ||
133 | try: | ||
134 | version.append(int(v)) | ||
135 | except ValueError: | ||
136 | version.append(v) | ||
137 | |||
138 | self.sqlite_version = tuple(version) | ||
139 | 190 | ||
140 | async def __aenter__(self): | 191 | async def __aenter__(self): |
141 | return self | 192 | return self |
@@ -143,6 +194,30 @@ class Database(object): | |||
143 | async def __aexit__(self, exc_type, exc_value, traceback): | 194 | async def __aexit__(self, exc_type, exc_value, traceback): |
144 | await self.close() | 195 | await self.close() |
145 | 196 | ||
197 | async def _set_config(self, cursor, name, value): | ||
198 | cursor.execute( | ||
199 | """ | ||
200 | INSERT OR REPLACE INTO config (id, name, value) VALUES | ||
201 | ((SELECT id FROM config WHERE name=:name), :name, :value) | ||
202 | """, | ||
203 | { | ||
204 | "name": name, | ||
205 | "value": value, | ||
206 | }, | ||
207 | ) | ||
208 | |||
209 | async def _get_config(self, cursor, name): | ||
210 | cursor.execute( | ||
211 | "SELECT value FROM config WHERE name=:name", | ||
212 | { | ||
213 | "name": name, | ||
214 | }, | ||
215 | ) | ||
216 | row = cursor.fetchone() | ||
217 | if row is None: | ||
218 | return None | ||
219 | return row["value"] | ||
220 | |||
146 | async def close(self): | 221 | async def close(self): |
147 | self.db.close() | 222 | self.db.close() |
148 | 223 | ||
@@ -150,8 +225,8 @@ class Database(object): | |||
150 | with closing(self.db.cursor()) as cursor: | 225 | with closing(self.db.cursor()) as cursor: |
151 | cursor.execute( | 226 | cursor.execute( |
152 | """ | 227 | """ |
153 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | 228 | SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 |
154 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | 229 | INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash |
155 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash | 230 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash |
156 | ORDER BY outhashes_v2.created ASC | 231 | ORDER BY outhashes_v2.created ASC |
157 | LIMIT 1 | 232 | LIMIT 1 |
@@ -167,8 +242,8 @@ class Database(object): | |||
167 | with closing(self.db.cursor()) as cursor: | 242 | with closing(self.db.cursor()) as cursor: |
168 | cursor.execute( | 243 | cursor.execute( |
169 | """ | 244 | """ |
170 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | 245 | SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 |
171 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | 246 | INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash |
172 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | 247 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash |
173 | ORDER BY outhashes_v2.created ASC | 248 | ORDER BY outhashes_v2.created ASC |
174 | LIMIT 1 | 249 | LIMIT 1 |
@@ -200,8 +275,8 @@ class Database(object): | |||
200 | with closing(self.db.cursor()) as cursor: | 275 | with closing(self.db.cursor()) as cursor: |
201 | cursor.execute( | 276 | cursor.execute( |
202 | """ | 277 | """ |
203 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 | 278 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2 |
204 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | 279 | INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash |
205 | -- Select any matching output hash except the one we just inserted | 280 | -- Select any matching output hash except the one we just inserted |
206 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash | 281 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash |
207 | -- Pick the oldest hash | 282 | -- Pick the oldest hash |
@@ -219,7 +294,7 @@ class Database(object): | |||
219 | async def get_equivalent(self, method, taskhash): | 294 | async def get_equivalent(self, method, taskhash): |
220 | with closing(self.db.cursor()) as cursor: | 295 | with closing(self.db.cursor()) as cursor: |
221 | cursor.execute( | 296 | cursor.execute( |
222 | "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", | 297 | "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash", |
223 | { | 298 | { |
224 | "method": method, | 299 | "method": method, |
225 | "taskhash": taskhash, | 300 | "taskhash": taskhash, |
@@ -229,15 +304,9 @@ class Database(object): | |||
229 | 304 | ||
230 | async def remove(self, condition): | 305 | async def remove(self, condition): |
231 | def do_remove(columns, table_name, cursor): | 306 | def do_remove(columns, table_name, cursor): |
232 | where = {} | 307 | where, clause = _make_condition_statement(columns, condition) |
233 | for c in columns: | ||
234 | if c in condition and condition[c] is not None: | ||
235 | where[c] = condition[c] | ||
236 | |||
237 | if where: | 308 | if where: |
238 | query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( | 309 | query = f"DELETE FROM {table_name} WHERE {clause}" |
239 | "%s=:%s" % (k, k) for k in where.keys() | ||
240 | ) | ||
241 | cursor.execute(query, where) | 310 | cursor.execute(query, where) |
242 | return cursor.rowcount | 311 | return cursor.rowcount |
243 | 312 | ||
@@ -246,17 +315,80 @@ class Database(object): | |||
246 | count = 0 | 315 | count = 0 |
247 | with closing(self.db.cursor()) as cursor: | 316 | with closing(self.db.cursor()) as cursor: |
248 | count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) | 317 | count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) |
249 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | 318 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor) |
250 | self.db.commit() | 319 | self.db.commit() |
251 | 320 | ||
252 | return count | 321 | return count |
253 | 322 | ||
323 | async def get_current_gc_mark(self): | ||
324 | with closing(self.db.cursor()) as cursor: | ||
325 | return await self._get_config(cursor, "gc-mark") | ||
326 | |||
327 | async def gc_status(self): | ||
328 | with closing(self.db.cursor()) as cursor: | ||
329 | cursor.execute( | ||
330 | """ | ||
331 | SELECT COUNT() FROM unihashes_v3 WHERE | ||
332 | gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
333 | """ | ||
334 | ) | ||
335 | keep_rows = cursor.fetchone()[0] | ||
336 | |||
337 | cursor.execute( | ||
338 | """ | ||
339 | SELECT COUNT() FROM unihashes_v3 WHERE | ||
340 | gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
341 | """ | ||
342 | ) | ||
343 | remove_rows = cursor.fetchone()[0] | ||
344 | |||
345 | current_mark = await self._get_config(cursor, "gc-mark") | ||
346 | |||
347 | return (keep_rows, remove_rows, current_mark) | ||
348 | |||
349 | async def gc_mark(self, mark, condition): | ||
350 | with closing(self.db.cursor()) as cursor: | ||
351 | await self._set_config(cursor, "gc-mark", mark) | ||
352 | |||
353 | where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition) | ||
354 | |||
355 | new_rows = 0 | ||
356 | if where: | ||
357 | cursor.execute( | ||
358 | f""" | ||
359 | UPDATE unihashes_v3 SET | ||
360 | gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
361 | WHERE {clause} | ||
362 | """, | ||
363 | where, | ||
364 | ) | ||
365 | new_rows = cursor.rowcount | ||
366 | |||
367 | self.db.commit() | ||
368 | return new_rows | ||
369 | |||
370 | async def gc_sweep(self): | ||
371 | with closing(self.db.cursor()) as cursor: | ||
372 | # NOTE: COALESCE is not used in this query so that if the current | ||
373 | # mark is NULL, nothing will happen | ||
374 | cursor.execute( | ||
375 | """ | ||
376 | DELETE FROM unihashes_v3 WHERE | ||
377 | gc_mark!=(SELECT value FROM config WHERE name='gc-mark') | ||
378 | """ | ||
379 | ) | ||
380 | count = cursor.rowcount | ||
381 | await self._set_config(cursor, "gc-mark", None) | ||
382 | |||
383 | self.db.commit() | ||
384 | return count | ||
385 | |||
254 | async def clean_unused(self, oldest): | 386 | async def clean_unused(self, oldest): |
255 | with closing(self.db.cursor()) as cursor: | 387 | with closing(self.db.cursor()) as cursor: |
256 | cursor.execute( | 388 | cursor.execute( |
257 | """ | 389 | """ |
258 | DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( | 390 | DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( |
259 | SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 | 391 | SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1 |
260 | ) | 392 | ) |
261 | """, | 393 | """, |
262 | { | 394 | { |
@@ -271,7 +403,13 @@ class Database(object): | |||
271 | prevrowid = cursor.lastrowid | 403 | prevrowid = cursor.lastrowid |
272 | cursor.execute( | 404 | cursor.execute( |
273 | """ | 405 | """ |
274 | INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) | 406 | INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES |
407 | ( | ||
408 | :method, | ||
409 | :taskhash, | ||
410 | :unihash, | ||
411 | COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
412 | ) | ||
275 | """, | 413 | """, |
276 | { | 414 | { |
277 | "method": method, | 415 | "method": method, |
@@ -383,14 +521,9 @@ class Database(object): | |||
383 | async def get_usage(self): | 521 | async def get_usage(self): |
384 | usage = {} | 522 | usage = {} |
385 | with closing(self.db.cursor()) as cursor: | 523 | with closing(self.db.cursor()) as cursor: |
386 | if self.sqlite_version >= (3, 33): | ||
387 | table_name = "sqlite_schema" | ||
388 | else: | ||
389 | table_name = "sqlite_master" | ||
390 | |||
391 | cursor.execute( | 524 | cursor.execute( |
392 | f""" | 525 | f""" |
393 | SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' | 526 | SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' |
394 | """ | 527 | """ |
395 | ) | 528 | ) |
396 | for row in cursor.fetchall(): | 529 | for row in cursor.fetchall(): |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 869f7636c5..aeedab3575 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -810,6 +810,27 @@ class HashEquivalenceCommonTests(object): | |||
810 | with self.auth_perms("@user-admin") as client: | 810 | with self.auth_perms("@user-admin") as client: |
811 | become = client.become_user(client.username) | 811 | become = client.become_user(client.username) |
812 | 812 | ||
813 | def test_auth_gc(self): | ||
814 | admin_client = self.start_auth_server() | ||
815 | |||
816 | with self.auth_perms() as client, self.assertRaises(InvokeError): | ||
817 | client.gc_mark("ABC", {"unihash": "123"}) | ||
818 | |||
819 | with self.auth_perms() as client, self.assertRaises(InvokeError): | ||
820 | client.gc_status() | ||
821 | |||
822 | with self.auth_perms() as client, self.assertRaises(InvokeError): | ||
823 | client.gc_sweep("ABC") | ||
824 | |||
825 | with self.auth_perms("@db-admin") as client: | ||
826 | client.gc_mark("ABC", {"unihash": "123"}) | ||
827 | |||
828 | with self.auth_perms("@db-admin") as client: | ||
829 | client.gc_status() | ||
830 | |||
831 | with self.auth_perms("@db-admin") as client: | ||
832 | client.gc_sweep("ABC") | ||
833 | |||
813 | def test_get_db_usage(self): | 834 | def test_get_db_usage(self): |
814 | usage = self.client.get_db_usage() | 835 | usage = self.client.get_db_usage() |
815 | 836 | ||
@@ -837,6 +858,147 @@ class HashEquivalenceCommonTests(object): | |||
837 | data = client.get_taskhash(self.METHOD, taskhash, True) | 858 | data = client.get_taskhash(self.METHOD, taskhash, True) |
838 | self.assertEqual(data["owner"], user["username"]) | 859 | self.assertEqual(data["owner"], user["username"]) |
839 | 860 | ||
861 | def test_gc(self): | ||
862 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
863 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
864 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
865 | |||
866 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
867 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
868 | |||
869 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
870 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
871 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
872 | |||
873 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
874 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
875 | |||
876 | # Mark the first unihash to be kept | ||
877 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
878 | self.assertEqual(ret, {"count": 1}) | ||
879 | |||
880 | ret = self.client.gc_status() | ||
881 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) | ||
882 | |||
883 | # Second hash is still there; mark doesn't delete hashes | ||
884 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
885 | |||
886 | ret = self.client.gc_sweep("ABC") | ||
887 | self.assertEqual(ret, {"count": 1}) | ||
888 | |||
889 | # Hash is gone. Taskhash is returned for second hash | ||
890 | self.assertClientGetHash(self.client, taskhash2, None) | ||
891 | # First hash is still present | ||
892 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
893 | |||
894 | def test_gc_switch_mark(self): | ||
895 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
896 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
897 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
898 | |||
899 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
900 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
901 | |||
902 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
903 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
904 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
905 | |||
906 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
907 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
908 | |||
909 | # Mark the first unihash to be kept | ||
910 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
911 | self.assertEqual(ret, {"count": 1}) | ||
912 | |||
913 | ret = self.client.gc_status() | ||
914 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) | ||
915 | |||
916 | # Second hash is still there; mark doesn't delete hashes | ||
917 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
918 | |||
919 | # Switch to a different mark and mark the second hash. This will start | ||
920 | # a new collection cycle | ||
921 | ret = self.client.gc_mark("DEF", {"unihash": unihash2, "method": self.METHOD}) | ||
922 | self.assertEqual(ret, {"count": 1}) | ||
923 | |||
924 | ret = self.client.gc_status() | ||
925 | self.assertEqual(ret, {"mark": "DEF", "keep": 1, "remove": 1}) | ||
926 | |||
927 | # Both hashes are still present | ||
928 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
929 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
930 | |||
931 | # Sweep with the new mark | ||
932 | ret = self.client.gc_sweep("DEF") | ||
933 | self.assertEqual(ret, {"count": 1}) | ||
934 | |||
935 | # First hash is gone, second is kept | ||
936 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
937 | self.assertClientGetHash(self.client, taskhash, None) | ||
938 | |||
939 | def test_gc_switch_sweep_mark(self): | ||
940 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
941 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
942 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
943 | |||
944 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
945 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
946 | |||
947 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
948 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
949 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
950 | |||
951 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
952 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
953 | |||
954 | # Mark the first unihash to be kept | ||
955 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
956 | self.assertEqual(ret, {"count": 1}) | ||
957 | |||
958 | ret = self.client.gc_status() | ||
959 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) | ||
960 | |||
961 | # Sweeping with a different mark raises an error | ||
962 | with self.assertRaises(InvokeError): | ||
963 | self.client.gc_sweep("DEF") | ||
964 | |||
965 | # Both hashes are present | ||
966 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
967 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
968 | |||
969 | def test_gc_new_hashes(self): | ||
970 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
971 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
972 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
973 | |||
974 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
975 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
976 | |||
977 | # Start a new garbage collection | ||
978 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
979 | self.assertEqual(ret, {"count": 1}) | ||
980 | |||
981 | ret = self.client.gc_status() | ||
982 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 0}) | ||
983 | |||
984 | # Add second hash. It should inherit the mark from the current garbage | ||
985 | # collection operation | ||
986 | |||
987 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
988 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
989 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
990 | |||
991 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
992 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
993 | |||
994 | # Sweep should remove nothing | ||
995 | ret = self.client.gc_sweep("ABC") | ||
996 | self.assertEqual(ret, {"count": 0}) | ||
997 | |||
998 | # Both hashes are present | ||
999 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
1000 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
1001 | |||
840 | 1002 | ||
841 | class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): | 1003 | class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): |
842 | def get_server_addr(self, server_idx): | 1004 | def get_server_addr(self, server_idx): |
@@ -1086,6 +1248,42 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): | |||
1086 | "get-db-query-columns", | 1248 | "get-db-query-columns", |
1087 | ], check=True) | 1249 | ], check=True) |
1088 | 1250 | ||
1251 | def test_gc(self): | ||
1252 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
1253 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
1254 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
1255 | |||
1256 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
1257 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
1258 | |||
1259 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
1260 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
1261 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
1262 | |||
1263 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
1264 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
1265 | |||
1266 | # Mark the first unihash to be kept | ||
1267 | self.run_hashclient([ | ||
1268 | "--address", self.server_address, | ||
1269 | "gc-mark", "ABC", | ||
1270 | "--where", "unihash", unihash, | ||
1271 | "--where", "method", self.METHOD | ||
1272 | ], check=True) | ||
1273 | |||
1274 | # Second hash is still there; mark doesn't delete hashes | ||
1275 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
1276 | |||
1277 | self.run_hashclient([ | ||
1278 | "--address", self.server_address, | ||
1279 | "gc-sweep", "ABC", | ||
1280 | ], check=True) | ||
1281 | |||
1282 | # Hash is gone. Taskhash is returned for second hash | ||
1283 | self.assertClientGetHash(self.client, taskhash2, None) | ||
1284 | # First hash is still present | ||
1285 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
1286 | |||
1089 | 1287 | ||
1090 | class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | 1288 | class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): |
1091 | def get_server_addr(self, server_idx): | 1289 | def get_server_addr(self, server_idx): |