summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r--bitbake/lib/hashserv/__init__.py175
1 files changed, 75 insertions, 100 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 5f2e101e52..ac891e0174 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -5,129 +5,104 @@
5 5
6import asyncio 6import asyncio
7from contextlib import closing 7from contextlib import closing
8import re
9import sqlite3
10import itertools 8import itertools
11import json 9import json
10from collections import namedtuple
11from urllib.parse import urlparse
12from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
12 13
13UNIX_PREFIX = "unix://" 14User = namedtuple("User", ("username", "permissions"))
14
15ADDR_TYPE_UNIX = 0
16ADDR_TYPE_TCP = 1
17
18# The Python async server defaults to a 64K receive buffer, so we hardcode our
19# maximum chunk size. It would be better if the client and server reported to
20# each other what the maximum chunk sizes were, but that will slow down the
21# connection setup with a round trip delay so I'd rather not do that unless it
22# is necessary
23DEFAULT_MAX_CHUNK = 32 * 1024
24
25TABLE_DEFINITION = (
26 ("method", "TEXT NOT NULL"),
27 ("outhash", "TEXT NOT NULL"),
28 ("taskhash", "TEXT NOT NULL"),
29 ("unihash", "TEXT NOT NULL"),
30 ("created", "DATETIME"),
31
32 # Optional fields
33 ("owner", "TEXT"),
34 ("PN", "TEXT"),
35 ("PV", "TEXT"),
36 ("PR", "TEXT"),
37 ("task", "TEXT"),
38 ("outhash_siginfo", "TEXT"),
39)
40
41TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION)
42
43def setup_database(database, sync=True):
44 db = sqlite3.connect(database)
45 db.row_factory = sqlite3.Row
46
47 with closing(db.cursor()) as cursor:
48 cursor.execute('''
49 CREATE TABLE IF NOT EXISTS tasks_v2 (
50 id INTEGER PRIMARY KEY AUTOINCREMENT,
51 %s
52 UNIQUE(method, outhash, taskhash)
53 )
54 ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
55 cursor.execute('PRAGMA journal_mode = WAL')
56 cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
57
58 # Drop old indexes
59 cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
60 cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
61
62 # Create new indexes
63 cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
64 cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
65
66 return db
67
68
69def parse_address(addr):
70 if addr.startswith(UNIX_PREFIX):
71 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
72 else:
73 m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
74 if m is not None:
75 host = m.group('host')
76 port = m.group('port')
77 else:
78 host, port = addr.split(':')
79 15
80 return (ADDR_TYPE_TCP, (host, int(port)))
81 16
17def create_server(
18 addr,
19 dbname,
20 *,
21 sync=True,
22 upstream=None,
23 read_only=False,
24 db_username=None,
25 db_password=None,
26 anon_perms=None,
27 admin_username=None,
28 admin_password=None,
29 reuseport=False,
30):
31 def sqlite_engine():
32 from .sqlite import DatabaseEngine
82 33
83def chunkify(msg, max_chunk): 34 return DatabaseEngine(dbname, sync)
84 if len(msg) < max_chunk - 1:
85 yield ''.join((msg, "\n"))
86 else:
87 yield ''.join((json.dumps({
88 'chunk-stream': None
89 }), "\n"))
90 35
91 args = [iter(msg)] * (max_chunk - 1) 36 def sqlalchemy_engine():
92 for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): 37 from .sqlalchemy import DatabaseEngine
93 yield ''.join(itertools.chain(m, "\n"))
94 yield "\n"
95 38
39 return DatabaseEngine(dbname, db_username, db_password)
96 40
97def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
98 from . import server 41 from . import server
99 db = setup_database(dbname, sync=sync) 42
100 s = server.Server(db, upstream=upstream, read_only=read_only) 43 if "://" in dbname:
44 db_engine = sqlalchemy_engine()
45 else:
46 db_engine = sqlite_engine()
47
48 if anon_perms is None:
49 anon_perms = server.DEFAULT_ANON_PERMS
50
51 s = server.Server(
52 db_engine,
53 upstream=upstream,
54 read_only=read_only,
55 anon_perms=anon_perms,
56 admin_username=admin_username,
57 admin_password=admin_password,
58 )
101 59
102 (typ, a) = parse_address(addr) 60 (typ, a) = parse_address(addr)
103 if typ == ADDR_TYPE_UNIX: 61 if typ == ADDR_TYPE_UNIX:
104 s.start_unix_server(*a) 62 s.start_unix_server(*a)
63 elif typ == ADDR_TYPE_WS:
64 url = urlparse(a[0])
65 s.start_websocket_server(url.hostname, url.port, reuseport=reuseport)
105 else: 66 else:
106 s.start_tcp_server(*a) 67 s.start_tcp_server(*a, reuseport=reuseport)
107 68
108 return s 69 return s
109 70
110 71
111def create_client(addr): 72def create_client(addr, username=None, password=None):
112 from . import client 73 from . import client
113 c = client.Client()
114 74
115 (typ, a) = parse_address(addr) 75 c = client.Client(username, password)
116 if typ == ADDR_TYPE_UNIX: 76
117 c.connect_unix(*a) 77 try:
118 else: 78 (typ, a) = parse_address(addr)
119 c.connect_tcp(*a) 79 if typ == ADDR_TYPE_UNIX:
80 c.connect_unix(*a)
81 elif typ == ADDR_TYPE_WS:
82 c.connect_websocket(*a)
83 else:
84 c.connect_tcp(*a)
85 return c
86 except Exception as e:
87 c.close()
88 raise e
120 89
121 return c
122 90
123async def create_async_client(addr): 91async def create_async_client(addr, username=None, password=None):
124 from . import client 92 from . import client
125 c = client.AsyncClient()
126 93
127 (typ, a) = parse_address(addr) 94 c = client.AsyncClient(username, password)
128 if typ == ADDR_TYPE_UNIX: 95
129 await c.connect_unix(*a) 96 try:
130 else: 97 (typ, a) = parse_address(addr)
131 await c.connect_tcp(*a) 98 if typ == ADDR_TYPE_UNIX:
99 await c.connect_unix(*a)
100 elif typ == ADDR_TYPE_WS:
101 await c.connect_websocket(*a)
102 else:
103 await c.connect_tcp(*a)
132 104
133 return c 105 return c
106 except Exception as e:
107 await c.close()
108 raise e