Diffstat (limited to 'bitbake/lib/hashserv/__init__.py')
-rw-r--r--  bitbake/lib/hashserv/__init__.py | 177
1 file changed, 75 insertions(+), 102 deletions(-)
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 5f2e101e52..74367eb6b4 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -5,129 +5,102 @@
 
 import asyncio
 from contextlib import closing
-import re
-import sqlite3
 import itertools
 import json
+from collections import namedtuple
+from urllib.parse import urlparse
+from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
+
+User = namedtuple("User", ("username", "permissions"))
+
+def create_server(
+    addr,
+    dbname,
+    *,
+    sync=True,
+    upstream=None,
+    read_only=False,
+    db_username=None,
+    db_password=None,
+    anon_perms=None,
+    admin_username=None,
+    admin_password=None,
+):
+    def sqlite_engine():
+        from .sqlite import DatabaseEngine
+
+        return DatabaseEngine(dbname, sync)
+
+    def sqlalchemy_engine():
+        from .sqlalchemy import DatabaseEngine
+
+        return DatabaseEngine(dbname, db_username, db_password)
 
-UNIX_PREFIX = "unix://"
-
-ADDR_TYPE_UNIX = 0
-ADDR_TYPE_TCP = 1
-
-# The Python async server defaults to a 64K receive buffer, so we hardcode our
-# maximum chunk size. It would be better if the client and server reported to
-# each other what the maximum chunk sizes were, but that will slow down the
-# connection setup with a round trip delay so I'd rather not do that unless it
-# is necessary
-DEFAULT_MAX_CHUNK = 32 * 1024
-
-TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL"),
-    ("outhash", "TEXT NOT NULL"),
-    ("taskhash", "TEXT NOT NULL"),
-    ("unihash", "TEXT NOT NULL"),
-    ("created", "DATETIME"),
-
-    # Optional fields
-    ("owner", "TEXT"),
-    ("PN", "TEXT"),
-    ("PV", "TEXT"),
-    ("PR", "TEXT"),
-    ("task", "TEXT"),
-    ("outhash_siginfo", "TEXT"),
-)
-
-TABLE_COLUMNS = tuple(name for name, _ in TABLE_DEFINITION)
-
-def setup_database(database, sync=True):
-    db = sqlite3.connect(database)
-    db.row_factory = sqlite3.Row
-
-    with closing(db.cursor()) as cursor:
-        cursor.execute('''
-            CREATE TABLE IF NOT EXISTS tasks_v2 (
-                id INTEGER PRIMARY KEY AUTOINCREMENT,
-                %s
-                UNIQUE(method, outhash, taskhash)
-                )
-            ''' % " ".join("%s %s," % (name, typ) for name, typ in TABLE_DEFINITION))
-        cursor.execute('PRAGMA journal_mode = WAL')
-        cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
-
-        # Drop old indexes
-        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
-        cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
-
-        # Create new indexes
-        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
-        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
-
-    return db
-
-
-def parse_address(addr):
-    if addr.startswith(UNIX_PREFIX):
-        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
-    else:
-        m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
-        if m is not None:
-            host = m.group('host')
-            port = m.group('port')
-        else:
-            host, port = addr.split(':')
-
-        return (ADDR_TYPE_TCP, (host, int(port)))
-
+    from . import server
 
-def chunkify(msg, max_chunk):
-    if len(msg) < max_chunk - 1:
-        yield ''.join((msg, "\n"))
+    if "://" in dbname:
+        db_engine = sqlalchemy_engine()
     else:
-        yield ''.join((json.dumps({
-            'chunk-stream': None
-        }), "\n"))
+        db_engine = sqlite_engine()
 
-        args = [iter(msg)] * (max_chunk - 1)
-        for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
-            yield ''.join(itertools.chain(m, "\n"))
-        yield "\n"
+    if anon_perms is None:
+        anon_perms = server.DEFAULT_ANON_PERMS
 
-
-def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
-    from . import server
-    db = setup_database(dbname, sync=sync)
-    s = server.Server(db, upstream=upstream, read_only=read_only)
+    s = server.Server(
+        db_engine,
+        upstream=upstream,
+        read_only=read_only,
+        anon_perms=anon_perms,
+        admin_username=admin_username,
+        admin_password=admin_password,
+    )
 
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
         s.start_unix_server(*a)
+    elif typ == ADDR_TYPE_WS:
+        url = urlparse(a[0])
+        s.start_websocket_server(url.hostname, url.port)
     else:
         s.start_tcp_server(*a)
 
     return s
 
 
-def create_client(addr):
+def create_client(addr, username=None, password=None):
     from . import client
-    c = client.Client()
 
-    (typ, a) = parse_address(addr)
-    if typ == ADDR_TYPE_UNIX:
-        c.connect_unix(*a)
-    else:
-        c.connect_tcp(*a)
+    c = client.Client(username, password)
+
+    try:
+        (typ, a) = parse_address(addr)
+        if typ == ADDR_TYPE_UNIX:
+            c.connect_unix(*a)
+        elif typ == ADDR_TYPE_WS:
+            c.connect_websocket(*a)
+        else:
+            c.connect_tcp(*a)
+        return c
+    except Exception as e:
+        c.close()
+        raise e
 
-    return c
 
-async def create_async_client(addr):
+async def create_async_client(addr, username=None, password=None):
     from . import client
-    c = client.AsyncClient()
 
-    (typ, a) = parse_address(addr)
-    if typ == ADDR_TYPE_UNIX:
-        await c.connect_unix(*a)
-    else:
-        await c.connect_tcp(*a)
+    c = client.AsyncClient(username, password)
+
+    try:
+        (typ, a) = parse_address(addr)
+        if typ == ADDR_TYPE_UNIX:
+            await c.connect_unix(*a)
+        elif typ == ADDR_TYPE_WS:
+            await c.connect_websocket(*a)
+        else:
+            await c.connect_tcp(*a)
 
-    return c
+        return c
+    except Exception as e:
+        await c.close()
+        raise e
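
For illustration only (not part of the commit): a minimal sketch of how the reworked entry points might be used after this change, assuming bitbake/lib is on sys.path so the module imports as hashserv. The socket path, database name, and credentials below are invented for the example; the only behaviour taken from the diff is that a dbname containing "://" selects the sqlalchemy engine while a plain filename selects sqlite, and that create_client() now accepts optional credentials and closes the client if the connection attempt fails.

import hashserv

# Server side: a plain filename selects the sqlite backend; a URI-style
# dbname (anything containing "://") selects the sqlalchemy backend.
server = hashserv.create_server(
    "unix://./hashserve.sock",         # unix://, host:port and ws://host:port forms are accepted
    "./hashserv.db",                   # example sqlite database file
    read_only=False,
    admin_username="admin",            # example credentials, not defaults
    admin_password="example-password",
)

# Client side: credentials are passed through to client.Client(), and a
# failed connection closes the client before the exception propagates.
client = hashserv.create_client(
    "unix://./hashserve.sock",
    username="admin",
    password="example-password",
)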