diff options
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r-- | bitbake/lib/hashserv/server.py | 357 |
1 files changed, 353 insertions, 4 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index c691df7618..f5baa6be78 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -8,13 +8,48 @@ import asyncio | |||
8 | import logging | 8 | import logging |
9 | import math | 9 | import math |
10 | import time | 10 | import time |
11 | import os | ||
12 | import base64 | ||
13 | import hashlib | ||
11 | from . import create_async_client | 14 | from . import create_async_client |
12 | import bb.asyncrpc | 15 | import bb.asyncrpc |
13 | 16 | ||
14 | |||
15 | logger = logging.getLogger("hashserv.server") | 17 | logger = logging.getLogger("hashserv.server") |
16 | 18 | ||
17 | 19 | ||
20 | # This permission only exists to match nothing | ||
21 | NONE_PERM = "@none" | ||
22 | |||
23 | READ_PERM = "@read" | ||
24 | REPORT_PERM = "@report" | ||
25 | DB_ADMIN_PERM = "@db-admin" | ||
26 | USER_ADMIN_PERM = "@user-admin" | ||
27 | ALL_PERM = "@all" | ||
28 | |||
29 | ALL_PERMISSIONS = { | ||
30 | READ_PERM, | ||
31 | REPORT_PERM, | ||
32 | DB_ADMIN_PERM, | ||
33 | USER_ADMIN_PERM, | ||
34 | ALL_PERM, | ||
35 | } | ||
36 | |||
37 | DEFAULT_ANON_PERMS = ( | ||
38 | READ_PERM, | ||
39 | REPORT_PERM, | ||
40 | DB_ADMIN_PERM, | ||
41 | ) | ||
42 | |||
43 | TOKEN_ALGORITHM = "sha256" | ||
44 | |||
45 | # 48 bytes of random data will result in 64 characters when base64 | ||
46 | # encoded. This number also ensures that the base64 encoding won't have any | ||
47 | # trailing '=' characters. | ||
48 | TOKEN_SIZE = 48 | ||
49 | |||
50 | SALT_SIZE = 8 | ||
51 | |||
52 | |||
18 | class Measurement(object): | 53 | class Measurement(object): |
19 | def __init__(self, sample): | 54 | def __init__(self, sample): |
20 | self.sample = sample | 55 | self.sample = sample |
@@ -108,6 +143,85 @@ class Stats(object): | |||
108 | } | 143 | } |
109 | 144 | ||
110 | 145 | ||
146 | token_refresh_semaphore = asyncio.Lock() | ||
147 | |||
148 | |||
149 | async def new_token(): | ||
150 | # Prevent malicious users from using this API to deduce the entropy | ||
151 | # pool on the server and thus be able to guess a token. *All* token | ||
152 | # refresh requests lock the same global semaphore and then sleep for a | ||
153 | # short time. The effectively rate limits the total number of requests | ||
154 | # than can be made across all clients to 10/second, which should be enough | ||
155 | # since you have to be an authenticated users to make the request in the | ||
156 | # first place | ||
157 | async with token_refresh_semaphore: | ||
158 | await asyncio.sleep(0.1) | ||
159 | raw = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK) | ||
160 | |||
161 | return base64.b64encode(raw, b"._").decode("utf-8") | ||
162 | |||
163 | |||
164 | def new_salt(): | ||
165 | return os.getrandom(SALT_SIZE, os.GRND_NONBLOCK).hex() | ||
166 | |||
167 | |||
168 | def hash_token(algo, salt, token): | ||
169 | h = hashlib.new(algo) | ||
170 | h.update(salt.encode("utf-8")) | ||
171 | h.update(token.encode("utf-8")) | ||
172 | return ":".join([algo, salt, h.hexdigest()]) | ||
173 | |||
174 | |||
175 | def permissions(*permissions, allow_anon=True, allow_self_service=False): | ||
176 | """ | ||
177 | Function decorator that can be used to decorate an RPC function call and | ||
178 | check that the current users permissions match the require permissions. | ||
179 | |||
180 | If allow_anon is True, the user will also be allowed to make the RPC call | ||
181 | if the anonymous user permissions match the permissions. | ||
182 | |||
183 | If allow_self_service is True, and the "username" property in the request | ||
184 | is the currently logged in user, or not specified, the user will also be | ||
185 | allowed to make the request. This allows users to access normal privileged | ||
186 | API, as long as they are only modifying their own user properties (e.g. | ||
187 | users can be allowed to reset their own token without @user-admin | ||
188 | permissions, but not the token for any other user. | ||
189 | """ | ||
190 | |||
191 | def wrapper(func): | ||
192 | async def wrap(self, request): | ||
193 | if allow_self_service and self.user is not None: | ||
194 | username = request.get("username", self.user.username) | ||
195 | if username == self.user.username: | ||
196 | request["username"] = self.user.username | ||
197 | return await func(self, request) | ||
198 | |||
199 | if not self.user_has_permissions(*permissions, allow_anon=allow_anon): | ||
200 | if not self.user: | ||
201 | username = "Anonymous user" | ||
202 | user_perms = self.anon_perms | ||
203 | else: | ||
204 | username = self.user.username | ||
205 | user_perms = self.user.permissions | ||
206 | |||
207 | self.logger.info( | ||
208 | "User %s with permissions %r denied from calling %s. Missing permissions(s) %r", | ||
209 | username, | ||
210 | ", ".join(user_perms), | ||
211 | func.__name__, | ||
212 | ", ".join(permissions), | ||
213 | ) | ||
214 | raise bb.asyncrpc.InvokeError( | ||
215 | f"{username} is not allowed to access permissions(s) {', '.join(permissions)}" | ||
216 | ) | ||
217 | |||
218 | return await func(self, request) | ||
219 | |||
220 | return wrap | ||
221 | |||
222 | return wrapper | ||
223 | |||
224 | |||
111 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 225 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
112 | def __init__( | 226 | def __init__( |
113 | self, | 227 | self, |
@@ -117,6 +231,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
117 | backfill_queue, | 231 | backfill_queue, |
118 | upstream, | 232 | upstream, |
119 | read_only, | 233 | read_only, |
234 | anon_perms, | ||
120 | ): | 235 | ): |
121 | super().__init__(socket, "OEHASHEQUIV", logger) | 236 | super().__init__(socket, "OEHASHEQUIV", logger) |
122 | self.db_engine = db_engine | 237 | self.db_engine = db_engine |
@@ -125,6 +240,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
125 | self.backfill_queue = backfill_queue | 240 | self.backfill_queue = backfill_queue |
126 | self.upstream = upstream | 241 | self.upstream = upstream |
127 | self.read_only = read_only | 242 | self.read_only = read_only |
243 | self.user = None | ||
244 | self.anon_perms = anon_perms | ||
128 | 245 | ||
129 | self.handlers.update( | 246 | self.handlers.update( |
130 | { | 247 | { |
@@ -135,6 +252,9 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
135 | # Not always read-only, but internally checks if the server is | 252 | # Not always read-only, but internally checks if the server is |
136 | # read-only | 253 | # read-only |
137 | "report": self.handle_report, | 254 | "report": self.handle_report, |
255 | "auth": self.handle_auth, | ||
256 | "get-user": self.handle_get_user, | ||
257 | "get-all-users": self.handle_get_all_users, | ||
138 | } | 258 | } |
139 | ) | 259 | ) |
140 | 260 | ||
@@ -146,9 +266,36 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
146 | "backfill-wait": self.handle_backfill_wait, | 266 | "backfill-wait": self.handle_backfill_wait, |
147 | "remove": self.handle_remove, | 267 | "remove": self.handle_remove, |
148 | "clean-unused": self.handle_clean_unused, | 268 | "clean-unused": self.handle_clean_unused, |
269 | "refresh-token": self.handle_refresh_token, | ||
270 | "set-user-perms": self.handle_set_perms, | ||
271 | "new-user": self.handle_new_user, | ||
272 | "delete-user": self.handle_delete_user, | ||
149 | } | 273 | } |
150 | ) | 274 | ) |
151 | 275 | ||
276 | def raise_no_user_error(self, username): | ||
277 | raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists") | ||
278 | |||
279 | def user_has_permissions(self, *permissions, allow_anon=True): | ||
280 | permissions = set(permissions) | ||
281 | if allow_anon: | ||
282 | if ALL_PERM in self.anon_perms: | ||
283 | return True | ||
284 | |||
285 | if not permissions - self.anon_perms: | ||
286 | return True | ||
287 | |||
288 | if self.user is None: | ||
289 | return False | ||
290 | |||
291 | if ALL_PERM in self.user.permissions: | ||
292 | return True | ||
293 | |||
294 | if not permissions - self.user.permissions: | ||
295 | return True | ||
296 | |||
297 | return False | ||
298 | |||
152 | def validate_proto_version(self): | 299 | def validate_proto_version(self): |
153 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) | 300 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
154 | 301 | ||
@@ -178,6 +325,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
178 | 325 | ||
179 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 326 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
180 | 327 | ||
328 | @permissions(READ_PERM) | ||
181 | async def handle_get(self, request): | 329 | async def handle_get(self, request): |
182 | method = request["method"] | 330 | method = request["method"] |
183 | taskhash = request["taskhash"] | 331 | taskhash = request["taskhash"] |
@@ -206,6 +354,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
206 | 354 | ||
207 | return d | 355 | return d |
208 | 356 | ||
357 | @permissions(READ_PERM) | ||
209 | async def handle_get_outhash(self, request): | 358 | async def handle_get_outhash(self, request): |
210 | method = request["method"] | 359 | method = request["method"] |
211 | outhash = request["outhash"] | 360 | outhash = request["outhash"] |
@@ -236,6 +385,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
236 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) | 385 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
237 | await self.db.insert_outhash(data) | 386 | await self.db.insert_outhash(data) |
238 | 387 | ||
388 | @permissions(READ_PERM) | ||
239 | async def handle_get_stream(self, request): | 389 | async def handle_get_stream(self, request): |
240 | await self.socket.send_message("ok") | 390 | await self.socket.send_message("ok") |
241 | 391 | ||
@@ -304,8 +454,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
304 | "unihash": unihash, | 454 | "unihash": unihash, |
305 | } | 455 | } |
306 | 456 | ||
457 | # Since this can be called either read only or to report, the check to | ||
458 | # report is made inside the function | ||
459 | @permissions(READ_PERM) | ||
307 | async def handle_report(self, data): | 460 | async def handle_report(self, data): |
308 | if self.read_only: | 461 | if self.read_only or not self.user_has_permissions(REPORT_PERM): |
309 | return await self.report_readonly(data) | 462 | return await self.report_readonly(data) |
310 | 463 | ||
311 | outhash_data = { | 464 | outhash_data = { |
@@ -358,6 +511,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
358 | "unihash": unihash, | 511 | "unihash": unihash, |
359 | } | 512 | } |
360 | 513 | ||
514 | @permissions(READ_PERM, REPORT_PERM) | ||
361 | async def handle_equivreport(self, data): | 515 | async def handle_equivreport(self, data): |
362 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) | 516 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
363 | 517 | ||
@@ -375,11 +529,13 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
375 | 529 | ||
376 | return {k: row[k] for k in ("taskhash", "method", "unihash")} | 530 | return {k: row[k] for k in ("taskhash", "method", "unihash")} |
377 | 531 | ||
532 | @permissions(READ_PERM) | ||
378 | async def handle_get_stats(self, request): | 533 | async def handle_get_stats(self, request): |
379 | return { | 534 | return { |
380 | "requests": self.request_stats.todict(), | 535 | "requests": self.request_stats.todict(), |
381 | } | 536 | } |
382 | 537 | ||
538 | @permissions(DB_ADMIN_PERM) | ||
383 | async def handle_reset_stats(self, request): | 539 | async def handle_reset_stats(self, request): |
384 | d = { | 540 | d = { |
385 | "requests": self.request_stats.todict(), | 541 | "requests": self.request_stats.todict(), |
@@ -388,6 +544,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
388 | self.request_stats.reset() | 544 | self.request_stats.reset() |
389 | return d | 545 | return d |
390 | 546 | ||
547 | @permissions(READ_PERM) | ||
391 | async def handle_backfill_wait(self, request): | 548 | async def handle_backfill_wait(self, request): |
392 | d = { | 549 | d = { |
393 | "tasks": self.backfill_queue.qsize(), | 550 | "tasks": self.backfill_queue.qsize(), |
@@ -395,6 +552,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
395 | await self.backfill_queue.join() | 552 | await self.backfill_queue.join() |
396 | return d | 553 | return d |
397 | 554 | ||
555 | @permissions(DB_ADMIN_PERM) | ||
398 | async def handle_remove(self, request): | 556 | async def handle_remove(self, request): |
399 | condition = request["where"] | 557 | condition = request["where"] |
400 | if not isinstance(condition, dict): | 558 | if not isinstance(condition, dict): |
@@ -402,19 +560,178 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
402 | 560 | ||
403 | return {"count": await self.db.remove(condition)} | 561 | return {"count": await self.db.remove(condition)} |
404 | 562 | ||
563 | @permissions(DB_ADMIN_PERM) | ||
405 | async def handle_clean_unused(self, request): | 564 | async def handle_clean_unused(self, request): |
406 | max_age = request["max_age_seconds"] | 565 | max_age = request["max_age_seconds"] |
407 | oldest = datetime.now() - timedelta(seconds=-max_age) | 566 | oldest = datetime.now() - timedelta(seconds=-max_age) |
408 | return {"count": await self.db.clean_unused(oldest)} | 567 | return {"count": await self.db.clean_unused(oldest)} |
409 | 568 | ||
569 | # The authentication API is always allowed | ||
570 | async def handle_auth(self, request): | ||
571 | username = str(request["username"]) | ||
572 | token = str(request["token"]) | ||
573 | |||
574 | async def fail_auth(): | ||
575 | nonlocal username | ||
576 | # Rate limit bad login attempts | ||
577 | await asyncio.sleep(1) | ||
578 | raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}") | ||
579 | |||
580 | user, db_token = await self.db.lookup_user_token(username) | ||
581 | |||
582 | if not user or not db_token: | ||
583 | await fail_auth() | ||
584 | |||
585 | try: | ||
586 | algo, salt, _ = db_token.split(":") | ||
587 | except ValueError: | ||
588 | await fail_auth() | ||
589 | |||
590 | if hash_token(algo, salt, token) != db_token: | ||
591 | await fail_auth() | ||
592 | |||
593 | self.user = user | ||
594 | |||
595 | self.logger.info("Authenticated as %s", username) | ||
596 | |||
597 | return { | ||
598 | "result": True, | ||
599 | "username": self.user.username, | ||
600 | "permissions": sorted(list(self.user.permissions)), | ||
601 | } | ||
602 | |||
603 | @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False) | ||
604 | async def handle_refresh_token(self, request): | ||
605 | username = str(request["username"]) | ||
606 | |||
607 | token = await new_token() | ||
608 | |||
609 | updated = await self.db.set_user_token( | ||
610 | username, | ||
611 | hash_token(TOKEN_ALGORITHM, new_salt(), token), | ||
612 | ) | ||
613 | if not updated: | ||
614 | self.raise_no_user_error(username) | ||
615 | |||
616 | return {"username": username, "token": token} | ||
617 | |||
618 | def get_perm_arg(self, arg): | ||
619 | if not isinstance(arg, list): | ||
620 | raise bb.asyncrpc.InvokeError("Unexpected type for permissions") | ||
621 | |||
622 | arg = set(arg) | ||
623 | try: | ||
624 | arg.remove(NONE_PERM) | ||
625 | except KeyError: | ||
626 | pass | ||
627 | |||
628 | unknown_perms = arg - ALL_PERMISSIONS | ||
629 | if unknown_perms: | ||
630 | raise bb.asyncrpc.InvokeError( | ||
631 | "Unknown permissions %s" % ", ".join(sorted(list(unknown_perms))) | ||
632 | ) | ||
633 | |||
634 | return sorted(list(arg)) | ||
635 | |||
636 | def return_perms(self, permissions): | ||
637 | if ALL_PERM in permissions: | ||
638 | return sorted(list(ALL_PERMISSIONS)) | ||
639 | return sorted(list(permissions)) | ||
640 | |||
641 | @permissions(USER_ADMIN_PERM, allow_anon=False) | ||
642 | async def handle_set_perms(self, request): | ||
643 | username = str(request["username"]) | ||
644 | permissions = self.get_perm_arg(request["permissions"]) | ||
645 | |||
646 | if not await self.db.set_user_perms(username, permissions): | ||
647 | self.raise_no_user_error(username) | ||
648 | |||
649 | return { | ||
650 | "username": username, | ||
651 | "permissions": self.return_perms(permissions), | ||
652 | } | ||
653 | |||
654 | @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False) | ||
655 | async def handle_get_user(self, request): | ||
656 | username = str(request["username"]) | ||
657 | |||
658 | user = await self.db.lookup_user(username) | ||
659 | if user is None: | ||
660 | return None | ||
661 | |||
662 | return { | ||
663 | "username": user.username, | ||
664 | "permissions": self.return_perms(user.permissions), | ||
665 | } | ||
666 | |||
667 | @permissions(USER_ADMIN_PERM, allow_anon=False) | ||
668 | async def handle_get_all_users(self, request): | ||
669 | users = await self.db.get_all_users() | ||
670 | return { | ||
671 | "users": [ | ||
672 | { | ||
673 | "username": u.username, | ||
674 | "permissions": self.return_perms(u.permissions), | ||
675 | } | ||
676 | for u in users | ||
677 | ] | ||
678 | } | ||
679 | |||
680 | @permissions(USER_ADMIN_PERM, allow_anon=False) | ||
681 | async def handle_new_user(self, request): | ||
682 | username = str(request["username"]) | ||
683 | permissions = self.get_perm_arg(request["permissions"]) | ||
684 | |||
685 | token = await new_token() | ||
686 | |||
687 | inserted = await self.db.new_user( | ||
688 | username, | ||
689 | permissions, | ||
690 | hash_token(TOKEN_ALGORITHM, new_salt(), token), | ||
691 | ) | ||
692 | if not inserted: | ||
693 | raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'") | ||
694 | |||
695 | return { | ||
696 | "username": username, | ||
697 | "permissions": self.return_perms(permissions), | ||
698 | "token": token, | ||
699 | } | ||
700 | |||
701 | @permissions(USER_ADMIN_PERM, allow_anon=False) | ||
702 | async def handle_delete_user(self, request): | ||
703 | username = str(request["username"]) | ||
704 | |||
705 | if not await self.db.delete_user(username): | ||
706 | self.raise_no_user_error(username) | ||
707 | |||
708 | return {"username": username} | ||
709 | |||
410 | 710 | ||
411 | class Server(bb.asyncrpc.AsyncServer): | 711 | class Server(bb.asyncrpc.AsyncServer): |
412 | def __init__(self, db_engine, upstream=None, read_only=False): | 712 | def __init__( |
713 | self, | ||
714 | db_engine, | ||
715 | upstream=None, | ||
716 | read_only=False, | ||
717 | anon_perms=DEFAULT_ANON_PERMS, | ||
718 | admin_username=None, | ||
719 | admin_password=None, | ||
720 | ): | ||
413 | if upstream and read_only: | 721 | if upstream and read_only: |
414 | raise bb.asyncrpc.ServerError( | 722 | raise bb.asyncrpc.ServerError( |
415 | "Read-only hashserv cannot pull from an upstream server" | 723 | "Read-only hashserv cannot pull from an upstream server" |
416 | ) | 724 | ) |
417 | 725 | ||
726 | disallowed_perms = set(anon_perms) - set( | ||
727 | [NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM] | ||
728 | ) | ||
729 | |||
730 | if disallowed_perms: | ||
731 | raise bb.asyncrpc.ServerError( | ||
732 | f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users" | ||
733 | ) | ||
734 | |||
418 | super().__init__(logger) | 735 | super().__init__(logger) |
419 | 736 | ||
420 | self.request_stats = Stats() | 737 | self.request_stats = Stats() |
@@ -422,6 +739,13 @@ class Server(bb.asyncrpc.AsyncServer): | |||
422 | self.upstream = upstream | 739 | self.upstream = upstream |
423 | self.read_only = read_only | 740 | self.read_only = read_only |
424 | self.backfill_queue = None | 741 | self.backfill_queue = None |
742 | self.anon_perms = set(anon_perms) | ||
743 | self.admin_username = admin_username | ||
744 | self.admin_password = admin_password | ||
745 | |||
746 | self.logger.info( | ||
747 | "Anonymous user permissions are: %s", ", ".join(self.anon_perms) | ||
748 | ) | ||
425 | 749 | ||
426 | def accept_client(self, socket): | 750 | def accept_client(self, socket): |
427 | return ServerClient( | 751 | return ServerClient( |
@@ -431,12 +755,34 @@ class Server(bb.asyncrpc.AsyncServer): | |||
431 | self.backfill_queue, | 755 | self.backfill_queue, |
432 | self.upstream, | 756 | self.upstream, |
433 | self.read_only, | 757 | self.read_only, |
758 | self.anon_perms, | ||
434 | ) | 759 | ) |
435 | 760 | ||
761 | async def create_admin_user(self): | ||
762 | admin_permissions = (ALL_PERM,) | ||
763 | async with self.db_engine.connect(self.logger) as db: | ||
764 | added = await db.new_user( | ||
765 | self.admin_username, | ||
766 | admin_permissions, | ||
767 | hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password), | ||
768 | ) | ||
769 | if added: | ||
770 | self.logger.info("Created admin user '%s'", self.admin_username) | ||
771 | else: | ||
772 | await db.set_user_perms( | ||
773 | self.admin_username, | ||
774 | admin_permissions, | ||
775 | ) | ||
776 | await db.set_user_token( | ||
777 | self.admin_username, | ||
778 | hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password), | ||
779 | ) | ||
780 | self.logger.info("Admin user '%s' updated", self.admin_username) | ||
781 | |||
436 | async def backfill_worker_task(self): | 782 | async def backfill_worker_task(self): |
437 | async with await create_async_client( | 783 | async with await create_async_client( |
438 | self.upstream | 784 | self.upstream |
439 | ) as client, self.db_engine.connect(logger) as db: | 785 | ) as client, self.db_engine.connect(self.logger) as db: |
440 | while True: | 786 | while True: |
441 | item = await self.backfill_queue.get() | 787 | item = await self.backfill_queue.get() |
442 | if item is None: | 788 | if item is None: |
@@ -457,6 +803,9 @@ class Server(bb.asyncrpc.AsyncServer): | |||
457 | 803 | ||
458 | self.loop.run_until_complete(self.db_engine.create()) | 804 | self.loop.run_until_complete(self.db_engine.create()) |
459 | 805 | ||
806 | if self.admin_username: | ||
807 | self.loop.run_until_complete(self.create_admin_user()) | ||
808 | |||
460 | return tasks | 809 | return tasks |
461 | 810 | ||
462 | async def stop(self): | 811 | async def stop(self): |