| field | value | date |
|---|---|---|
| author | Joshua Watt <JPEWhacker@gmail.com> | 2024-02-18 15:59:46 -0700 |
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-02-19 11:58:12 +0000 |
| commit | 1effd1014d9140905093efe25eeefedb28a10875 (patch) | |
| tree | b34fb1d26f020b361d22904695cab6b9a7c1ea50 | |
| parent | 324c9fd666117afb0dd689eaa8551bb02d6a042b (diff) | |
| download | poky-1effd1014d9140905093efe25eeefedb28a10875.tar.gz | |
bitbake: hashserv: Add Unihash Garbage Collection
Adds support for removing unused unihashes from the database. This is
done using a "mark and sweep" style of garbage collection: a collection is
started by marking which unihashes should be kept in the database, and a
sweep then removes any unmarked hashes.
(Bitbake rev: 433d4a075a1acfbd2a2913061739353a84bb01ed)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
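
For illustration, here is a minimal sketch of a full mark/sweep cycle using the new client methods added in this patch. It assumes a hash equivalence server is already running; the address, mark name and "where" filter are placeholder examples, and `hashserv.create_client()` is the existing helper used by `bitbake-hashclient`.

```python
# Sketch of a mark-and-sweep cycle against a hash equivalence server.
# ADDRESS, the mark name and the "where" filter are placeholders.
import hashserv

ADDRESS = "unix:///path/to/hashserve.sock"  # placeholder

client = hashserv.create_client(ADDRESS)
try:
    # Phase 1: mark everything that should be kept. The "where" dict is
    # matched against unihash table columns (method, taskhash, unihash);
    # entries reported after this call are marked automatically.
    ret = client.gc_mark("build-2024-02-18", {"method": "TestMethod"})
    print("Newly marked: %d" % ret["count"])

    # Optionally check progress before sweeping.
    status = client.gc_status()
    print("mark=%s keep=%d remove=%d"
          % (status["mark"], status["keep"], status["remove"]))

    # Phase 2: sweep. The mark must match the current one; all unmarked
    # unihashes are deleted. Dangling outhashes can then be removed with
    # clean-unused, as the gc_sweep() docstring recommends.
    ret = client.gc_sweep("build-2024-02-18")
    print("Removed: %d" % ret["count"])
finally:
    client.close()
```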
| -rwxr-xr-x | bitbake/bin/bitbake-hashclient | 35 |
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 31 |
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 105 |
| -rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 226 |
| -rw-r--r-- | bitbake/lib/hashserv/sqlite.py | 205 |
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 198 |
6 files changed, 684 insertions, 116 deletions
diff --git a/bitbake/bin/bitbake-hashclient b/bitbake/bin/bitbake-hashclient
index 2cb6338666..f71b87404a 100755
--- a/bitbake/bin/bitbake-hashclient
+++ b/bitbake/bin/bitbake-hashclient
| @@ -195,6 +195,28 @@ def main(): | |||
| 195 | columns = client.get_db_query_columns() | 195 | columns = client.get_db_query_columns() |
| 196 | print("\n".join(sorted(columns))) | 196 | print("\n".join(sorted(columns))) |
| 197 | 197 | ||
| 198 | def handle_gc_status(args, client): | ||
| 199 | result = client.gc_status() | ||
| 200 | if not result["mark"]: | ||
| 201 | print("No Garbage collection in progress") | ||
| 202 | return 0 | ||
| 203 | |||
| 204 | print("Current Mark: %s" % result["mark"]) | ||
| 205 | print("Total hashes to keep: %d" % result["keep"]) | ||
| 206 | print("Total hashes to remove: %s" % result["remove"]) | ||
| 207 | return 0 | ||
| 208 | |||
| 209 | def handle_gc_mark(args, client): | ||
| 210 | where = {k: v for k, v in args.where} | ||
| 211 | result = client.gc_mark(args.mark, where) | ||
| 212 | print("New hashes marked: %d" % result["count"]) | ||
| 213 | return 0 | ||
| 214 | |||
| 215 | def handle_gc_sweep(args, client): | ||
| 216 | result = client.gc_sweep(args.mark) | ||
| 217 | print("Removed %d rows" % result["count"]) | ||
| 218 | return 0 | ||
| 219 | |||
| 198 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') | 220 | parser = argparse.ArgumentParser(description='Hash Equivalence Client') |
| 199 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') | 221 | parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")') |
| 200 | parser.add_argument('--log', default='WARNING', help='Set logging level') | 222 | parser.add_argument('--log', default='WARNING', help='Set logging level') |
| @@ -274,6 +296,19 @@ def main(): | |||
| 274 | db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries") | 296 | db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries") |
| 275 | db_query_columns_parser.set_defaults(func=handle_get_db_query_columns) | 297 | db_query_columns_parser.set_defaults(func=handle_get_db_query_columns) |
| 276 | 298 | ||
| 299 | gc_status_parser = subparsers.add_parser("gc-status", help="Show garbage collection status") | ||
| 300 | gc_status_parser.set_defaults(func=handle_gc_status) | ||
| 301 | |||
| 302 | gc_mark_parser = subparsers.add_parser('gc-mark', help="Mark hashes to be kept for garbage collection") | ||
| 303 | gc_mark_parser.add_argument("mark", help="Mark for this garbage collection operation") | ||
| 304 | gc_mark_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[], | ||
| 305 | help="Keep entries in table where KEY == VALUE") | ||
| 306 | gc_mark_parser.set_defaults(func=handle_gc_mark) | ||
| 307 | |||
| 308 | gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked") | ||
| 309 | gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation") | ||
| 310 | gc_sweep_parser.set_defaults(func=handle_gc_sweep) | ||
| 311 | |||
| 277 | args = parser.parse_args() | 312 | args = parser.parse_args() |
| 278 | 313 | ||
| 279 | logger = logging.getLogger('hashserv') | 314 | logger = logging.getLogger('hashserv') |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 35a97687fb..e6dc417912 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
| @@ -194,6 +194,34 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 194 | await self._set_mode(self.MODE_NORMAL) | 194 | await self._set_mode(self.MODE_NORMAL) |
| 195 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] | 195 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] |
| 196 | 196 | ||
| 197 | async def gc_status(self): | ||
| 198 | await self._set_mode(self.MODE_NORMAL) | ||
| 199 | return await self.invoke({"gc-status": {}}) | ||
| 200 | |||
| 201 | async def gc_mark(self, mark, where): | ||
| 202 | """ | ||
| 203 | Starts a new garbage collection operation identified by "mark". If | ||
| 204 | garbage collection is already in progress with "mark", the collection | ||
| 205 | is continued. | ||
| 206 | |||
| 207 | All unihash entries that match the "where" clause are marked to be | ||
| 208 | kept. In addition, any new entries added to the database after this | ||
| 209 | command will be automatically marked with "mark". | ||
| 210 | """ | ||
| 211 | await self._set_mode(self.MODE_NORMAL) | ||
| 212 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | ||
| 213 | |||
| 214 | async def gc_sweep(self, mark): | ||
| 215 | """ | ||
| 216 | Finishes garbage collection for "mark". All unihash entries that have | ||
| 217 | not been marked will be deleted. | ||
| 218 | |||
| 219 | It is recommended to clean unused outhash entries after running this to | ||
| 220 | clean up any dangling outhashes. | ||
| 221 | """ | ||
| 222 | await self._set_mode(self.MODE_NORMAL) | ||
| 223 | return await self.invoke({"gc-sweep": {"mark": mark}}) | ||
| 224 | |||
| 197 | 225 | ||
| 198 | class Client(bb.asyncrpc.Client): | 226 | class Client(bb.asyncrpc.Client): |
| 199 | def __init__(self, username=None, password=None): | 227 | def __init__(self, username=None, password=None): |
| @@ -224,6 +252,9 @@ class Client(bb.asyncrpc.Client): | |||
| 224 | "become_user", | 252 | "become_user", |
| 225 | "get_db_usage", | 253 | "get_db_usage", |
| 226 | "get_db_query_columns", | 254 | "get_db_query_columns", |
| 255 | "gc_status", | ||
| 256 | "gc_mark", | ||
| 257 | "gc_sweep", | ||
| 227 | ) | 258 | ) |
| 228 | 259 | ||
| 229 | def _get_async_client(self): | 260 | def _get_async_client(self): |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index a86507830e..5ed852d1f3 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
| @@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
| 199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): | 199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): |
| 200 | if not self.user: | 200 | if not self.user: |
| 201 | username = "Anonymous user" | 201 | username = "Anonymous user" |
| 202 | user_perms = self.anon_perms | 202 | user_perms = self.server.anon_perms |
| 203 | else: | 203 | else: |
| 204 | username = self.user.username | 204 | username = self.user.username |
| 205 | user_perms = self.user.permissions | 205 | user_perms = self.user.permissions |
| @@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
| 223 | 223 | ||
| 224 | 224 | ||
| 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
| 226 | def __init__( | 226 | def __init__(self, socket, server): |
| 227 | self, | 227 | super().__init__(socket, "OEHASHEQUIV", server.logger) |
| 228 | socket, | 228 | self.server = server |
| 229 | db_engine, | ||
| 230 | request_stats, | ||
| 231 | backfill_queue, | ||
| 232 | upstream, | ||
| 233 | read_only, | ||
| 234 | anon_perms, | ||
| 235 | ): | ||
| 236 | super().__init__(socket, "OEHASHEQUIV", logger) | ||
| 237 | self.db_engine = db_engine | ||
| 238 | self.request_stats = request_stats | ||
| 239 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 229 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
| 240 | self.backfill_queue = backfill_queue | ||
| 241 | self.upstream = upstream | ||
| 242 | self.read_only = read_only | ||
| 243 | self.user = None | 230 | self.user = None |
| 244 | self.anon_perms = anon_perms | ||
| 245 | 231 | ||
| 246 | self.handlers.update( | 232 | self.handlers.update( |
| 247 | { | 233 | { |
| @@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 261 | } | 247 | } |
| 262 | ) | 248 | ) |
| 263 | 249 | ||
| 264 | if not read_only: | 250 | if not self.server.read_only: |
| 265 | self.handlers.update( | 251 | self.handlers.update( |
| 266 | { | 252 | { |
| 267 | "report-equiv": self.handle_equivreport, | 253 | "report-equiv": self.handle_equivreport, |
| 268 | "reset-stats": self.handle_reset_stats, | 254 | "reset-stats": self.handle_reset_stats, |
| 269 | "backfill-wait": self.handle_backfill_wait, | 255 | "backfill-wait": self.handle_backfill_wait, |
| 270 | "remove": self.handle_remove, | 256 | "remove": self.handle_remove, |
| 257 | "gc-mark": self.handle_gc_mark, | ||
| 258 | "gc-sweep": self.handle_gc_sweep, | ||
| 259 | "gc-status": self.handle_gc_status, | ||
| 271 | "clean-unused": self.handle_clean_unused, | 260 | "clean-unused": self.handle_clean_unused, |
| 272 | "refresh-token": self.handle_refresh_token, | 261 | "refresh-token": self.handle_refresh_token, |
| 273 | "set-user-perms": self.handle_set_perms, | 262 | "set-user-perms": self.handle_set_perms, |
| @@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 282 | def user_has_permissions(self, *permissions, allow_anon=True): | 271 | def user_has_permissions(self, *permissions, allow_anon=True): |
| 283 | permissions = set(permissions) | 272 | permissions = set(permissions) |
| 284 | if allow_anon: | 273 | if allow_anon: |
| 285 | if ALL_PERM in self.anon_perms: | 274 | if ALL_PERM in self.server.anon_perms: |
| 286 | return True | 275 | return True |
| 287 | 276 | ||
| 288 | if not permissions - self.anon_perms: | 277 | if not permissions - self.server.anon_perms: |
| 289 | return True | 278 | return True |
| 290 | 279 | ||
| 291 | if self.user is None: | 280 | if self.user is None: |
| @@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 303 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) | 292 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
| 304 | 293 | ||
| 305 | async def process_requests(self): | 294 | async def process_requests(self): |
| 306 | async with self.db_engine.connect(self.logger) as db: | 295 | async with self.server.db_engine.connect(self.logger) as db: |
| 307 | self.db = db | 296 | self.db = db |
| 308 | if self.upstream is not None: | 297 | if self.server.upstream is not None: |
| 309 | self.upstream_client = await create_async_client(self.upstream) | 298 | self.upstream_client = await create_async_client(self.server.upstream) |
| 310 | else: | 299 | else: |
| 311 | self.upstream_client = None | 300 | self.upstream_client = None |
| 312 | 301 | ||
| @@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 323 | if "stream" in k: | 312 | if "stream" in k: |
| 324 | return await self.handlers[k](msg[k]) | 313 | return await self.handlers[k](msg[k]) |
| 325 | else: | 314 | else: |
| 326 | with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): | 315 | with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): |
| 327 | return await self.handlers[k](msg[k]) | 316 | return await self.handlers[k](msg[k]) |
| 328 | 317 | ||
| 329 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 318 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
| @@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 404 | # possible (which is why the request sample is handled manually | 393 | # possible (which is why the request sample is handled manually |
| 405 | # instead of using 'with', and also why logging statements are | 394 | # instead of using 'with', and also why logging statements are |
| 406 | # commented out. | 395 | # commented out. |
| 407 | self.request_sample = self.request_stats.start_sample() | 396 | self.request_sample = self.server.request_stats.start_sample() |
| 408 | request_measure = self.request_sample.measure() | 397 | request_measure = self.request_sample.measure() |
| 409 | request_measure.start() | 398 | request_measure.start() |
| 410 | 399 | ||
| @@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 435 | # Post to the backfill queue after writing the result to minimize | 424 | # Post to the backfill queue after writing the result to minimize |
| 436 | # the turn around time on a request | 425 | # the turn around time on a request |
| 437 | if upstream is not None: | 426 | if upstream is not None: |
| 438 | await self.backfill_queue.put((method, taskhash)) | 427 | await self.server.backfill_queue.put((method, taskhash)) |
| 439 | 428 | ||
| 440 | await self.socket.send("ok") | 429 | await self.socket.send("ok") |
| 441 | return self.NO_RESPONSE | 430 | return self.NO_RESPONSE |
| @@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 461 | # report is made inside the function | 450 | # report is made inside the function |
| 462 | @permissions(READ_PERM) | 451 | @permissions(READ_PERM) |
| 463 | async def handle_report(self, data): | 452 | async def handle_report(self, data): |
| 464 | if self.read_only or not self.user_has_permissions(REPORT_PERM): | 453 | if self.server.read_only or not self.user_has_permissions(REPORT_PERM): |
| 465 | return await self.report_readonly(data) | 454 | return await self.report_readonly(data) |
| 466 | 455 | ||
| 467 | outhash_data = { | 456 | outhash_data = { |
| @@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 538 | @permissions(READ_PERM) | 527 | @permissions(READ_PERM) |
| 539 | async def handle_get_stats(self, request): | 528 | async def handle_get_stats(self, request): |
| 540 | return { | 529 | return { |
| 541 | "requests": self.request_stats.todict(), | 530 | "requests": self.server.request_stats.todict(), |
| 542 | } | 531 | } |
| 543 | 532 | ||
| 544 | @permissions(DB_ADMIN_PERM) | 533 | @permissions(DB_ADMIN_PERM) |
| 545 | async def handle_reset_stats(self, request): | 534 | async def handle_reset_stats(self, request): |
| 546 | d = { | 535 | d = { |
| 547 | "requests": self.request_stats.todict(), | 536 | "requests": self.server.request_stats.todict(), |
| 548 | } | 537 | } |
| 549 | 538 | ||
| 550 | self.request_stats.reset() | 539 | self.server.request_stats.reset() |
| 551 | return d | 540 | return d |
| 552 | 541 | ||
| 553 | @permissions(READ_PERM) | 542 | @permissions(READ_PERM) |
| 554 | async def handle_backfill_wait(self, request): | 543 | async def handle_backfill_wait(self, request): |
| 555 | d = { | 544 | d = { |
| 556 | "tasks": self.backfill_queue.qsize(), | 545 | "tasks": self.server.backfill_queue.qsize(), |
| 557 | } | 546 | } |
| 558 | await self.backfill_queue.join() | 547 | await self.server.backfill_queue.join() |
| 559 | return d | 548 | return d |
| 560 | 549 | ||
| 561 | @permissions(DB_ADMIN_PERM) | 550 | @permissions(DB_ADMIN_PERM) |
| @@ -567,6 +556,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 567 | return {"count": await self.db.remove(condition)} | 556 | return {"count": await self.db.remove(condition)} |
| 568 | 557 | ||
| 569 | @permissions(DB_ADMIN_PERM) | 558 | @permissions(DB_ADMIN_PERM) |
| 559 | async def handle_gc_mark(self, request): | ||
| 560 | condition = request["where"] | ||
| 561 | mark = request["mark"] | ||
| 562 | |||
| 563 | if not isinstance(condition, dict): | ||
| 564 | raise TypeError("Bad condition type %s" % type(condition)) | ||
| 565 | |||
| 566 | if not isinstance(mark, str): | ||
| 567 | raise TypeError("Bad mark type %s" % type(mark)) | ||
| 568 | |||
| 569 | return {"count": await self.db.gc_mark(mark, condition)} | ||
| 570 | |||
| 571 | @permissions(DB_ADMIN_PERM) | ||
| 572 | async def handle_gc_sweep(self, request): | ||
| 573 | mark = request["mark"] | ||
| 574 | |||
| 575 | if not isinstance(mark, str): | ||
| 576 | raise TypeError("Bad mark type %s" % type(mark)) | ||
| 577 | |||
| 578 | current_mark = await self.db.get_current_gc_mark() | ||
| 579 | |||
| 580 | if not current_mark or mark != current_mark: | ||
| 581 | raise bb.asyncrpc.InvokeError( | ||
| 582 | f"'{mark}' is not the current mark. Refusing to sweep" | ||
| 583 | ) | ||
| 584 | |||
| 585 | count = await self.db.gc_sweep() | ||
| 586 | |||
| 587 | return {"count": count} | ||
| 588 | |||
| 589 | @permissions(DB_ADMIN_PERM) | ||
| 590 | async def handle_gc_status(self, request): | ||
| 591 | (keep_rows, remove_rows, current_mark) = await self.db.gc_status() | ||
| 592 | return { | ||
| 593 | "keep": keep_rows, | ||
| 594 | "remove": remove_rows, | ||
| 595 | "mark": current_mark, | ||
| 596 | } | ||
| 597 | |||
| 598 | @permissions(DB_ADMIN_PERM) | ||
| 570 | async def handle_clean_unused(self, request): | 599 | async def handle_clean_unused(self, request): |
| 571 | max_age = request["max_age_seconds"] | 600 | max_age = request["max_age_seconds"] |
| 572 | oldest = datetime.now() - timedelta(seconds=-max_age) | 601 | oldest = datetime.now() - timedelta(seconds=-max_age) |
| @@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer): | |||
| 779 | ) | 808 | ) |
| 780 | 809 | ||
| 781 | def accept_client(self, socket): | 810 | def accept_client(self, socket): |
| 782 | return ServerClient( | 811 | return ServerClient(socket, self) |
| 783 | socket, | ||
| 784 | self.db_engine, | ||
| 785 | self.request_stats, | ||
| 786 | self.backfill_queue, | ||
| 787 | self.upstream, | ||
| 788 | self.read_only, | ||
| 789 | self.anon_perms, | ||
| 790 | ) | ||
| 791 | 812 | ||
| 792 | async def create_admin_user(self): | 813 | async def create_admin_user(self): |
| 793 | admin_permissions = (ALL_PERM,) | 814 | admin_permissions = (ALL_PERM,) |
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
index cee04bffb0..89a6b86d9d 100644
--- a/bitbake/lib/hashserv/sqlalchemy.py
+++ b/bitbake/lib/hashserv/sqlalchemy.py
| @@ -28,6 +28,7 @@ from sqlalchemy import ( | |||
| 28 | delete, | 28 | delete, |
| 29 | update, | 29 | update, |
| 30 | func, | 30 | func, |
| 31 | inspect, | ||
| 31 | ) | 32 | ) |
| 32 | import sqlalchemy.engine | 33 | import sqlalchemy.engine |
| 33 | from sqlalchemy.orm import declarative_base | 34 | from sqlalchemy.orm import declarative_base |
| @@ -36,16 +37,17 @@ from sqlalchemy.exc import IntegrityError | |||
| 36 | Base = declarative_base() | 37 | Base = declarative_base() |
| 37 | 38 | ||
| 38 | 39 | ||
| 39 | class UnihashesV2(Base): | 40 | class UnihashesV3(Base): |
| 40 | __tablename__ = "unihashes_v2" | 41 | __tablename__ = "unihashes_v3" |
| 41 | id = Column(Integer, primary_key=True, autoincrement=True) | 42 | id = Column(Integer, primary_key=True, autoincrement=True) |
| 42 | method = Column(Text, nullable=False) | 43 | method = Column(Text, nullable=False) |
| 43 | taskhash = Column(Text, nullable=False) | 44 | taskhash = Column(Text, nullable=False) |
| 44 | unihash = Column(Text, nullable=False) | 45 | unihash = Column(Text, nullable=False) |
| 46 | gc_mark = Column(Text, nullable=False) | ||
| 45 | 47 | ||
| 46 | __table_args__ = ( | 48 | __table_args__ = ( |
| 47 | UniqueConstraint("method", "taskhash"), | 49 | UniqueConstraint("method", "taskhash"), |
| 48 | Index("taskhash_lookup_v3", "method", "taskhash"), | 50 | Index("taskhash_lookup_v4", "method", "taskhash"), |
| 49 | ) | 51 | ) |
| 50 | 52 | ||
| 51 | 53 | ||
| @@ -79,6 +81,36 @@ class Users(Base): | |||
| 79 | __table_args__ = (UniqueConstraint("username"),) | 81 | __table_args__ = (UniqueConstraint("username"),) |
| 80 | 82 | ||
| 81 | 83 | ||
| 84 | class Config(Base): | ||
| 85 | __tablename__ = "config" | ||
| 86 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
| 87 | name = Column(Text, nullable=False) | ||
| 88 | value = Column(Text) | ||
| 89 | __table_args__ = ( | ||
| 90 | UniqueConstraint("name"), | ||
| 91 | Index("config_lookup", "name"), | ||
| 92 | ) | ||
| 93 | |||
| 94 | |||
| 95 | # | ||
| 96 | # Old table versions | ||
| 97 | # | ||
| 98 | DeprecatedBase = declarative_base() | ||
| 99 | |||
| 100 | |||
| 101 | class UnihashesV2(DeprecatedBase): | ||
| 102 | __tablename__ = "unihashes_v2" | ||
| 103 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
| 104 | method = Column(Text, nullable=False) | ||
| 105 | taskhash = Column(Text, nullable=False) | ||
| 106 | unihash = Column(Text, nullable=False) | ||
| 107 | |||
| 108 | __table_args__ = ( | ||
| 109 | UniqueConstraint("method", "taskhash"), | ||
| 110 | Index("taskhash_lookup_v3", "method", "taskhash"), | ||
| 111 | ) | ||
| 112 | |||
| 113 | |||
| 82 | class DatabaseEngine(object): | 114 | class DatabaseEngine(object): |
| 83 | def __init__(self, url, username=None, password=None): | 115 | def __init__(self, url, username=None, password=None): |
| 84 | self.logger = logging.getLogger("hashserv.sqlalchemy") | 116 | self.logger = logging.getLogger("hashserv.sqlalchemy") |
| @@ -91,6 +123,9 @@ class DatabaseEngine(object): | |||
| 91 | self.url = self.url.set(password=password) | 123 | self.url = self.url.set(password=password) |
| 92 | 124 | ||
| 93 | async def create(self): | 125 | async def create(self): |
| 126 | def check_table_exists(conn, name): | ||
| 127 | return inspect(conn).has_table(name) | ||
| 128 | |||
| 94 | self.logger.info("Using database %s", self.url) | 129 | self.logger.info("Using database %s", self.url) |
| 95 | self.engine = create_async_engine(self.url, poolclass=NullPool) | 130 | self.engine = create_async_engine(self.url, poolclass=NullPool) |
| 96 | 131 | ||
| @@ -99,6 +134,24 @@ class DatabaseEngine(object): | |||
| 99 | self.logger.info("Creating tables...") | 134 | self.logger.info("Creating tables...") |
| 100 | await conn.run_sync(Base.metadata.create_all) | 135 | await conn.run_sync(Base.metadata.create_all) |
| 101 | 136 | ||
| 137 | if await conn.run_sync(check_table_exists, UnihashesV2.__tablename__): | ||
| 138 | self.logger.info("Upgrading Unihashes V2 -> V3...") | ||
| 139 | statement = insert(UnihashesV3).from_select( | ||
| 140 | ["id", "method", "unihash", "taskhash", "gc_mark"], | ||
| 141 | select( | ||
| 142 | UnihashesV2.id, | ||
| 143 | UnihashesV2.method, | ||
| 144 | UnihashesV2.unihash, | ||
| 145 | UnihashesV2.taskhash, | ||
| 146 | literal("").label("gc_mark"), | ||
| 147 | ), | ||
| 148 | ) | ||
| 149 | self.logger.debug("%s", statement) | ||
| 150 | await conn.execute(statement) | ||
| 151 | |||
| 152 | await conn.run_sync(Base.metadata.drop_all, [UnihashesV2.__table__]) | ||
| 153 | self.logger.info("Upgrade complete") | ||
| 154 | |||
| 102 | def connect(self, logger): | 155 | def connect(self, logger): |
| 103 | return Database(self.engine, logger) | 156 | return Database(self.engine, logger) |
| 104 | 157 | ||
| @@ -118,6 +171,15 @@ def map_user(row): | |||
| 118 | ) | 171 | ) |
| 119 | 172 | ||
| 120 | 173 | ||
| 174 | def _make_condition_statement(table, condition): | ||
| 175 | where = {} | ||
| 176 | for c in table.__table__.columns: | ||
| 177 | if c.key in condition and condition[c.key] is not None: | ||
| 178 | where[c] = condition[c.key] | ||
| 179 | |||
| 180 | return [(k == v) for k, v in where.items()] | ||
| 181 | |||
| 182 | |||
| 121 | class Database(object): | 183 | class Database(object): |
| 122 | def __init__(self, engine, logger): | 184 | def __init__(self, engine, logger): |
| 123 | self.engine = engine | 185 | self.engine = engine |
| @@ -135,17 +197,52 @@ class Database(object): | |||
| 135 | await self.db.close() | 197 | await self.db.close() |
| 136 | self.db = None | 198 | self.db = None |
| 137 | 199 | ||
| 200 | async def _execute(self, statement): | ||
| 201 | self.logger.debug("%s", statement) | ||
| 202 | return await self.db.execute(statement) | ||
| 203 | |||
| 204 | async def _set_config(self, name, value): | ||
| 205 | while True: | ||
| 206 | result = await self._execute( | ||
| 207 | update(Config).where(Config.name == name).values(value=value) | ||
| 208 | ) | ||
| 209 | |||
| 210 | if result.rowcount == 0: | ||
| 211 | self.logger.debug("Config '%s' not found. Adding it", name) | ||
| 212 | try: | ||
| 213 | await self._execute(insert(Config).values(name=name, value=value)) | ||
| 214 | except IntegrityError: | ||
| 215 | # Race. Try again | ||
| 216 | continue | ||
| 217 | |||
| 218 | break | ||
| 219 | |||
| 220 | def _get_config_subquery(self, name, default=None): | ||
| 221 | if default is not None: | ||
| 222 | return func.coalesce( | ||
| 223 | select(Config.value).where(Config.name == name).scalar_subquery(), | ||
| 224 | default, | ||
| 225 | ) | ||
| 226 | return select(Config.value).where(Config.name == name).scalar_subquery() | ||
| 227 | |||
| 228 | async def _get_config(self, name): | ||
| 229 | result = await self._execute(select(Config.value).where(Config.name == name)) | ||
| 230 | row = result.first() | ||
| 231 | if row is None: | ||
| 232 | return None | ||
| 233 | return row.value | ||
| 234 | |||
| 138 | async def get_unihash_by_taskhash_full(self, method, taskhash): | 235 | async def get_unihash_by_taskhash_full(self, method, taskhash): |
| 139 | statement = ( | 236 | statement = ( |
| 140 | select( | 237 | select( |
| 141 | OuthashesV2, | 238 | OuthashesV2, |
| 142 | UnihashesV2.unihash.label("unihash"), | 239 | UnihashesV3.unihash.label("unihash"), |
| 143 | ) | 240 | ) |
| 144 | .join( | 241 | .join( |
| 145 | UnihashesV2, | 242 | UnihashesV3, |
| 146 | and_( | 243 | and_( |
| 147 | UnihashesV2.method == OuthashesV2.method, | 244 | UnihashesV3.method == OuthashesV2.method, |
| 148 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 245 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 149 | ), | 246 | ), |
| 150 | ) | 247 | ) |
| 151 | .where( | 248 | .where( |
| @@ -164,12 +261,12 @@ class Database(object): | |||
| 164 | 261 | ||
| 165 | async def get_unihash_by_outhash(self, method, outhash): | 262 | async def get_unihash_by_outhash(self, method, outhash): |
| 166 | statement = ( | 263 | statement = ( |
| 167 | select(OuthashesV2, UnihashesV2.unihash.label("unihash")) | 264 | select(OuthashesV2, UnihashesV3.unihash.label("unihash")) |
| 168 | .join( | 265 | .join( |
| 169 | UnihashesV2, | 266 | UnihashesV3, |
| 170 | and_( | 267 | and_( |
| 171 | UnihashesV2.method == OuthashesV2.method, | 268 | UnihashesV3.method == OuthashesV2.method, |
| 172 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 269 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 173 | ), | 270 | ), |
| 174 | ) | 271 | ) |
| 175 | .where( | 272 | .where( |
| @@ -208,13 +305,13 @@ class Database(object): | |||
| 208 | statement = ( | 305 | statement = ( |
| 209 | select( | 306 | select( |
| 210 | OuthashesV2.taskhash.label("taskhash"), | 307 | OuthashesV2.taskhash.label("taskhash"), |
| 211 | UnihashesV2.unihash.label("unihash"), | 308 | UnihashesV3.unihash.label("unihash"), |
| 212 | ) | 309 | ) |
| 213 | .join( | 310 | .join( |
| 214 | UnihashesV2, | 311 | UnihashesV3, |
| 215 | and_( | 312 | and_( |
| 216 | UnihashesV2.method == OuthashesV2.method, | 313 | UnihashesV3.method == OuthashesV2.method, |
| 217 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 314 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 218 | ), | 315 | ), |
| 219 | ) | 316 | ) |
| 220 | .where( | 317 | .where( |
| @@ -234,12 +331,12 @@ class Database(object): | |||
| 234 | 331 | ||
| 235 | async def get_equivalent(self, method, taskhash): | 332 | async def get_equivalent(self, method, taskhash): |
| 236 | statement = select( | 333 | statement = select( |
| 237 | UnihashesV2.unihash, | 334 | UnihashesV3.unihash, |
| 238 | UnihashesV2.method, | 335 | UnihashesV3.method, |
| 239 | UnihashesV2.taskhash, | 336 | UnihashesV3.taskhash, |
| 240 | ).where( | 337 | ).where( |
| 241 | UnihashesV2.method == method, | 338 | UnihashesV3.method == method, |
| 242 | UnihashesV2.taskhash == taskhash, | 339 | UnihashesV3.taskhash == taskhash, |
| 243 | ) | 340 | ) |
| 244 | self.logger.debug("%s", statement) | 341 | self.logger.debug("%s", statement) |
| 245 | async with self.db.begin(): | 342 | async with self.db.begin(): |
| @@ -248,13 +345,9 @@ class Database(object): | |||
| 248 | 345 | ||
| 249 | async def remove(self, condition): | 346 | async def remove(self, condition): |
| 250 | async def do_remove(table): | 347 | async def do_remove(table): |
| 251 | where = {} | 348 | where = _make_condition_statement(table, condition) |
| 252 | for c in table.__table__.columns: | ||
| 253 | if c.key in condition and condition[c.key] is not None: | ||
| 254 | where[c] = condition[c.key] | ||
| 255 | |||
| 256 | if where: | 349 | if where: |
| 257 | statement = delete(table).where(*[(k == v) for k, v in where.items()]) | 350 | statement = delete(table).where(*where) |
| 258 | self.logger.debug("%s", statement) | 351 | self.logger.debug("%s", statement) |
| 259 | async with self.db.begin(): | 352 | async with self.db.begin(): |
| 260 | result = await self.db.execute(statement) | 353 | result = await self.db.execute(statement) |
| @@ -263,19 +356,74 @@ class Database(object): | |||
| 263 | return 0 | 356 | return 0 |
| 264 | 357 | ||
| 265 | count = 0 | 358 | count = 0 |
| 266 | count += await do_remove(UnihashesV2) | 359 | count += await do_remove(UnihashesV3) |
| 267 | count += await do_remove(OuthashesV2) | 360 | count += await do_remove(OuthashesV2) |
| 268 | 361 | ||
| 269 | return count | 362 | return count |
| 270 | 363 | ||
| 364 | async def get_current_gc_mark(self): | ||
| 365 | async with self.db.begin(): | ||
| 366 | return await self._get_config("gc-mark") | ||
| 367 | |||
| 368 | async def gc_status(self): | ||
| 369 | async with self.db.begin(): | ||
| 370 | gc_mark_subquery = self._get_config_subquery("gc-mark", "") | ||
| 371 | |||
| 372 | result = await self._execute( | ||
| 373 | select(func.count()) | ||
| 374 | .select_from(UnihashesV3) | ||
| 375 | .where(UnihashesV3.gc_mark == gc_mark_subquery) | ||
| 376 | ) | ||
| 377 | keep_rows = result.scalar() | ||
| 378 | |||
| 379 | result = await self._execute( | ||
| 380 | select(func.count()) | ||
| 381 | .select_from(UnihashesV3) | ||
| 382 | .where(UnihashesV3.gc_mark != gc_mark_subquery) | ||
| 383 | ) | ||
| 384 | remove_rows = result.scalar() | ||
| 385 | |||
| 386 | return (keep_rows, remove_rows, await self._get_config("gc-mark")) | ||
| 387 | |||
| 388 | async def gc_mark(self, mark, condition): | ||
| 389 | async with self.db.begin(): | ||
| 390 | await self._set_config("gc-mark", mark) | ||
| 391 | |||
| 392 | where = _make_condition_statement(UnihashesV3, condition) | ||
| 393 | if not where: | ||
| 394 | return 0 | ||
| 395 | |||
| 396 | result = await self._execute( | ||
| 397 | update(UnihashesV3) | ||
| 398 | .values(gc_mark=self._get_config_subquery("gc-mark", "")) | ||
| 399 | .where(*where) | ||
| 400 | ) | ||
| 401 | return result.rowcount | ||
| 402 | |||
| 403 | async def gc_sweep(self): | ||
| 404 | async with self.db.begin(): | ||
| 405 | result = await self._execute( | ||
| 406 | delete(UnihashesV3).where( | ||
| 407 | # A sneaky conditional that provides some errant use | ||
| 408 | # protection: If the config mark is NULL, this will not | ||
| 409 | # match any rows because no default is specified in the | ||
| 410 | # select statement. | ||
| 411 | UnihashesV3.gc_mark | ||
| 412 | != self._get_config_subquery("gc-mark") | ||
| 413 | ) | ||
| 414 | ) | ||
| 415 | await self._set_config("gc-mark", None) | ||
| 416 | |||
| 417 | return result.rowcount | ||
| 418 | |||
| 271 | async def clean_unused(self, oldest): | 419 | async def clean_unused(self, oldest): |
| 272 | statement = delete(OuthashesV2).where( | 420 | statement = delete(OuthashesV2).where( |
| 273 | OuthashesV2.created < oldest, | 421 | OuthashesV2.created < oldest, |
| 274 | ~( | 422 | ~( |
| 275 | select(UnihashesV2.id) | 423 | select(UnihashesV3.id) |
| 276 | .where( | 424 | .where( |
| 277 | UnihashesV2.method == OuthashesV2.method, | 425 | UnihashesV3.method == OuthashesV2.method, |
| 278 | UnihashesV2.taskhash == OuthashesV2.taskhash, | 426 | UnihashesV3.taskhash == OuthashesV2.taskhash, |
| 279 | ) | 427 | ) |
| 280 | .limit(1) | 428 | .limit(1) |
| 281 | .exists() | 429 | .exists() |
| @@ -287,15 +435,17 @@ class Database(object): | |||
| 287 | return result.rowcount | 435 | return result.rowcount |
| 288 | 436 | ||
| 289 | async def insert_unihash(self, method, taskhash, unihash): | 437 | async def insert_unihash(self, method, taskhash, unihash): |
| 290 | statement = insert(UnihashesV2).values( | ||
| 291 | method=method, | ||
| 292 | taskhash=taskhash, | ||
| 293 | unihash=unihash, | ||
| 294 | ) | ||
| 295 | self.logger.debug("%s", statement) | ||
| 296 | try: | 438 | try: |
| 297 | async with self.db.begin(): | 439 | async with self.db.begin(): |
| 298 | await self.db.execute(statement) | 440 | await self._execute( |
| 441 | insert(UnihashesV3).values( | ||
| 442 | method=method, | ||
| 443 | taskhash=taskhash, | ||
| 444 | unihash=unihash, | ||
| 445 | gc_mark=self._get_config_subquery("gc-mark", ""), | ||
| 446 | ) | ||
| 447 | ) | ||
| 448 | |||
| 299 | return True | 449 | return True |
| 300 | except IntegrityError: | 450 | except IntegrityError: |
| 301 | self.logger.debug( | 451 | self.logger.debug( |
| @@ -418,7 +568,7 @@ class Database(object): | |||
| 418 | 568 | ||
| 419 | async def get_query_columns(self): | 569 | async def get_query_columns(self): |
| 420 | columns = set() | 570 | columns = set() |
| 421 | for table in (UnihashesV2, OuthashesV2): | 571 | for table in (UnihashesV3, OuthashesV2): |
| 422 | for c in table.__table__.columns: | 572 | for c in table.__table__.columns: |
| 423 | if not isinstance(c.type, Text): | 573 | if not isinstance(c.type, Text): |
| 424 | continue | 574 | continue |
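
The "sneaky conditional" noted above (and the matching NOTE in the sqlite backend below) relies on standard SQL NULL semantics: a comparison against NULL is never true, so sweeping when no current mark is set deletes nothing. A toy, self-contained illustration of that behaviour, using a simplified table rather than the real unihashes_v3 schema:

```python
# Demonstrates why "gc_mark != (SELECT ...)" matches no rows when the
# subquery yields NULL (i.e. no 'gc-mark' config row exists).
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE unihashes (gc_mark TEXT NOT NULL)")
db.executemany("INSERT INTO unihashes VALUES (?)", [("",), ("ABC",)])
db.execute("CREATE TABLE config (name TEXT, value TEXT)")  # no 'gc-mark' row

cur = db.execute(
    "DELETE FROM unihashes WHERE "
    "gc_mark != (SELECT value FROM config WHERE name='gc-mark')"
)
print(cur.rowcount)  # 0 -- NULL comparison never matches, so nothing is swept
```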
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
index f93cb2c1dd..608490730d 100644
--- a/bitbake/lib/hashserv/sqlite.py
+++ b/bitbake/lib/hashserv/sqlite.py
| @@ -15,6 +15,7 @@ UNIHASH_TABLE_DEFINITION = ( | |||
| 15 | ("method", "TEXT NOT NULL", "UNIQUE"), | 15 | ("method", "TEXT NOT NULL", "UNIQUE"), |
| 16 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | 16 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
| 17 | ("unihash", "TEXT NOT NULL", ""), | 17 | ("unihash", "TEXT NOT NULL", ""), |
| 18 | ("gc_mark", "TEXT NOT NULL", ""), | ||
| 18 | ) | 19 | ) |
| 19 | 20 | ||
| 20 | UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) | 21 | UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION) |
| @@ -44,6 +45,14 @@ USERS_TABLE_DEFINITION = ( | |||
| 44 | USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) | 45 | USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION) |
| 45 | 46 | ||
| 46 | 47 | ||
| 48 | CONFIG_TABLE_DEFINITION = ( | ||
| 49 | ("name", "TEXT NOT NULL", "UNIQUE"), | ||
| 50 | ("value", "TEXT", ""), | ||
| 51 | ) | ||
| 52 | |||
| 53 | CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION) | ||
| 54 | |||
| 55 | |||
| 47 | def _make_table(cursor, name, definition): | 56 | def _make_table(cursor, name, definition): |
| 48 | cursor.execute( | 57 | cursor.execute( |
| 49 | """ | 58 | """ |
| @@ -71,6 +80,35 @@ def map_user(row): | |||
| 71 | ) | 80 | ) |
| 72 | 81 | ||
| 73 | 82 | ||
| 83 | def _make_condition_statement(columns, condition): | ||
| 84 | where = {} | ||
| 85 | for c in columns: | ||
| 86 | if c in condition and condition[c] is not None: | ||
| 87 | where[c] = condition[c] | ||
| 88 | |||
| 89 | return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys()) | ||
| 90 | |||
| 91 | |||
| 92 | def _get_sqlite_version(cursor): | ||
| 93 | cursor.execute("SELECT sqlite_version()") | ||
| 94 | |||
| 95 | version = [] | ||
| 96 | for v in cursor.fetchone()[0].split("."): | ||
| 97 | try: | ||
| 98 | version.append(int(v)) | ||
| 99 | except ValueError: | ||
| 100 | version.append(v) | ||
| 101 | |||
| 102 | return tuple(version) | ||
| 103 | |||
| 104 | |||
| 105 | def _schema_table_name(version): | ||
| 106 | if version >= (3, 33): | ||
| 107 | return "sqlite_schema" | ||
| 108 | |||
| 109 | return "sqlite_master" | ||
| 110 | |||
| 111 | |||
| 74 | class DatabaseEngine(object): | 112 | class DatabaseEngine(object): |
| 75 | def __init__(self, dbname, sync): | 113 | def __init__(self, dbname, sync): |
| 76 | self.dbname = dbname | 114 | self.dbname = dbname |
| @@ -82,9 +120,10 @@ class DatabaseEngine(object): | |||
| 82 | db.row_factory = sqlite3.Row | 120 | db.row_factory = sqlite3.Row |
| 83 | 121 | ||
| 84 | with closing(db.cursor()) as cursor: | 122 | with closing(db.cursor()) as cursor: |
| 85 | _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION) | 123 | _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION) |
| 86 | _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) | 124 | _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION) |
| 87 | _make_table(cursor, "users", USERS_TABLE_DEFINITION) | 125 | _make_table(cursor, "users", USERS_TABLE_DEFINITION) |
| 126 | _make_table(cursor, "config", CONFIG_TABLE_DEFINITION) | ||
| 88 | 127 | ||
| 89 | cursor.execute("PRAGMA journal_mode = WAL") | 128 | cursor.execute("PRAGMA journal_mode = WAL") |
| 90 | cursor.execute( | 129 | cursor.execute( |
| @@ -96,17 +135,38 @@ class DatabaseEngine(object): | |||
| 96 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup") | 135 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup") |
| 97 | cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") | 136 | cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2") |
| 98 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") | 137 | cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2") |
| 138 | cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3") | ||
| 99 | 139 | ||
| 100 | # TODO: Upgrade from tasks_v2? | 140 | # TODO: Upgrade from tasks_v2? |
| 101 | cursor.execute("DROP TABLE IF EXISTS tasks_v2") | 141 | cursor.execute("DROP TABLE IF EXISTS tasks_v2") |
| 102 | 142 | ||
| 103 | # Create new indexes | 143 | # Create new indexes |
| 104 | cursor.execute( | 144 | cursor.execute( |
| 105 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)" | 145 | "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)" |
| 106 | ) | 146 | ) |
| 107 | cursor.execute( | 147 | cursor.execute( |
| 108 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" | 148 | "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)" |
| 109 | ) | 149 | ) |
| 150 | cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)") | ||
| 151 | |||
| 152 | sqlite_version = _get_sqlite_version(cursor) | ||
| 153 | |||
| 154 | cursor.execute( | ||
| 155 | f""" | ||
| 156 | SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2' | ||
| 157 | """ | ||
| 158 | ) | ||
| 159 | if cursor.fetchone(): | ||
| 160 | self.logger.info("Upgrading Unihashes V2 -> V3...") | ||
| 161 | cursor.execute( | ||
| 162 | """ | ||
| 163 | INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark) | ||
| 164 | SELECT id, method, unihash, taskhash, '' FROM unihashes_v2 | ||
| 165 | """ | ||
| 166 | ) | ||
| 167 | cursor.execute("DROP TABLE unihashes_v2") | ||
| 168 | db.commit() | ||
| 169 | self.logger.info("Upgrade complete") | ||
| 110 | 170 | ||
| 111 | def connect(self, logger): | 171 | def connect(self, logger): |
| 112 | return Database(logger, self.dbname, self.sync) | 172 | return Database(logger, self.dbname, self.sync) |
| @@ -126,16 +186,7 @@ class Database(object): | |||
| 126 | "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") | 186 | "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF") |
| 127 | ) | 187 | ) |
| 128 | 188 | ||
| 129 | cursor.execute("SELECT sqlite_version()") | 189 | self.sqlite_version = _get_sqlite_version(cursor) |
| 130 | |||
| 131 | version = [] | ||
| 132 | for v in cursor.fetchone()[0].split("."): | ||
| 133 | try: | ||
| 134 | version.append(int(v)) | ||
| 135 | except ValueError: | ||
| 136 | version.append(v) | ||
| 137 | |||
| 138 | self.sqlite_version = tuple(version) | ||
| 139 | 190 | ||
| 140 | async def __aenter__(self): | 191 | async def __aenter__(self): |
| 141 | return self | 192 | return self |
| @@ -143,6 +194,30 @@ class Database(object): | |||
| 143 | async def __aexit__(self, exc_type, exc_value, traceback): | 194 | async def __aexit__(self, exc_type, exc_value, traceback): |
| 144 | await self.close() | 195 | await self.close() |
| 145 | 196 | ||
| 197 | async def _set_config(self, cursor, name, value): | ||
| 198 | cursor.execute( | ||
| 199 | """ | ||
| 200 | INSERT OR REPLACE INTO config (id, name, value) VALUES | ||
| 201 | ((SELECT id FROM config WHERE name=:name), :name, :value) | ||
| 202 | """, | ||
| 203 | { | ||
| 204 | "name": name, | ||
| 205 | "value": value, | ||
| 206 | }, | ||
| 207 | ) | ||
| 208 | |||
| 209 | async def _get_config(self, cursor, name): | ||
| 210 | cursor.execute( | ||
| 211 | "SELECT value FROM config WHERE name=:name", | ||
| 212 | { | ||
| 213 | "name": name, | ||
| 214 | }, | ||
| 215 | ) | ||
| 216 | row = cursor.fetchone() | ||
| 217 | if row is None: | ||
| 218 | return None | ||
| 219 | return row["value"] | ||
| 220 | |||
| 146 | async def close(self): | 221 | async def close(self): |
| 147 | self.db.close() | 222 | self.db.close() |
| 148 | 223 | ||
| @@ -150,8 +225,8 @@ class Database(object): | |||
| 150 | with closing(self.db.cursor()) as cursor: | 225 | with closing(self.db.cursor()) as cursor: |
| 151 | cursor.execute( | 226 | cursor.execute( |
| 152 | """ | 227 | """ |
| 153 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | 228 | SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 |
| 154 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | 229 | INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash |
| 155 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash | 230 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash |
| 156 | ORDER BY outhashes_v2.created ASC | 231 | ORDER BY outhashes_v2.created ASC |
| 157 | LIMIT 1 | 232 | LIMIT 1 |
| @@ -167,8 +242,8 @@ class Database(object): | |||
| 167 | with closing(self.db.cursor()) as cursor: | 242 | with closing(self.db.cursor()) as cursor: |
| 168 | cursor.execute( | 243 | cursor.execute( |
| 169 | """ | 244 | """ |
| 170 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | 245 | SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2 |
| 171 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | 246 | INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash |
| 172 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | 247 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash |
| 173 | ORDER BY outhashes_v2.created ASC | 248 | ORDER BY outhashes_v2.created ASC |
| 174 | LIMIT 1 | 249 | LIMIT 1 |
| @@ -200,8 +275,8 @@ class Database(object): | |||
| 200 | with closing(self.db.cursor()) as cursor: | 275 | with closing(self.db.cursor()) as cursor: |
| 201 | cursor.execute( | 276 | cursor.execute( |
| 202 | """ | 277 | """ |
| 203 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 | 278 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2 |
| 204 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | 279 | INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash |
| 205 | -- Select any matching output hash except the one we just inserted | 280 | -- Select any matching output hash except the one we just inserted |
| 206 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash | 281 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash |
| 207 | -- Pick the oldest hash | 282 | -- Pick the oldest hash |
| @@ -219,7 +294,7 @@ class Database(object): | |||
| 219 | async def get_equivalent(self, method, taskhash): | 294 | async def get_equivalent(self, method, taskhash): |
| 220 | with closing(self.db.cursor()) as cursor: | 295 | with closing(self.db.cursor()) as cursor: |
| 221 | cursor.execute( | 296 | cursor.execute( |
| 222 | "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash", | 297 | "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash", |
| 223 | { | 298 | { |
| 224 | "method": method, | 299 | "method": method, |
| 225 | "taskhash": taskhash, | 300 | "taskhash": taskhash, |
| @@ -229,15 +304,9 @@ class Database(object): | |||
| 229 | 304 | ||
| 230 | async def remove(self, condition): | 305 | async def remove(self, condition): |
| 231 | def do_remove(columns, table_name, cursor): | 306 | def do_remove(columns, table_name, cursor): |
| 232 | where = {} | 307 | where, clause = _make_condition_statement(columns, condition) |
| 233 | for c in columns: | ||
| 234 | if c in condition and condition[c] is not None: | ||
| 235 | where[c] = condition[c] | ||
| 236 | |||
| 237 | if where: | 308 | if where: |
| 238 | query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join( | 309 | query = f"DELETE FROM {table_name} WHERE {clause}" |
| 239 | "%s=:%s" % (k, k) for k in where.keys() | ||
| 240 | ) | ||
| 241 | cursor.execute(query, where) | 310 | cursor.execute(query, where) |
| 242 | return cursor.rowcount | 311 | return cursor.rowcount |
| 243 | 312 | ||
| @@ -246,17 +315,80 @@ class Database(object): | |||
| 246 | count = 0 | 315 | count = 0 |
| 247 | with closing(self.db.cursor()) as cursor: | 316 | with closing(self.db.cursor()) as cursor: |
| 248 | count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) | 317 | count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) |
| 249 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | 318 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor) |
| 250 | self.db.commit() | 319 | self.db.commit() |
| 251 | 320 | ||
| 252 | return count | 321 | return count |
| 253 | 322 | ||
| 323 | async def get_current_gc_mark(self): | ||
| 324 | with closing(self.db.cursor()) as cursor: | ||
| 325 | return await self._get_config(cursor, "gc-mark") | ||
| 326 | |||
| 327 | async def gc_status(self): | ||
| 328 | with closing(self.db.cursor()) as cursor: | ||
| 329 | cursor.execute( | ||
| 330 | """ | ||
| 331 | SELECT COUNT() FROM unihashes_v3 WHERE | ||
| 332 | gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
| 333 | """ | ||
| 334 | ) | ||
| 335 | keep_rows = cursor.fetchone()[0] | ||
| 336 | |||
| 337 | cursor.execute( | ||
| 338 | """ | ||
| 339 | SELECT COUNT() FROM unihashes_v3 WHERE | ||
| 340 | gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
| 341 | """ | ||
| 342 | ) | ||
| 343 | remove_rows = cursor.fetchone()[0] | ||
| 344 | |||
| 345 | current_mark = await self._get_config(cursor, "gc-mark") | ||
| 346 | |||
| 347 | return (keep_rows, remove_rows, current_mark) | ||
| 348 | |||
| 349 | async def gc_mark(self, mark, condition): | ||
| 350 | with closing(self.db.cursor()) as cursor: | ||
| 351 | await self._set_config(cursor, "gc-mark", mark) | ||
| 352 | |||
| 353 | where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition) | ||
| 354 | |||
| 355 | new_rows = 0 | ||
| 356 | if where: | ||
| 357 | cursor.execute( | ||
| 358 | f""" | ||
| 359 | UPDATE unihashes_v3 SET | ||
| 360 | gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
| 361 | WHERE {clause} | ||
| 362 | """, | ||
| 363 | where, | ||
| 364 | ) | ||
| 365 | new_rows = cursor.rowcount | ||
| 366 | |||
| 367 | self.db.commit() | ||
| 368 | return new_rows | ||
| 369 | |||
| 370 | async def gc_sweep(self): | ||
| 371 | with closing(self.db.cursor()) as cursor: | ||
| 372 | # NOTE: COALESCE is not used in this query so that if the current | ||
| 373 | # mark is NULL, nothing will happen | ||
| 374 | cursor.execute( | ||
| 375 | """ | ||
| 376 | DELETE FROM unihashes_v3 WHERE | ||
| 377 | gc_mark!=(SELECT value FROM config WHERE name='gc-mark') | ||
| 378 | """ | ||
| 379 | ) | ||
| 380 | count = cursor.rowcount | ||
| 381 | await self._set_config(cursor, "gc-mark", None) | ||
| 382 | |||
| 383 | self.db.commit() | ||
| 384 | return count | ||
| 385 | |||
| 254 | async def clean_unused(self, oldest): | 386 | async def clean_unused(self, oldest): |
| 255 | with closing(self.db.cursor()) as cursor: | 387 | with closing(self.db.cursor()) as cursor: |
| 256 | cursor.execute( | 388 | cursor.execute( |
| 257 | """ | 389 | """ |
| 258 | DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( | 390 | DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( |
| 259 | SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 | 391 | SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1 |
| 260 | ) | 392 | ) |
| 261 | """, | 393 | """, |
| 262 | { | 394 | { |
| @@ -271,7 +403,13 @@ class Database(object): | |||
| 271 | prevrowid = cursor.lastrowid | 403 | prevrowid = cursor.lastrowid |
| 272 | cursor.execute( | 404 | cursor.execute( |
| 273 | """ | 405 | """ |
| 274 | INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash) | 406 | INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES |
| 407 | ( | ||
| 408 | :method, | ||
| 409 | :taskhash, | ||
| 410 | :unihash, | ||
| 411 | COALESCE((SELECT value FROM config WHERE name='gc-mark'), '') | ||
| 412 | ) | ||
| 275 | """, | 413 | """, |
| 276 | { | 414 | { |
| 277 | "method": method, | 415 | "method": method, |
| @@ -383,14 +521,9 @@ class Database(object): | |||
| 383 | async def get_usage(self): | 521 | async def get_usage(self): |
| 384 | usage = {} | 522 | usage = {} |
| 385 | with closing(self.db.cursor()) as cursor: | 523 | with closing(self.db.cursor()) as cursor: |
| 386 | if self.sqlite_version >= (3, 33): | ||
| 387 | table_name = "sqlite_schema" | ||
| 388 | else: | ||
| 389 | table_name = "sqlite_master" | ||
| 390 | |||
| 391 | cursor.execute( | 524 | cursor.execute( |
| 392 | f""" | 525 | f""" |
| 393 | SELECT name FROM {table_name} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' | 526 | SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%' |
| 394 | """ | 527 | """ |
| 395 | ) | 528 | ) |
| 396 | for row in cursor.fetchall(): | 529 | for row in cursor.fetchall(): |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 869f7636c5..aeedab3575 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
| @@ -810,6 +810,27 @@ class HashEquivalenceCommonTests(object): | |||
| 810 | with self.auth_perms("@user-admin") as client: | 810 | with self.auth_perms("@user-admin") as client: |
| 811 | become = client.become_user(client.username) | 811 | become = client.become_user(client.username) |
| 812 | 812 | ||
| 813 | def test_auth_gc(self): | ||
| 814 | admin_client = self.start_auth_server() | ||
| 815 | |||
| 816 | with self.auth_perms() as client, self.assertRaises(InvokeError): | ||
| 817 | client.gc_mark("ABC", {"unihash": "123"}) | ||
| 818 | |||
| 819 | with self.auth_perms() as client, self.assertRaises(InvokeError): | ||
| 820 | client.gc_status() | ||
| 821 | |||
| 822 | with self.auth_perms() as client, self.assertRaises(InvokeError): | ||
| 823 | client.gc_sweep("ABC") | ||
| 824 | |||
| 825 | with self.auth_perms("@db-admin") as client: | ||
| 826 | client.gc_mark("ABC", {"unihash": "123"}) | ||
| 827 | |||
| 828 | with self.auth_perms("@db-admin") as client: | ||
| 829 | client.gc_status() | ||
| 830 | |||
| 831 | with self.auth_perms("@db-admin") as client: | ||
| 832 | client.gc_sweep("ABC") | ||
| 833 | |||
| 813 | def test_get_db_usage(self): | 834 | def test_get_db_usage(self): |
| 814 | usage = self.client.get_db_usage() | 835 | usage = self.client.get_db_usage() |
| 815 | 836 | ||
| @@ -837,6 +858,147 @@ class HashEquivalenceCommonTests(object): | |||
| 837 | data = client.get_taskhash(self.METHOD, taskhash, True) | 858 | data = client.get_taskhash(self.METHOD, taskhash, True) |
| 838 | self.assertEqual(data["owner"], user["username"]) | 859 | self.assertEqual(data["owner"], user["username"]) |
| 839 | 860 | ||
| 861 | def test_gc(self): | ||
| 862 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
| 863 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
| 864 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
| 865 | |||
| 866 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 867 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
| 868 | |||
| 869 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
| 870 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
| 871 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
| 872 | |||
| 873 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
| 874 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 875 | |||
| 876 | # Mark the first unihash to be kept | ||
| 877 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
| 878 | self.assertEqual(ret, {"count": 1}) | ||
| 879 | |||
| 880 | ret = self.client.gc_status() | ||
| 881 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) | ||
| 882 | |||
| 883 | # Second hash is still there; mark doesn't delete hashes | ||
| 884 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 885 | |||
| 886 | ret = self.client.gc_sweep("ABC") | ||
| 887 | self.assertEqual(ret, {"count": 1}) | ||
| 888 | |||
| 889 | # Hash is gone. Taskhash is returned for second hash | ||
| 890 | self.assertClientGetHash(self.client, taskhash2, None) | ||
| 891 | # First hash is still present | ||
| 892 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
| 893 | |||
| 894 | def test_gc_switch_mark(self): | ||
| 895 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
| 896 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
| 897 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
| 898 | |||
| 899 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 900 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
| 901 | |||
| 902 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
| 903 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
| 904 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
| 905 | |||
| 906 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
| 907 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 908 | |||
| 909 | # Mark the first unihash to be kept | ||
| 910 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
| 911 | self.assertEqual(ret, {"count": 1}) | ||
| 912 | |||
| 913 | ret = self.client.gc_status() | ||
| 914 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) | ||
| 915 | |||
| 916 | # Second hash is still there; mark doesn't delete hashes | ||
| 917 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 918 | |||
| 919 | # Switch to a different mark and mark the second hash. This will start | ||
| 920 | # a new collection cycle | ||
| 921 | ret = self.client.gc_mark("DEF", {"unihash": unihash2, "method": self.METHOD}) | ||
| 922 | self.assertEqual(ret, {"count": 1}) | ||
| 923 | |||
| 924 | ret = self.client.gc_status() | ||
| 925 | self.assertEqual(ret, {"mark": "DEF", "keep": 1, "remove": 1}) | ||
| 926 | |||
| 927 | # Both hashes are still present | ||
| 928 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 929 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
| 930 | |||
| 931 | # Sweep with the new mark | ||
| 932 | ret = self.client.gc_sweep("DEF") | ||
| 933 | self.assertEqual(ret, {"count": 1}) | ||
| 934 | |||
| 935 | # First hash is gone, second is kept | ||
| 936 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 937 | self.assertClientGetHash(self.client, taskhash, None) | ||
| 938 | |||
| 939 | def test_gc_switch_sweep_mark(self): | ||
| 940 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
| 941 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
| 942 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
| 943 | |||
| 944 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 945 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
| 946 | |||
| 947 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
| 948 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
| 949 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
| 950 | |||
| 951 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
| 952 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 953 | |||
| 954 | # Mark the first unihash to be kept | ||
| 955 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
| 956 | self.assertEqual(ret, {"count": 1}) | ||
| 957 | |||
| 958 | ret = self.client.gc_status() | ||
| 959 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 1}) | ||
| 960 | |||
| 961 | # Sweeping with a different mark raises an error | ||
| 962 | with self.assertRaises(InvokeError): | ||
| 963 | self.client.gc_sweep("DEF") | ||
| 964 | |||
| 965 | # Both hashes are present | ||
| 966 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 967 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
| 968 | |||
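test_gc_switch_sweep_mark pins down the failure mode: a sweep whose mark does not match the active one is rejected and nothing is deleted. A small sketch of handling that on the client side, assuming InvokeError can be imported from bb.asyncrpc as these tests do:

```python
import hashserv
from bb.asyncrpc import InvokeError  # exception raised for rejected calls

client = hashserv.create_client("unix://./hashserve.sock")

try:
    # "DEF" is a placeholder mark that differs from the active one
    client.gc_sweep("DEF")
except InvokeError as exc:
    # The server refuses the sweep and leaves the database untouched
    print("sweep rejected: %s" % exc)
```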
| 969 | def test_gc_new_hashes(self): | ||
| 970 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
| 971 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
| 972 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
| 973 | |||
| 974 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 975 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
| 976 | |||
| 977 | # Start a new garbage collection | ||
| 978 | ret = self.client.gc_mark("ABC", {"unihash": unihash, "method": self.METHOD}) | ||
| 979 | self.assertEqual(ret, {"count": 1}) | ||
| 980 | |||
| 981 | ret = self.client.gc_status() | ||
| 982 | self.assertEqual(ret, {"mark": "ABC", "keep": 1, "remove": 0}) | ||
| 983 | |||
| 984 | # Add second hash. It should inherit the mark from the current garbage | ||
| 985 | # collection operation | ||
| 986 | |||
| 987 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
| 988 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
| 989 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
| 990 | |||
| 991 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
| 992 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 993 | |||
| 994 | # Sweep should remove nothing | ||
| 995 | ret = self.client.gc_sweep("ABC") | ||
| 996 | self.assertEqual(ret, {"count": 0}) | ||
| 997 | |||
| 998 | # Both hashes are present | ||
| 999 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 1000 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
| 1001 | |||
| 840 | 1002 | ||
| 841 | class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): | 1003 | class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): |
| 842 | def get_server_addr(self, server_idx): | 1004 | def get_server_addr(self, server_idx): |
| @@ -1086,6 +1248,42 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase): | |||
| 1086 | "get-db-query-columns", | 1248 | "get-db-query-columns", |
| 1087 | ], check=True) | 1249 | ], check=True) |
| 1088 | 1250 | ||
| 1251 | def test_gc(self): | ||
| 1252 | taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' | ||
| 1253 | outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' | ||
| 1254 | unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' | ||
| 1255 | |||
| 1256 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
| 1257 | self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') | ||
| 1258 | |||
| 1259 | taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' | ||
| 1260 | outhash2 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' | ||
| 1261 | unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' | ||
| 1262 | |||
| 1263 | result = self.client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) | ||
| 1264 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 1265 | |||
| 1266 | # Mark the first unihash to be kept | ||
| 1267 | self.run_hashclient([ | ||
| 1268 | "--address", self.server_address, | ||
| 1269 | "gc-mark", "ABC", | ||
| 1270 | "--where", "unihash", unihash, | ||
| 1271 | "--where", "method", self.METHOD | ||
| 1272 | ], check=True) | ||
| 1273 | |||
| 1274 | # Second hash is still there; mark doesn't delete hashes | ||
| 1275 | self.assertClientGetHash(self.client, taskhash2, unihash2) | ||
| 1276 | |||
| 1277 | self.run_hashclient([ | ||
| 1278 | "--address", self.server_address, | ||
| 1279 | "gc-sweep", "ABC", | ||
| 1280 | ], check=True) | ||
| 1281 | |||
| 1282 | # Second hash was swept away; its taskhash now resolves to None | ||
| 1283 | self.assertClientGetHash(self.client, taskhash2, None) | ||
| 1284 | # First hash is still present | ||
| 1285 | self.assertClientGetHash(self.client, taskhash, unihash) | ||
| 1286 | |||
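The same cycle can be driven from the command line: the gc-mark and gc-sweep subcommands and the repeatable --where KEY VALUE filter are exercised by the test above, and a gc-status subcommand is presumed to report the active mark and keep/remove totals. A sketch with placeholder socket path and hash value:

```sh
# Mark the rows to keep, filtering on database columns
bitbake-hashclient --address "unix://./hashserve.sock" gc-mark ABC \
    --where unihash f37918cc02eb5a520b1aff86faacbc0a38124646 \
    --where method TestMethod

# Presumed companion subcommand: prints the current mark and keep/remove totals
bitbake-hashclient --address "unix://./hashserve.sock" gc-status

# Remove every unihash that was not marked "ABC"
bitbake-hashclient --address "unix://./hashserve.sock" gc-sweep ABC
```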
| 1089 | 1287 | ||
| 1090 | class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | 1288 | class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): |
| 1091 | def get_server_addr(self, server_idx): | 1289 | def get_server_addr(self, server_idx): |
