diff options
Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 177 |
1 files changed, 75 insertions, 102 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 5f2e101e52..74367eb6b4 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -5,129 +5,102 @@ | |||
5 | 5 | ||
6 | import asyncio | 6 | import asyncio |
7 | from contextlib import closing | 7 | from contextlib import closing |
8 | import re | ||
9 | import sqlite3 | ||
10 | import itertools | 8 | import itertools |
11 | import json | 9 | import json |
10 | from collections import namedtuple | ||
11 | from urllib.parse import urlparse | ||
12 | from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS | ||
13 | |||
14 | User = namedtuple("User", ("username", "permissions")) | ||
15 | |||
16 | def create_server( | ||
17 | addr, | ||
18 | dbname, | ||
19 | *, | ||
20 | sync=True, | ||
21 | upstream=None, | ||
22 | read_only=False, | ||
23 | db_username=None, | ||
24 | db_password=None, | ||
25 | anon_perms=None, | ||
26 | admin_username=None, | ||
27 | admin_password=None, | ||
28 | ): | ||
29 | def sqlite_engine(): | ||
30 | from .sqlite import DatabaseEngine | ||
31 | |||
32 | return DatabaseEngine(dbname, sync) | ||
33 | |||
34 | def sqlalchemy_engine(): | ||
35 | from .sqlalchemy import DatabaseEngine | ||
36 | |||
37 | return DatabaseEngine(dbname, db_username, db_password) | ||
12 | 38 | ||
13 | UNIX_PREFIX = "unix://" | 39 | from . import server |
14 | |||
15 | ADDR_TYPE_UNIX = 0 | ||
16 | ADDR_TYPE_TCP = 1 | ||
17 | |||
18 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
19 | # maximum chunk size. It would be better if the client and server reported to | ||
20 | # each other what the maximum chunk sizes were, but that will slow down the | ||
21 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
22 | # is necessary | ||
23 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
24 | |||
25 | TABLE_DEFINITION = ( | ||
26 | ("method", "TEXT NOT NULL"), | ||
27 | ("outhash", "TEXT NOT NULL"), | ||
28 | ("taskhash", "TEXT NOT NULL"), | ||
29 | ("unihash", "TEXT NOT NULL"), | ||
30 | ("created", "DATETIME"), | ||
31 | |||
32 | # Optional fields | ||
33 | ("owner", "TEXT"), | ||
34 | ("PN", "TEXT"), | ||
35 | ("PV", "TEXT"), | ||
36 | ("PR", "TEXT"), | ||
37 | ("task", "TEXT"), | ||
38 | ("outhash_siginfo", "TEXT"), | ||
39 | ) | ||
40 | |||
41 | TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION) | ||
42 | |||
43 | def setup_database(database, sync=True): | ||
44 | db = sqlite3.connect(database) | ||
45 | db.row_factory = sqlite3.Row | ||
46 | |||
47 | with closing(db.cursor()) as cursor: | ||
48 | cursor.execute(''' | ||
49 | CREATE TABLE IF NOT EXISTS tasks_v2 ( | ||
50 | id INTEGER PRIMARY KEY AUTOINCREMENT, | ||
51 | %s | ||
52 | UNIQUE(method, outhash, taskhash) | ||
53 | ) | ||
54 | ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION)) | ||
55 | cursor.execute('PRAGMA journal_mode = WAL') | ||
56 | cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) | ||
57 | |||
58 | # Drop old indexes | ||
59 | cursor.execute('DROP INDEX IF EXISTS taskhash_lookup') | ||
60 | cursor.execute('DROP INDEX IF EXISTS outhash_lookup') | ||
61 | |||
62 | # Create new indexes | ||
63 | cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)') | ||
64 | cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)') | ||
65 | |||
66 | return db | ||
67 | |||
68 | |||
69 | def parse_address(addr): | ||
70 | if addr.startswith(UNIX_PREFIX): | ||
71 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) | ||
72 | else: | ||
73 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) | ||
74 | if m is not None: | ||
75 | host = m.group('host') | ||
76 | port = m.group('port') | ||
77 | else: | ||
78 | host, port = addr.split(':') | ||
79 | |||
80 | return (ADDR_TYPE_TCP, (host, int(port))) | ||
81 | |||
82 | 40 | ||
83 | def chunkify(msg, max_chunk): | 41 | if "://" in dbname: |
84 | if len(msg) < max_chunk - 1: | 42 | db_engine = sqlalchemy_engine() |
85 | yield ''.join((msg, "\n")) | ||
86 | else: | 43 | else: |
87 | yield ''.join((json.dumps({ | 44 | db_engine = sqlite_engine() |
88 | 'chunk-stream': None | ||
89 | }), "\n")) | ||
90 | 45 | ||
91 | args = [iter(msg)] * (max_chunk - 1) | 46 | if anon_perms is None: |
92 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | 47 | anon_perms = server.DEFAULT_ANON_PERMS |
93 | yield ''.join(itertools.chain(m, "\n")) | ||
94 | yield "\n" | ||
95 | 48 | ||
96 | 49 | s = server.Server( | |
97 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 50 | db_engine, |
98 | from . import server | 51 | upstream=upstream, |
99 | db = setup_database(dbname, sync=sync) | 52 | read_only=read_only, |
100 | s = server.Server(db, upstream=upstream, read_only=read_only) | 53 | anon_perms=anon_perms, |
54 | admin_username=admin_username, | ||
55 | admin_password=admin_password, | ||
56 | ) | ||
101 | 57 | ||
102 | (typ, a) = parse_address(addr) | 58 | (typ, a) = parse_address(addr) |
103 | if typ == ADDR_TYPE_UNIX: | 59 | if typ == ADDR_TYPE_UNIX: |
104 | s.start_unix_server(*a) | 60 | s.start_unix_server(*a) |
61 | elif typ == ADDR_TYPE_WS: | ||
62 | url = urlparse(a[0]) | ||
63 | s.start_websocket_server(url.hostname, url.port) | ||
105 | else: | 64 | else: |
106 | s.start_tcp_server(*a) | 65 | s.start_tcp_server(*a) |
107 | 66 | ||
108 | return s | 67 | return s |
109 | 68 | ||
110 | 69 | ||
111 | def create_client(addr): | 70 | def create_client(addr, username=None, password=None): |
112 | from . import client | 71 | from . import client |
113 | c = client.Client() | ||
114 | 72 | ||
115 | (typ, a) = parse_address(addr) | 73 | c = client.Client(username, password) |
116 | if typ == ADDR_TYPE_UNIX: | 74 | |
117 | c.connect_unix(*a) | 75 | try: |
118 | else: | 76 | (typ, a) = parse_address(addr) |
119 | c.connect_tcp(*a) | 77 | if typ == ADDR_TYPE_UNIX: |
78 | c.connect_unix(*a) | ||
79 | elif typ == ADDR_TYPE_WS: | ||
80 | c.connect_websocket(*a) | ||
81 | else: | ||
82 | c.connect_tcp(*a) | ||
83 | return c | ||
84 | except Exception as e: | ||
85 | c.close() | ||
86 | raise e | ||
120 | 87 | ||
121 | return c | ||
122 | 88 | ||
123 | async def create_async_client(addr): | 89 | async def create_async_client(addr, username=None, password=None): |
124 | from . import client | 90 | from . import client |
125 | c = client.AsyncClient() | ||
126 | 91 | ||
127 | (typ, a) = parse_address(addr) | 92 | c = client.AsyncClient(username, password) |
128 | if typ == ADDR_TYPE_UNIX: | 93 | |
129 | await c.connect_unix(*a) | 94 | try: |
130 | else: | 95 | (typ, a) = parse_address(addr) |
131 | await c.connect_tcp(*a) | 96 | if typ == ADDR_TYPE_UNIX: |
97 | await c.connect_unix(*a) | ||
98 | elif typ == ADDR_TYPE_WS: | ||
99 | await c.connect_websocket(*a) | ||
100 | else: | ||
101 | await c.connect_tcp(*a) | ||
132 | 102 | ||
133 | return c | 103 | return c |
104 | except Exception as e: | ||
105 | await c.close() | ||
106 | raise e | ||