diff options
| author | Joshua Watt <JPEWhacker@gmail.com> | 2024-02-18 15:59:46 -0700 |
|---|---|---|
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-02-19 11:58:12 +0000 |
| commit | 1effd1014d9140905093efe25eeefedb28a10875 (patch) | |
| tree | b34fb1d26f020b361d22904695cab6b9a7c1ea50 /bitbake/lib/hashserv/server.py | |
| parent | 324c9fd666117afb0dd689eaa8551bb02d6a042b (diff) | |
| download | poky-1effd1014d9140905093efe25eeefedb28a10875.tar.gz | |
bitbake: hashserv: Add Unihash Garbage Collection
Adds support for removing unused unihashes from the database. This is
done using a "mark and sweep" style of garbage collection where a
collection is started by marking which unihashes should be kept in the
database, then performing a sweep to remove any unmarked hashes.
(Bitbake rev: 433d4a075a1acfbd2a2913061739353a84bb01ed)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 105 |
1 files changed, 63 insertions, 42 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index a86507830e..5ed852d1f3 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
| @@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
| 199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): | 199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): |
| 200 | if not self.user: | 200 | if not self.user: |
| 201 | username = "Anonymous user" | 201 | username = "Anonymous user" |
| 202 | user_perms = self.anon_perms | 202 | user_perms = self.server.anon_perms |
| 203 | else: | 203 | else: |
| 204 | username = self.user.username | 204 | username = self.user.username |
| 205 | user_perms = self.user.permissions | 205 | user_perms = self.user.permissions |
| @@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
| 223 | 223 | ||
| 224 | 224 | ||
| 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
| 226 | def __init__( | 226 | def __init__(self, socket, server): |
| 227 | self, | 227 | super().__init__(socket, "OEHASHEQUIV", server.logger) |
| 228 | socket, | 228 | self.server = server |
| 229 | db_engine, | ||
| 230 | request_stats, | ||
| 231 | backfill_queue, | ||
| 232 | upstream, | ||
| 233 | read_only, | ||
| 234 | anon_perms, | ||
| 235 | ): | ||
| 236 | super().__init__(socket, "OEHASHEQUIV", logger) | ||
| 237 | self.db_engine = db_engine | ||
| 238 | self.request_stats = request_stats | ||
| 239 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 229 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
| 240 | self.backfill_queue = backfill_queue | ||
| 241 | self.upstream = upstream | ||
| 242 | self.read_only = read_only | ||
| 243 | self.user = None | 230 | self.user = None |
| 244 | self.anon_perms = anon_perms | ||
| 245 | 231 | ||
| 246 | self.handlers.update( | 232 | self.handlers.update( |
| 247 | { | 233 | { |
| @@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 261 | } | 247 | } |
| 262 | ) | 248 | ) |
| 263 | 249 | ||
| 264 | if not read_only: | 250 | if not self.server.read_only: |
| 265 | self.handlers.update( | 251 | self.handlers.update( |
| 266 | { | 252 | { |
| 267 | "report-equiv": self.handle_equivreport, | 253 | "report-equiv": self.handle_equivreport, |
| 268 | "reset-stats": self.handle_reset_stats, | 254 | "reset-stats": self.handle_reset_stats, |
| 269 | "backfill-wait": self.handle_backfill_wait, | 255 | "backfill-wait": self.handle_backfill_wait, |
| 270 | "remove": self.handle_remove, | 256 | "remove": self.handle_remove, |
| 257 | "gc-mark": self.handle_gc_mark, | ||
| 258 | "gc-sweep": self.handle_gc_sweep, | ||
| 259 | "gc-status": self.handle_gc_status, | ||
| 271 | "clean-unused": self.handle_clean_unused, | 260 | "clean-unused": self.handle_clean_unused, |
| 272 | "refresh-token": self.handle_refresh_token, | 261 | "refresh-token": self.handle_refresh_token, |
| 273 | "set-user-perms": self.handle_set_perms, | 262 | "set-user-perms": self.handle_set_perms, |
| @@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 282 | def user_has_permissions(self, *permissions, allow_anon=True): | 271 | def user_has_permissions(self, *permissions, allow_anon=True): |
| 283 | permissions = set(permissions) | 272 | permissions = set(permissions) |
| 284 | if allow_anon: | 273 | if allow_anon: |
| 285 | if ALL_PERM in self.anon_perms: | 274 | if ALL_PERM in self.server.anon_perms: |
| 286 | return True | 275 | return True |
| 287 | 276 | ||
| 288 | if not permissions - self.anon_perms: | 277 | if not permissions - self.server.anon_perms: |
| 289 | return True | 278 | return True |
| 290 | 279 | ||
| 291 | if self.user is None: | 280 | if self.user is None: |
| @@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 303 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) | 292 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
| 304 | 293 | ||
| 305 | async def process_requests(self): | 294 | async def process_requests(self): |
| 306 | async with self.db_engine.connect(self.logger) as db: | 295 | async with self.server.db_engine.connect(self.logger) as db: |
| 307 | self.db = db | 296 | self.db = db |
| 308 | if self.upstream is not None: | 297 | if self.server.upstream is not None: |
| 309 | self.upstream_client = await create_async_client(self.upstream) | 298 | self.upstream_client = await create_async_client(self.server.upstream) |
| 310 | else: | 299 | else: |
| 311 | self.upstream_client = None | 300 | self.upstream_client = None |
| 312 | 301 | ||
| @@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 323 | if "stream" in k: | 312 | if "stream" in k: |
| 324 | return await self.handlers[k](msg[k]) | 313 | return await self.handlers[k](msg[k]) |
| 325 | else: | 314 | else: |
| 326 | with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): | 315 | with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): |
| 327 | return await self.handlers[k](msg[k]) | 316 | return await self.handlers[k](msg[k]) |
| 328 | 317 | ||
| 329 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 318 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
| @@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 404 | # possible (which is why the request sample is handled manually | 393 | # possible (which is why the request sample is handled manually |
| 405 | # instead of using 'with', and also why logging statements are | 394 | # instead of using 'with', and also why logging statements are |
| 406 | # commented out. | 395 | # commented out. |
| 407 | self.request_sample = self.request_stats.start_sample() | 396 | self.request_sample = self.server.request_stats.start_sample() |
| 408 | request_measure = self.request_sample.measure() | 397 | request_measure = self.request_sample.measure() |
| 409 | request_measure.start() | 398 | request_measure.start() |
| 410 | 399 | ||
| @@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 435 | # Post to the backfill queue after writing the result to minimize | 424 | # Post to the backfill queue after writing the result to minimize |
| 436 | # the turn around time on a request | 425 | # the turn around time on a request |
| 437 | if upstream is not None: | 426 | if upstream is not None: |
| 438 | await self.backfill_queue.put((method, taskhash)) | 427 | await self.server.backfill_queue.put((method, taskhash)) |
| 439 | 428 | ||
| 440 | await self.socket.send("ok") | 429 | await self.socket.send("ok") |
| 441 | return self.NO_RESPONSE | 430 | return self.NO_RESPONSE |
| @@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 461 | # report is made inside the function | 450 | # report is made inside the function |
| 462 | @permissions(READ_PERM) | 451 | @permissions(READ_PERM) |
| 463 | async def handle_report(self, data): | 452 | async def handle_report(self, data): |
| 464 | if self.read_only or not self.user_has_permissions(REPORT_PERM): | 453 | if self.server.read_only or not self.user_has_permissions(REPORT_PERM): |
| 465 | return await self.report_readonly(data) | 454 | return await self.report_readonly(data) |
| 466 | 455 | ||
| 467 | outhash_data = { | 456 | outhash_data = { |
| @@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 538 | @permissions(READ_PERM) | 527 | @permissions(READ_PERM) |
| 539 | async def handle_get_stats(self, request): | 528 | async def handle_get_stats(self, request): |
| 540 | return { | 529 | return { |
| 541 | "requests": self.request_stats.todict(), | 530 | "requests": self.server.request_stats.todict(), |
| 542 | } | 531 | } |
| 543 | 532 | ||
| 544 | @permissions(DB_ADMIN_PERM) | 533 | @permissions(DB_ADMIN_PERM) |
| 545 | async def handle_reset_stats(self, request): | 534 | async def handle_reset_stats(self, request): |
| 546 | d = { | 535 | d = { |
| 547 | "requests": self.request_stats.todict(), | 536 | "requests": self.server.request_stats.todict(), |
| 548 | } | 537 | } |
| 549 | 538 | ||
| 550 | self.request_stats.reset() | 539 | self.server.request_stats.reset() |
| 551 | return d | 540 | return d |
| 552 | 541 | ||
| 553 | @permissions(READ_PERM) | 542 | @permissions(READ_PERM) |
| 554 | async def handle_backfill_wait(self, request): | 543 | async def handle_backfill_wait(self, request): |
| 555 | d = { | 544 | d = { |
| 556 | "tasks": self.backfill_queue.qsize(), | 545 | "tasks": self.server.backfill_queue.qsize(), |
| 557 | } | 546 | } |
| 558 | await self.backfill_queue.join() | 547 | await self.server.backfill_queue.join() |
| 559 | return d | 548 | return d |
| 560 | 549 | ||
| 561 | @permissions(DB_ADMIN_PERM) | 550 | @permissions(DB_ADMIN_PERM) |
| @@ -567,6 +556,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 567 | return {"count": await self.db.remove(condition)} | 556 | return {"count": await self.db.remove(condition)} |
| 568 | 557 | ||
| 569 | @permissions(DB_ADMIN_PERM) | 558 | @permissions(DB_ADMIN_PERM) |
| 559 | async def handle_gc_mark(self, request): | ||
| 560 | condition = request["where"] | ||
| 561 | mark = request["mark"] | ||
| 562 | |||
| 563 | if not isinstance(condition, dict): | ||
| 564 | raise TypeError("Bad condition type %s" % type(condition)) | ||
| 565 | |||
| 566 | if not isinstance(mark, str): | ||
| 567 | raise TypeError("Bad mark type %s" % type(mark)) | ||
| 568 | |||
| 569 | return {"count": await self.db.gc_mark(mark, condition)} | ||
| 570 | |||
| 571 | @permissions(DB_ADMIN_PERM) | ||
| 572 | async def handle_gc_sweep(self, request): | ||
| 573 | mark = request["mark"] | ||
| 574 | |||
| 575 | if not isinstance(mark, str): | ||
| 576 | raise TypeError("Bad mark type %s" % type(mark)) | ||
| 577 | |||
| 578 | current_mark = await self.db.get_current_gc_mark() | ||
| 579 | |||
| 580 | if not current_mark or mark != current_mark: | ||
| 581 | raise bb.asyncrpc.InvokeError( | ||
| 582 | f"'{mark}' is not the current mark. Refusing to sweep" | ||
| 583 | ) | ||
| 584 | |||
| 585 | count = await self.db.gc_sweep() | ||
| 586 | |||
| 587 | return {"count": count} | ||
| 588 | |||
| 589 | @permissions(DB_ADMIN_PERM) | ||
| 590 | async def handle_gc_status(self, request): | ||
| 591 | (keep_rows, remove_rows, current_mark) = await self.db.gc_status() | ||
| 592 | return { | ||
| 593 | "keep": keep_rows, | ||
| 594 | "remove": remove_rows, | ||
| 595 | "mark": current_mark, | ||
| 596 | } | ||
| 597 | |||
| 598 | @permissions(DB_ADMIN_PERM) | ||
| 570 | async def handle_clean_unused(self, request): | 599 | async def handle_clean_unused(self, request): |
| 571 | max_age = request["max_age_seconds"] | 600 | max_age = request["max_age_seconds"] |
| 572 | oldest = datetime.now() - timedelta(seconds=-max_age) | 601 | oldest = datetime.now() - timedelta(seconds=-max_age) |
| @@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer): | |||
| 779 | ) | 808 | ) |
| 780 | 809 | ||
| 781 | def accept_client(self, socket): | 810 | def accept_client(self, socket): |
| 782 | return ServerClient( | 811 | return ServerClient(socket, self) |
| 783 | socket, | ||
| 784 | self.db_engine, | ||
| 785 | self.request_stats, | ||
| 786 | self.backfill_queue, | ||
| 787 | self.upstream, | ||
| 788 | self.read_only, | ||
| 789 | self.anon_perms, | ||
| 790 | ) | ||
| 791 | 812 | ||
| 792 | async def create_admin_user(self): | 813 | async def create_admin_user(self): |
| 793 | admin_permissions = (ALL_PERM,) | 814 | admin_permissions = (ALL_PERM,) |
