summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r--bitbake/lib/hashserv/server.py357
1 files changed, 353 insertions, 4 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index c691df7618..f5baa6be78 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -8,13 +8,48 @@ import asyncio
8import logging 8import logging
9import math 9import math
10import time 10import time
11import os
12import base64
13import hashlib
11from . import create_async_client 14from . import create_async_client
12import bb.asyncrpc 15import bb.asyncrpc
13 16
14
15logger = logging.getLogger("hashserv.server") 17logger = logging.getLogger("hashserv.server")
16 18
17 19
20# This permission only exists to match nothing
21NONE_PERM = "@none"
22
23READ_PERM = "@read"
24REPORT_PERM = "@report"
25DB_ADMIN_PERM = "@db-admin"
26USER_ADMIN_PERM = "@user-admin"
27ALL_PERM = "@all"
28
29ALL_PERMISSIONS = {
30 READ_PERM,
31 REPORT_PERM,
32 DB_ADMIN_PERM,
33 USER_ADMIN_PERM,
34 ALL_PERM,
35}
36
37DEFAULT_ANON_PERMS = (
38 READ_PERM,
39 REPORT_PERM,
40 DB_ADMIN_PERM,
41)
42
43TOKEN_ALGORITHM = "sha256"
44
45# 48 bytes of random data will result in 64 characters when base64
46# encoded. This number also ensures that the base64 encoding won't have any
47# trailing '=' characters.
48TOKEN_SIZE = 48
49
50SALT_SIZE = 8
51
52
18class Measurement(object): 53class Measurement(object):
19 def __init__(self, sample): 54 def __init__(self, sample):
20 self.sample = sample 55 self.sample = sample
@@ -108,6 +143,85 @@ class Stats(object):
108 } 143 }
109 144
110 145
146token_refresh_semaphore = asyncio.Lock()
147
148
149async def new_token():
150 # Prevent malicious users from using this API to deduce the entropy
151 # pool on the server and thus be able to guess a token. *All* token
152 # refresh requests lock the same global semaphore and then sleep for a
153 # short time. The effectively rate limits the total number of requests
154 # than can be made across all clients to 10/second, which should be enough
155 # since you have to be an authenticated users to make the request in the
156 # first place
157 async with token_refresh_semaphore:
158 await asyncio.sleep(0.1)
159 raw = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK)
160
161 return base64.b64encode(raw, b"._").decode("utf-8")
162
163
164def new_salt():
165 return os.getrandom(SALT_SIZE, os.GRND_NONBLOCK).hex()
166
167
168def hash_token(algo, salt, token):
169 h = hashlib.new(algo)
170 h.update(salt.encode("utf-8"))
171 h.update(token.encode("utf-8"))
172 return ":".join([algo, salt, h.hexdigest()])
173
174
175def permissions(*permissions, allow_anon=True, allow_self_service=False):
176 """
177 Function decorator that can be used to decorate an RPC function call and
178 check that the current users permissions match the require permissions.
179
180 If allow_anon is True, the user will also be allowed to make the RPC call
181 if the anonymous user permissions match the permissions.
182
183 If allow_self_service is True, and the "username" property in the request
184 is the currently logged in user, or not specified, the user will also be
185 allowed to make the request. This allows users to access normal privileged
186 API, as long as they are only modifying their own user properties (e.g.
187 users can be allowed to reset their own token without @user-admin
188 permissions, but not the token for any other user.
189 """
190
191 def wrapper(func):
192 async def wrap(self, request):
193 if allow_self_service and self.user is not None:
194 username = request.get("username", self.user.username)
195 if username == self.user.username:
196 request["username"] = self.user.username
197 return await func(self, request)
198
199 if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
200 if not self.user:
201 username = "Anonymous user"
202 user_perms = self.anon_perms
203 else:
204 username = self.user.username
205 user_perms = self.user.permissions
206
207 self.logger.info(
208 "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
209 username,
210 ", ".join(user_perms),
211 func.__name__,
212 ", ".join(permissions),
213 )
214 raise bb.asyncrpc.InvokeError(
215 f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
216 )
217
218 return await func(self, request)
219
220 return wrap
221
222 return wrapper
223
224
111class ServerClient(bb.asyncrpc.AsyncServerConnection): 225class ServerClient(bb.asyncrpc.AsyncServerConnection):
112 def __init__( 226 def __init__(
113 self, 227 self,
@@ -117,6 +231,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
117 backfill_queue, 231 backfill_queue,
118 upstream, 232 upstream,
119 read_only, 233 read_only,
234 anon_perms,
120 ): 235 ):
121 super().__init__(socket, "OEHASHEQUIV", logger) 236 super().__init__(socket, "OEHASHEQUIV", logger)
122 self.db_engine = db_engine 237 self.db_engine = db_engine
@@ -125,6 +240,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
125 self.backfill_queue = backfill_queue 240 self.backfill_queue = backfill_queue
126 self.upstream = upstream 241 self.upstream = upstream
127 self.read_only = read_only 242 self.read_only = read_only
243 self.user = None
244 self.anon_perms = anon_perms
128 245
129 self.handlers.update( 246 self.handlers.update(
130 { 247 {
@@ -135,6 +252,9 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
135 # Not always read-only, but internally checks if the server is 252 # Not always read-only, but internally checks if the server is
136 # read-only 253 # read-only
137 "report": self.handle_report, 254 "report": self.handle_report,
255 "auth": self.handle_auth,
256 "get-user": self.handle_get_user,
257 "get-all-users": self.handle_get_all_users,
138 } 258 }
139 ) 259 )
140 260
@@ -146,9 +266,36 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
146 "backfill-wait": self.handle_backfill_wait, 266 "backfill-wait": self.handle_backfill_wait,
147 "remove": self.handle_remove, 267 "remove": self.handle_remove,
148 "clean-unused": self.handle_clean_unused, 268 "clean-unused": self.handle_clean_unused,
269 "refresh-token": self.handle_refresh_token,
270 "set-user-perms": self.handle_set_perms,
271 "new-user": self.handle_new_user,
272 "delete-user": self.handle_delete_user,
149 } 273 }
150 ) 274 )
151 275
276 def raise_no_user_error(self, username):
277 raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")
278
279 def user_has_permissions(self, *permissions, allow_anon=True):
280 permissions = set(permissions)
281 if allow_anon:
282 if ALL_PERM in self.anon_perms:
283 return True
284
285 if not permissions - self.anon_perms:
286 return True
287
288 if self.user is None:
289 return False
290
291 if ALL_PERM in self.user.permissions:
292 return True
293
294 if not permissions - self.user.permissions:
295 return True
296
297 return False
298
152 def validate_proto_version(self): 299 def validate_proto_version(self):
153 return self.proto_version > (1, 0) and self.proto_version <= (1, 1) 300 return self.proto_version > (1, 0) and self.proto_version <= (1, 1)
154 301
@@ -178,6 +325,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
178 325
179 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 326 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
180 327
328 @permissions(READ_PERM)
181 async def handle_get(self, request): 329 async def handle_get(self, request):
182 method = request["method"] 330 method = request["method"]
183 taskhash = request["taskhash"] 331 taskhash = request["taskhash"]
@@ -206,6 +354,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
206 354
207 return d 355 return d
208 356
357 @permissions(READ_PERM)
209 async def handle_get_outhash(self, request): 358 async def handle_get_outhash(self, request):
210 method = request["method"] 359 method = request["method"]
211 outhash = request["outhash"] 360 outhash = request["outhash"]
@@ -236,6 +385,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
236 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) 385 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
237 await self.db.insert_outhash(data) 386 await self.db.insert_outhash(data)
238 387
388 @permissions(READ_PERM)
239 async def handle_get_stream(self, request): 389 async def handle_get_stream(self, request):
240 await self.socket.send_message("ok") 390 await self.socket.send_message("ok")
241 391
@@ -304,8 +454,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
304 "unihash": unihash, 454 "unihash": unihash,
305 } 455 }
306 456
457 # Since this can be called either read only or to report, the check to
458 # report is made inside the function
459 @permissions(READ_PERM)
307 async def handle_report(self, data): 460 async def handle_report(self, data):
308 if self.read_only: 461 if self.read_only or not self.user_has_permissions(REPORT_PERM):
309 return await self.report_readonly(data) 462 return await self.report_readonly(data)
310 463
311 outhash_data = { 464 outhash_data = {
@@ -358,6 +511,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
358 "unihash": unihash, 511 "unihash": unihash,
359 } 512 }
360 513
514 @permissions(READ_PERM, REPORT_PERM)
361 async def handle_equivreport(self, data): 515 async def handle_equivreport(self, data):
362 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) 516 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
363 517
@@ -375,11 +529,13 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
375 529
376 return {k: row[k] for k in ("taskhash", "method", "unihash")} 530 return {k: row[k] for k in ("taskhash", "method", "unihash")}
377 531
532 @permissions(READ_PERM)
378 async def handle_get_stats(self, request): 533 async def handle_get_stats(self, request):
379 return { 534 return {
380 "requests": self.request_stats.todict(), 535 "requests": self.request_stats.todict(),
381 } 536 }
382 537
538 @permissions(DB_ADMIN_PERM)
383 async def handle_reset_stats(self, request): 539 async def handle_reset_stats(self, request):
384 d = { 540 d = {
385 "requests": self.request_stats.todict(), 541 "requests": self.request_stats.todict(),
@@ -388,6 +544,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
388 self.request_stats.reset() 544 self.request_stats.reset()
389 return d 545 return d
390 546
547 @permissions(READ_PERM)
391 async def handle_backfill_wait(self, request): 548 async def handle_backfill_wait(self, request):
392 d = { 549 d = {
393 "tasks": self.backfill_queue.qsize(), 550 "tasks": self.backfill_queue.qsize(),
@@ -395,6 +552,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
395 await self.backfill_queue.join() 552 await self.backfill_queue.join()
396 return d 553 return d
397 554
555 @permissions(DB_ADMIN_PERM)
398 async def handle_remove(self, request): 556 async def handle_remove(self, request):
399 condition = request["where"] 557 condition = request["where"]
400 if not isinstance(condition, dict): 558 if not isinstance(condition, dict):
@@ -402,19 +560,178 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
402 560
403 return {"count": await self.db.remove(condition)} 561 return {"count": await self.db.remove(condition)}
404 562
563 @permissions(DB_ADMIN_PERM)
405 async def handle_clean_unused(self, request): 564 async def handle_clean_unused(self, request):
406 max_age = request["max_age_seconds"] 565 max_age = request["max_age_seconds"]
407 oldest = datetime.now() - timedelta(seconds=-max_age) 566 oldest = datetime.now() - timedelta(seconds=-max_age)
408 return {"count": await self.db.clean_unused(oldest)} 567 return {"count": await self.db.clean_unused(oldest)}
409 568
569 # The authentication API is always allowed
570 async def handle_auth(self, request):
571 username = str(request["username"])
572 token = str(request["token"])
573
574 async def fail_auth():
575 nonlocal username
576 # Rate limit bad login attempts
577 await asyncio.sleep(1)
578 raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")
579
580 user, db_token = await self.db.lookup_user_token(username)
581
582 if not user or not db_token:
583 await fail_auth()
584
585 try:
586 algo, salt, _ = db_token.split(":")
587 except ValueError:
588 await fail_auth()
589
590 if hash_token(algo, salt, token) != db_token:
591 await fail_auth()
592
593 self.user = user
594
595 self.logger.info("Authenticated as %s", username)
596
597 return {
598 "result": True,
599 "username": self.user.username,
600 "permissions": sorted(list(self.user.permissions)),
601 }
602
603 @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
604 async def handle_refresh_token(self, request):
605 username = str(request["username"])
606
607 token = await new_token()
608
609 updated = await self.db.set_user_token(
610 username,
611 hash_token(TOKEN_ALGORITHM, new_salt(), token),
612 )
613 if not updated:
614 self.raise_no_user_error(username)
615
616 return {"username": username, "token": token}
617
618 def get_perm_arg(self, arg):
619 if not isinstance(arg, list):
620 raise bb.asyncrpc.InvokeError("Unexpected type for permissions")
621
622 arg = set(arg)
623 try:
624 arg.remove(NONE_PERM)
625 except KeyError:
626 pass
627
628 unknown_perms = arg - ALL_PERMISSIONS
629 if unknown_perms:
630 raise bb.asyncrpc.InvokeError(
631 "Unknown permissions %s" % ", ".join(sorted(list(unknown_perms)))
632 )
633
634 return sorted(list(arg))
635
636 def return_perms(self, permissions):
637 if ALL_PERM in permissions:
638 return sorted(list(ALL_PERMISSIONS))
639 return sorted(list(permissions))
640
641 @permissions(USER_ADMIN_PERM, allow_anon=False)
642 async def handle_set_perms(self, request):
643 username = str(request["username"])
644 permissions = self.get_perm_arg(request["permissions"])
645
646 if not await self.db.set_user_perms(username, permissions):
647 self.raise_no_user_error(username)
648
649 return {
650 "username": username,
651 "permissions": self.return_perms(permissions),
652 }
653
654 @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
655 async def handle_get_user(self, request):
656 username = str(request["username"])
657
658 user = await self.db.lookup_user(username)
659 if user is None:
660 return None
661
662 return {
663 "username": user.username,
664 "permissions": self.return_perms(user.permissions),
665 }
666
667 @permissions(USER_ADMIN_PERM, allow_anon=False)
668 async def handle_get_all_users(self, request):
669 users = await self.db.get_all_users()
670 return {
671 "users": [
672 {
673 "username": u.username,
674 "permissions": self.return_perms(u.permissions),
675 }
676 for u in users
677 ]
678 }
679
680 @permissions(USER_ADMIN_PERM, allow_anon=False)
681 async def handle_new_user(self, request):
682 username = str(request["username"])
683 permissions = self.get_perm_arg(request["permissions"])
684
685 token = await new_token()
686
687 inserted = await self.db.new_user(
688 username,
689 permissions,
690 hash_token(TOKEN_ALGORITHM, new_salt(), token),
691 )
692 if not inserted:
693 raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")
694
695 return {
696 "username": username,
697 "permissions": self.return_perms(permissions),
698 "token": token,
699 }
700
701 @permissions(USER_ADMIN_PERM, allow_anon=False)
702 async def handle_delete_user(self, request):
703 username = str(request["username"])
704
705 if not await self.db.delete_user(username):
706 self.raise_no_user_error(username)
707
708 return {"username": username}
709
410 710
411class Server(bb.asyncrpc.AsyncServer): 711class Server(bb.asyncrpc.AsyncServer):
412 def __init__(self, db_engine, upstream=None, read_only=False): 712 def __init__(
713 self,
714 db_engine,
715 upstream=None,
716 read_only=False,
717 anon_perms=DEFAULT_ANON_PERMS,
718 admin_username=None,
719 admin_password=None,
720 ):
413 if upstream and read_only: 721 if upstream and read_only:
414 raise bb.asyncrpc.ServerError( 722 raise bb.asyncrpc.ServerError(
415 "Read-only hashserv cannot pull from an upstream server" 723 "Read-only hashserv cannot pull from an upstream server"
416 ) 724 )
417 725
726 disallowed_perms = set(anon_perms) - set(
727 [NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM]
728 )
729
730 if disallowed_perms:
731 raise bb.asyncrpc.ServerError(
732 f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users"
733 )
734
418 super().__init__(logger) 735 super().__init__(logger)
419 736
420 self.request_stats = Stats() 737 self.request_stats = Stats()
@@ -422,6 +739,13 @@ class Server(bb.asyncrpc.AsyncServer):
422 self.upstream = upstream 739 self.upstream = upstream
423 self.read_only = read_only 740 self.read_only = read_only
424 self.backfill_queue = None 741 self.backfill_queue = None
742 self.anon_perms = set(anon_perms)
743 self.admin_username = admin_username
744 self.admin_password = admin_password
745
746 self.logger.info(
747 "Anonymous user permissions are: %s", ", ".join(self.anon_perms)
748 )
425 749
426 def accept_client(self, socket): 750 def accept_client(self, socket):
427 return ServerClient( 751 return ServerClient(
@@ -431,12 +755,34 @@ class Server(bb.asyncrpc.AsyncServer):
431 self.backfill_queue, 755 self.backfill_queue,
432 self.upstream, 756 self.upstream,
433 self.read_only, 757 self.read_only,
758 self.anon_perms,
434 ) 759 )
435 760
761 async def create_admin_user(self):
762 admin_permissions = (ALL_PERM,)
763 async with self.db_engine.connect(self.logger) as db:
764 added = await db.new_user(
765 self.admin_username,
766 admin_permissions,
767 hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
768 )
769 if added:
770 self.logger.info("Created admin user '%s'", self.admin_username)
771 else:
772 await db.set_user_perms(
773 self.admin_username,
774 admin_permissions,
775 )
776 await db.set_user_token(
777 self.admin_username,
778 hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
779 )
780 self.logger.info("Admin user '%s' updated", self.admin_username)
781
436 async def backfill_worker_task(self): 782 async def backfill_worker_task(self):
437 async with await create_async_client( 783 async with await create_async_client(
438 self.upstream 784 self.upstream
439 ) as client, self.db_engine.connect(logger) as db: 785 ) as client, self.db_engine.connect(self.logger) as db:
440 while True: 786 while True:
441 item = await self.backfill_queue.get() 787 item = await self.backfill_queue.get()
442 if item is None: 788 if item is None:
@@ -457,6 +803,9 @@ class Server(bb.asyncrpc.AsyncServer):
457 803
458 self.loop.run_until_complete(self.db_engine.create()) 804 self.loop.run_until_complete(self.db_engine.create())
459 805
806 if self.admin_username:
807 self.loop.run_until_complete(self.create_admin_user())
808
460 return tasks 809 return tasks
461 810
462 async def stop(self): 811 async def stop(self):