diff options
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r-- | bitbake/lib/hashserv/server.py | 105 |
1 files changed, 63 insertions, 42 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index a86507830e..5ed852d1f3 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -199,7 +199,7 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): | 199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): |
200 | if not self.user: | 200 | if not self.user: |
201 | username = "Anonymous user" | 201 | username = "Anonymous user" |
202 | user_perms = self.anon_perms | 202 | user_perms = self.server.anon_perms |
203 | else: | 203 | else: |
204 | username = self.user.username | 204 | username = self.user.username |
205 | user_perms = self.user.permissions | 205 | user_perms = self.user.permissions |
@@ -223,25 +223,11 @@ def permissions(*permissions, allow_anon=True, allow_self_service=False): | |||
223 | 223 | ||
224 | 224 | ||
225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
226 | def __init__( | 226 | def __init__(self, socket, server): |
227 | self, | 227 | super().__init__(socket, "OEHASHEQUIV", server.logger) |
228 | socket, | 228 | self.server = server |
229 | db_engine, | ||
230 | request_stats, | ||
231 | backfill_queue, | ||
232 | upstream, | ||
233 | read_only, | ||
234 | anon_perms, | ||
235 | ): | ||
236 | super().__init__(socket, "OEHASHEQUIV", logger) | ||
237 | self.db_engine = db_engine | ||
238 | self.request_stats = request_stats | ||
239 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 229 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
240 | self.backfill_queue = backfill_queue | ||
241 | self.upstream = upstream | ||
242 | self.read_only = read_only | ||
243 | self.user = None | 230 | self.user = None |
244 | self.anon_perms = anon_perms | ||
245 | 231 | ||
246 | self.handlers.update( | 232 | self.handlers.update( |
247 | { | 233 | { |
@@ -261,13 +247,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
261 | } | 247 | } |
262 | ) | 248 | ) |
263 | 249 | ||
264 | if not read_only: | 250 | if not self.server.read_only: |
265 | self.handlers.update( | 251 | self.handlers.update( |
266 | { | 252 | { |
267 | "report-equiv": self.handle_equivreport, | 253 | "report-equiv": self.handle_equivreport, |
268 | "reset-stats": self.handle_reset_stats, | 254 | "reset-stats": self.handle_reset_stats, |
269 | "backfill-wait": self.handle_backfill_wait, | 255 | "backfill-wait": self.handle_backfill_wait, |
270 | "remove": self.handle_remove, | 256 | "remove": self.handle_remove, |
257 | "gc-mark": self.handle_gc_mark, | ||
258 | "gc-sweep": self.handle_gc_sweep, | ||
259 | "gc-status": self.handle_gc_status, | ||
271 | "clean-unused": self.handle_clean_unused, | 260 | "clean-unused": self.handle_clean_unused, |
272 | "refresh-token": self.handle_refresh_token, | 261 | "refresh-token": self.handle_refresh_token, |
273 | "set-user-perms": self.handle_set_perms, | 262 | "set-user-perms": self.handle_set_perms, |
@@ -282,10 +271,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
282 | def user_has_permissions(self, *permissions, allow_anon=True): | 271 | def user_has_permissions(self, *permissions, allow_anon=True): |
283 | permissions = set(permissions) | 272 | permissions = set(permissions) |
284 | if allow_anon: | 273 | if allow_anon: |
285 | if ALL_PERM in self.anon_perms: | 274 | if ALL_PERM in self.server.anon_perms: |
286 | return True | 275 | return True |
287 | 276 | ||
288 | if not permissions - self.anon_perms: | 277 | if not permissions - self.server.anon_perms: |
289 | return True | 278 | return True |
290 | 279 | ||
291 | if self.user is None: | 280 | if self.user is None: |
@@ -303,10 +292,10 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
303 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) | 292 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
304 | 293 | ||
305 | async def process_requests(self): | 294 | async def process_requests(self): |
306 | async with self.db_engine.connect(self.logger) as db: | 295 | async with self.server.db_engine.connect(self.logger) as db: |
307 | self.db = db | 296 | self.db = db |
308 | if self.upstream is not None: | 297 | if self.server.upstream is not None: |
309 | self.upstream_client = await create_async_client(self.upstream) | 298 | self.upstream_client = await create_async_client(self.server.upstream) |
310 | else: | 299 | else: |
311 | self.upstream_client = None | 300 | self.upstream_client = None |
312 | 301 | ||
@@ -323,7 +312,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
323 | if "stream" in k: | 312 | if "stream" in k: |
324 | return await self.handlers[k](msg[k]) | 313 | return await self.handlers[k](msg[k]) |
325 | else: | 314 | else: |
326 | with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): | 315 | with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): |
327 | return await self.handlers[k](msg[k]) | 316 | return await self.handlers[k](msg[k]) |
328 | 317 | ||
329 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 318 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
@@ -404,7 +393,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
404 | # possible (which is why the request sample is handled manually | 393 | # possible (which is why the request sample is handled manually |
405 | # instead of using 'with', and also why logging statements are | 394 | # instead of using 'with', and also why logging statements are |
406 | # commented out. | 395 | # commented out. |
407 | self.request_sample = self.request_stats.start_sample() | 396 | self.request_sample = self.server.request_stats.start_sample() |
408 | request_measure = self.request_sample.measure() | 397 | request_measure = self.request_sample.measure() |
409 | request_measure.start() | 398 | request_measure.start() |
410 | 399 | ||
@@ -435,7 +424,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
435 | # Post to the backfill queue after writing the result to minimize | 424 | # Post to the backfill queue after writing the result to minimize |
436 | # the turn around time on a request | 425 | # the turn around time on a request |
437 | if upstream is not None: | 426 | if upstream is not None: |
438 | await self.backfill_queue.put((method, taskhash)) | 427 | await self.server.backfill_queue.put((method, taskhash)) |
439 | 428 | ||
440 | await self.socket.send("ok") | 429 | await self.socket.send("ok") |
441 | return self.NO_RESPONSE | 430 | return self.NO_RESPONSE |
@@ -461,7 +450,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
461 | # report is made inside the function | 450 | # report is made inside the function |
462 | @permissions(READ_PERM) | 451 | @permissions(READ_PERM) |
463 | async def handle_report(self, data): | 452 | async def handle_report(self, data): |
464 | if self.read_only or not self.user_has_permissions(REPORT_PERM): | 453 | if self.server.read_only or not self.user_has_permissions(REPORT_PERM): |
465 | return await self.report_readonly(data) | 454 | return await self.report_readonly(data) |
466 | 455 | ||
467 | outhash_data = { | 456 | outhash_data = { |
@@ -538,24 +527,24 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
538 | @permissions(READ_PERM) | 527 | @permissions(READ_PERM) |
539 | async def handle_get_stats(self, request): | 528 | async def handle_get_stats(self, request): |
540 | return { | 529 | return { |
541 | "requests": self.request_stats.todict(), | 530 | "requests": self.server.request_stats.todict(), |
542 | } | 531 | } |
543 | 532 | ||
544 | @permissions(DB_ADMIN_PERM) | 533 | @permissions(DB_ADMIN_PERM) |
545 | async def handle_reset_stats(self, request): | 534 | async def handle_reset_stats(self, request): |
546 | d = { | 535 | d = { |
547 | "requests": self.request_stats.todict(), | 536 | "requests": self.server.request_stats.todict(), |
548 | } | 537 | } |
549 | 538 | ||
550 | self.request_stats.reset() | 539 | self.server.request_stats.reset() |
551 | return d | 540 | return d |
552 | 541 | ||
553 | @permissions(READ_PERM) | 542 | @permissions(READ_PERM) |
554 | async def handle_backfill_wait(self, request): | 543 | async def handle_backfill_wait(self, request): |
555 | d = { | 544 | d = { |
556 | "tasks": self.backfill_queue.qsize(), | 545 | "tasks": self.server.backfill_queue.qsize(), |
557 | } | 546 | } |
558 | await self.backfill_queue.join() | 547 | await self.server.backfill_queue.join() |
559 | return d | 548 | return d |
560 | 549 | ||
561 | @permissions(DB_ADMIN_PERM) | 550 | @permissions(DB_ADMIN_PERM) |
@@ -567,6 +556,46 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
567 | return {"count": await self.db.remove(condition)} | 556 | return {"count": await self.db.remove(condition)} |
568 | 557 | ||
569 | @permissions(DB_ADMIN_PERM) | 558 | @permissions(DB_ADMIN_PERM) |
559 | async def handle_gc_mark(self, request): | ||
560 | condition = request["where"] | ||
561 | mark = request["mark"] | ||
562 | |||
563 | if not isinstance(condition, dict): | ||
564 | raise TypeError("Bad condition type %s" % type(condition)) | ||
565 | |||
566 | if not isinstance(mark, str): | ||
567 | raise TypeError("Bad mark type %s" % type(mark)) | ||
568 | |||
569 | return {"count": await self.db.gc_mark(mark, condition)} | ||
570 | |||
571 | @permissions(DB_ADMIN_PERM) | ||
572 | async def handle_gc_sweep(self, request): | ||
573 | mark = request["mark"] | ||
574 | |||
575 | if not isinstance(mark, str): | ||
576 | raise TypeError("Bad mark type %s" % type(mark)) | ||
577 | |||
578 | current_mark = await self.db.get_current_gc_mark() | ||
579 | |||
580 | if not current_mark or mark != current_mark: | ||
581 | raise bb.asyncrpc.InvokeError( | ||
582 | f"'{mark}' is not the current mark. Refusing to sweep" | ||
583 | ) | ||
584 | |||
585 | count = await self.db.gc_sweep() | ||
586 | |||
587 | return {"count": count} | ||
588 | |||
589 | @permissions(DB_ADMIN_PERM) | ||
590 | async def handle_gc_status(self, request): | ||
591 | (keep_rows, remove_rows, current_mark) = await self.db.gc_status() | ||
592 | return { | ||
593 | "keep": keep_rows, | ||
594 | "remove": remove_rows, | ||
595 | "mark": current_mark, | ||
596 | } | ||
597 | |||
598 | @permissions(DB_ADMIN_PERM) | ||
570 | async def handle_clean_unused(self, request): | 599 | async def handle_clean_unused(self, request): |
571 | max_age = request["max_age_seconds"] | 600 | max_age = request["max_age_seconds"] |
572 | oldest = datetime.now() - timedelta(seconds=-max_age) | 601 | oldest = datetime.now() - timedelta(seconds=-max_age) |
@@ -779,15 +808,7 @@ class Server(bb.asyncrpc.AsyncServer): | |||
779 | ) | 808 | ) |
780 | 809 | ||
781 | def accept_client(self, socket): | 810 | def accept_client(self, socket): |
782 | return ServerClient( | 811 | return ServerClient(socket, self) |
783 | socket, | ||
784 | self.db_engine, | ||
785 | self.request_stats, | ||
786 | self.backfill_queue, | ||
787 | self.upstream, | ||
788 | self.read_only, | ||
789 | self.anon_perms, | ||
790 | ) | ||
791 | 812 | ||
792 | async def create_admin_user(self): | 813 | async def create_admin_user(self): |
793 | admin_permissions = (ALL_PERM,) | 814 | admin_permissions = (ALL_PERM,) |