summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:26 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:33:02 +0000
commitcfbb1d2cc01565610ba06a755e10425ff2076d9b (patch)
tree0e703d7210246573d50e1b07fd505c0001a9ae17
parentbaa3e5391daf41b6dd6e914a112abb00d3517da1 (diff)
downloadpoky-cfbb1d2cc01565610ba06a755e10425ff2076d9b.tar.gz
bitbake: hashserv: Add SQLAlchemy backend
Adds an SQLAlchemy backend to the server. While this database backend is slower than the more direct SQLite backend, it easily supports just about any SQL server, which is useful for large-scale deployments. (Bitbake rev: e0b73466dd7478c77c82f46879246c1b68b228c0) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rwxr-xr-xbitbake/bin/bitbake-hashserv12
-rw-r--r--bitbake/lib/bb/asyncrpc/connection.py11
-rw-r--r--bitbake/lib/hashserv/__init__.py21
-rw-r--r--bitbake/lib/hashserv/sqlalchemy.py304
-rw-r--r--bitbake/lib/hashserv/tests.py19
5 files changed, 362 insertions, 5 deletions
diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index a916a90cb0..59b8b07f59 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -69,6 +69,16 @@ To bind to all addresses, leave the ADDRESS empty, e.g. "--bind :8686" or
69 action="store_true", 69 action="store_true",
70 help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)", 70 help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)",
71 ) 71 )
72 parser.add_argument(
73 "--db-username",
74 default=os.environ.get("HASHSERVER_DB_USERNAME", None),
75 help="Database username ($HASHSERVER_DB_USERNAME)",
76 )
77 parser.add_argument(
78 "--db-password",
79 default=os.environ.get("HASHSERVER_DB_PASSWORD", None),
80 help="Database password ($HASHSERVER_DB_PASSWORD)",
81 )
72 82
73 args = parser.parse_args() 83 args = parser.parse_args()
74 84
@@ -90,6 +100,8 @@ To bind to all addresses, leave the ADDRESS empty, e.g. "--bind :8686" or
90 args.database, 100 args.database,
91 upstream=args.upstream, 101 upstream=args.upstream,
92 read_only=read_only, 102 read_only=read_only,
103 db_username=args.db_username,
104 db_password=args.db_password,
93 ) 105 )
94 server.serve_forever() 106 server.serve_forever()
95 return 0 107 return 0
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
index a10628f75a..7f0cf6ba96 100644
--- a/bitbake/lib/bb/asyncrpc/connection.py
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -7,6 +7,7 @@
7import asyncio 7import asyncio
8import itertools 8import itertools
9import json 9import json
10from datetime import datetime
10from .exceptions import ClientError, ConnectionClosedError 11from .exceptions import ClientError, ConnectionClosedError
11 12
12 13
@@ -30,6 +31,12 @@ def chunkify(msg, max_chunk):
30 yield "\n" 31 yield "\n"
31 32
32 33
def json_serialize(obj):
    """
    Fallback encoder passed as ``default=`` to ``json.dumps()``.

    datetime objects are encoded as their ISO 8601 string. Any other object
    is rejected with TypeError, which is how a ``default`` hook signals that
    it cannot encode a value.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))
38
39
33class StreamConnection(object): 40class StreamConnection(object):
34 def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): 41 def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
35 self.reader = reader 42 self.reader = reader
@@ -42,7 +49,7 @@ class StreamConnection(object):
42 return self.writer.get_extra_info("peername") 49 return self.writer.get_extra_info("peername")
43 50
44 async def send_message(self, msg): 51 async def send_message(self, msg):
45 for c in chunkify(json.dumps(msg), self.max_chunk): 52 for c in chunkify(json.dumps(msg, default=json_serialize), self.max_chunk):
46 self.writer.write(c.encode("utf-8")) 53 self.writer.write(c.encode("utf-8"))
47 await self.writer.drain() 54 await self.writer.drain()
48 55
@@ -105,7 +112,7 @@ class WebsocketConnection(object):
105 return ":".join(str(s) for s in self.socket.remote_address) 112 return ":".join(str(s) for s in self.socket.remote_address)
106 113
107 async def send_message(self, msg): 114 async def send_message(self, msg):
108 await self.send(json.dumps(msg)) 115 await self.send(json.dumps(msg, default=json_serialize))
109 116
110 async def recv_message(self): 117 async def recv_message(self):
111 m = await self.recv() 118 m = await self.recv()
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 90d8cff15f..9a8ee4e88b 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -35,15 +35,32 @@ def parse_address(addr):
35 return (ADDR_TYPE_TCP, (host, int(port))) 35 return (ADDR_TYPE_TCP, (host, int(port)))
36 36
37 37
38def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): 38def create_server(
39 addr,
40 dbname,
41 *,
42 sync=True,
43 upstream=None,
44 read_only=False,
45 db_username=None,
46 db_password=None
47):
39 def sqlite_engine(): 48 def sqlite_engine():
40 from .sqlite import DatabaseEngine 49 from .sqlite import DatabaseEngine
41 50
42 return DatabaseEngine(dbname, sync) 51 return DatabaseEngine(dbname, sync)
43 52
53 def sqlalchemy_engine():
54 from .sqlalchemy import DatabaseEngine
55
56 return DatabaseEngine(dbname, db_username, db_password)
57
44 from . import server 58 from . import server
45 59
46 db_engine = sqlite_engine() 60 if "://" in dbname:
61 db_engine = sqlalchemy_engine()
62 else:
63 db_engine = sqlite_engine()
47 64
48 s = server.Server(db_engine, upstream=upstream, read_only=read_only) 65 s = server.Server(db_engine, upstream=upstream, read_only=read_only)
49 66
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
new file mode 100644
index 0000000000..3216621f9d
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -0,0 +1,304 @@
1#! /usr/bin/env python3
2#
3# Copyright (C) 2023 Garmin Ltd.
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import logging
9from datetime import datetime
10
11from sqlalchemy.ext.asyncio import create_async_engine
12from sqlalchemy.pool import NullPool
13from sqlalchemy import (
14 MetaData,
15 Column,
16 Table,
17 Text,
18 Integer,
19 UniqueConstraint,
20 DateTime,
21 Index,
22 select,
23 insert,
24 exists,
25 literal,
26 and_,
27 delete,
28)
29import sqlalchemy.engine
30from sqlalchemy.orm import declarative_base
31from sqlalchemy.exc import IntegrityError
32
33logger = logging.getLogger("hashserv.sqlalchemy")
34
35Base = declarative_base()
36
37
class UnihashesV2(Base):
    """
    Table recording the unihash for a (method, taskhash) pair
    """

    __tablename__ = "unihashes_v2"
    __table_args__ = (
        # A given method and taskhash may only be recorded once
        UniqueConstraint("method", "taskhash"),
        Index("taskhash_lookup_v3", "method", "taskhash"),
    )

    id = Column(Integer, autoincrement=True, primary_key=True)
    method = Column(Text, nullable=False)
    taskhash = Column(Text, nullable=False)
    unihash = Column(Text, nullable=False)
49
50
class OuthashesV2(Base):
    """
    Table recording the output hashes reported for a (method, taskhash)
    pair, together with optional descriptive columns about the report
    """

    __tablename__ = "outhashes_v2"
    __table_args__ = (
        # The same outhash may only be recorded once per method and taskhash
        UniqueConstraint("method", "taskhash", "outhash"),
        Index("outhash_lookup_v3", "method", "outhash"),
    )

    id = Column(Integer, autoincrement=True, primary_key=True)

    # Required columns
    method = Column(Text, nullable=False)
    taskhash = Column(Text, nullable=False)
    outhash = Column(Text, nullable=False)

    # Optional columns
    created = Column(DateTime)
    owner = Column(Text)
    PN = Column(Text)
    PV = Column(Text)
    PR = Column(Text)
    task = Column(Text)
    outhash_siginfo = Column(Text)
69
70
class DatabaseEngine(object):
    """
    Owns the SQLAlchemy async engine for the hash equivalence database and
    hands out Database connection objects that use it.
    """

    def __init__(self, url, username=None, password=None):
        """
        url is parsed as a SQLAlchemy database URL. When username and/or
        password are not None they replace the corresponding part of the URL.
        """
        self.logger = logger
        self.url = sqlalchemy.engine.make_url(url)

        if username is not None:
            self.url = self.url.set(username=username)

        if password is not None:
            self.url = self.url.set(password=password)

    async def create(self):
        """
        Create the engine (using NullPool) and run create_all() for the
        tables declared on Base.
        """
        # Render the URL with the password masked so that credentials are
        # never written to the log
        self.logger.info(
            "Using database %s", self.url.render_as_string(hide_password=True)
        )
        self.engine = create_async_engine(self.url, poolclass=NullPool)

        async with self.engine.begin() as conn:
            # Create tables
            self.logger.info("Creating tables...")
            await conn.run_sync(Base.metadata.create_all)

    def connect(self, logger):
        """
        Return a new Database bound to the engine which logs to logger.
        create() must have been awaited first.
        """
        return Database(self.engine, logger)
93
94
def map_row(row):
    """
    Convert a result row to a plain dict built from its _mapping, passing
    None (no row) through unchanged
    """
    return None if row is None else dict(**row._mapping)
99
100
class Database(object):
    """
    A hash equivalence database connection backed by SQLAlchemy.

    Use as an async context manager: the connection is opened on entry and
    closed on exit. Every query method executes its statement inside its own
    transaction on that connection.
    """

    def __init__(self, engine, logger):
        self.engine = engine
        self.db = None
        self.logger = logger

    async def __aenter__(self):
        self.db = await self.engine.connect()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def close(self):
        """
        Close the connection. Does nothing if no connection is open, so it
        is safe to call more than once.
        """
        if self.db is not None:
            await self.db.close()
            self.db = None

    @staticmethod
    def _same_task():
        """Join condition matching unihash and outhash rows of the same task"""
        return and_(
            UnihashesV2.method == OuthashesV2.method,
            UnihashesV2.taskhash == OuthashesV2.taskhash,
        )

    async def _fetch_first(self, statement):
        """Execute statement; return its first row as a dict (None if empty)"""
        self.logger.debug("%s", statement)
        async with self.db.begin():
            result = await self.db.execute(statement)
            return map_row(result.first())

    async def _execute_rowcount(self, statement):
        """Execute statement; return the number of rows it affected"""
        self.logger.debug("%s", statement)
        async with self.db.begin():
            result = await self.db.execute(statement)
            return result.rowcount

    async def _insert(self, statement):
        """
        Execute an insert statement; return False if it was rejected with an
        IntegrityError and True otherwise
        """
        self.logger.debug("%s", statement)
        try:
            async with self.db.begin():
                await self.db.execute(statement)
            return True
        except IntegrityError:
            return False

    async def get_unihash_by_taskhash_full(self, method, taskhash):
        """
        Return the oldest outhash row for method and taskhash, joined with
        its unihash, or None if there is no match
        """
        statement = (
            select(
                OuthashesV2,
                UnihashesV2.unihash.label("unihash"),
            )
            .join(UnihashesV2, self._same_task())
            .where(
                OuthashesV2.method == method,
                OuthashesV2.taskhash == taskhash,
            )
            .order_by(
                OuthashesV2.created.asc(),
            )
            .limit(1)
        )
        return await self._fetch_first(statement)

    async def get_unihash_by_outhash(self, method, outhash):
        """
        Return the oldest outhash row for method and outhash, joined with
        its unihash, or None if there is no match
        """
        statement = (
            select(OuthashesV2, UnihashesV2.unihash.label("unihash"))
            .join(UnihashesV2, self._same_task())
            .where(
                OuthashesV2.method == method,
                OuthashesV2.outhash == outhash,
            )
            .order_by(
                OuthashesV2.created.asc(),
            )
            .limit(1)
        )
        return await self._fetch_first(statement)

    async def get_outhash(self, method, outhash):
        """
        Return the oldest outhash row for method and outhash, or None if
        there is no match
        """
        statement = (
            select(OuthashesV2)
            .where(
                OuthashesV2.method == method,
                OuthashesV2.outhash == outhash,
            )
            .order_by(
                OuthashesV2.created.asc(),
            )
            .limit(1)
        )
        return await self._fetch_first(statement)

    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
        """
        Return the taskhash and unihash of the oldest entry which has the
        given method and outhash but a different taskhash, or None if there
        is no such entry
        """
        statement = (
            select(
                OuthashesV2.taskhash.label("taskhash"),
                UnihashesV2.unihash.label("unihash"),
            )
            .join(UnihashesV2, self._same_task())
            .where(
                OuthashesV2.method == method,
                OuthashesV2.outhash == outhash,
                OuthashesV2.taskhash != taskhash,
            )
            .order_by(
                OuthashesV2.created.asc(),
            )
            .limit(1)
        )
        return await self._fetch_first(statement)

    async def get_equivalent(self, method, taskhash):
        """
        Return the unihash, method and taskhash recorded for method and
        taskhash, or None if there is no match
        """
        statement = select(
            UnihashesV2.unihash,
            UnihashesV2.method,
            UnihashesV2.taskhash,
        ).where(
            UnihashesV2.method == method,
            UnihashesV2.taskhash == taskhash,
        )
        return await self._fetch_first(statement)

    async def remove(self, condition):
        """
        Delete rows from both tables and return the total number removed.

        condition maps column names to required values. For each table only
        the keys that name one of its columns and have a value other than
        None are used; a table for which no key applies is left untouched.
        """

        async def do_remove(table):
            criteria = [
                column == condition[column.key]
                for column in table.__table__.columns
                if column.key in condition and condition[column.key] is not None
            ]

            # Never issue an unconditional delete
            if not criteria:
                return 0

            return await self._execute_rowcount(delete(table).where(*criteria))

        count = 0
        count += await do_remove(UnihashesV2)
        count += await do_remove(OuthashesV2)

        return count

    async def clean_unused(self, oldest):
        """
        Delete outhash rows created before oldest which have no unihash row
        with the same method and taskhash. Returns the number of rows removed.
        """
        statement = delete(OuthashesV2).where(
            OuthashesV2.created < oldest,
            ~(
                select(UnihashesV2.id)
                .where(
                    UnihashesV2.method == OuthashesV2.method,
                    UnihashesV2.taskhash == OuthashesV2.taskhash,
                )
                .limit(1)
                .exists()
            ),
        )
        return await self._execute_rowcount(statement)

    async def insert_unihash(self, method, taskhash, unihash):
        """
        Record unihash for method and taskhash. Returns True if the row was
        inserted and False if the database rejected it with an
        IntegrityError.
        """
        statement = insert(UnihashesV2).values(
            method=method,
            taskhash=taskhash,
            unihash=unihash,
        )
        if await self._insert(statement):
            return True

        self.logger.debug(
            "%s, %s, %s already in unihash database", method, taskhash, unihash
        )
        return False

    async def insert_outhash(self, data):
        """
        Record an outhash row built from data. Keys which are not columns of
        the outhash table are ignored, and a "created" value that is not
        already a datetime is parsed with datetime.fromisoformat(). Returns
        True if the row was inserted and False if the database rejected it
        with an IntegrityError.
        """
        outhash_columns = {c.key for c in OuthashesV2.__table__.columns}

        data = {k: v for k, v in data.items() if k in outhash_columns}

        if "created" in data and not isinstance(data["created"], datetime):
            data["created"] = datetime.fromisoformat(data["created"])

        if await self._insert(insert(OuthashesV2).values(**data)):
            return True

        self.logger.debug(
            "%s, %s already in outhash database", data["method"], data["outhash"]
        )
        return False
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 4c98a280a5..268b27006f 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -33,7 +33,7 @@ class HashEquivalenceTestSetup(object):
33 def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc): 33 def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc):
34 self.server_index += 1 34 self.server_index += 1
35 if dbpath is None: 35 if dbpath is None:
36 dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) 36 dbpath = self.make_dbpath()
37 37
38 def cleanup_server(server): 38 def cleanup_server(server):
39 if server.process.exitcode is not None: 39 if server.process.exitcode is not None:
@@ -53,6 +53,9 @@ class HashEquivalenceTestSetup(object):
53 53
54 return server 54 return server
55 55
def make_dbpath(self):
    # One sqlite file per started server, inside the test's temporary directory
    filename = "db%d.sqlite" % self.server_index
    return os.path.join(self.temp_dir.name, filename)
58
56 def start_client(self, server_address): 59 def start_client(self, server_address):
57 def cleanup_client(client): 60 def cleanup_client(client):
58 client.close() 61 client.close()
@@ -517,6 +520,20 @@ class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalen
517 return "ws://%s:0" % host 520 return "ws://%s:0" % host
518 521
519 522
class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocketServer):
    """
    Runs the websocket server tests with the database path given as a
    sqlite+aiosqlite URL
    """

    def setUp(self):
        # Both modules are needed; skip the test if either cannot be imported
        for module_name in ("sqlalchemy", "aiosqlite"):
            try:
                __import__(module_name)
            except ImportError as e:
                self.skipTest(str(e))

        super().setUp()

    def make_dbpath(self):
        db_file = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
        return "sqlite+aiosqlite:///%s" % db_file
535
536
520class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): 537class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
521 def start_test_server(self): 538 def start_test_server(self):
522 if 'BB_TEST_HASHSERV' not in os.environ: 539 if 'BB_TEST_HASHSERV' not in os.environ: