summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbitbake/bin/bitbake-hashserv12
-rw-r--r--bitbake/lib/bb/asyncrpc/connection.py11
-rw-r--r--bitbake/lib/hashserv/__init__.py21
-rw-r--r--bitbake/lib/hashserv/sqlalchemy.py304
-rw-r--r--bitbake/lib/hashserv/tests.py19
5 files changed, 362 insertions, 5 deletions
diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index a916a90cb0..59b8b07f59 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -69,6 +69,16 @@ To bind to all addresses, leave the ADDRESS empty, e.g. "--bind :8686" or
69 action="store_true", 69 action="store_true",
70 help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)", 70 help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)",
71 ) 71 )
72 parser.add_argument(
73 "--db-username",
74 default=os.environ.get("HASHSERVER_DB_USERNAME", None),
75 help="Database username ($HASHSERVER_DB_USERNAME)",
76 )
77 parser.add_argument(
78 "--db-password",
79 default=os.environ.get("HASHSERVER_DB_PASSWORD", None),
80 help="Database password ($HASHSERVER_DB_PASSWORD)",
81 )
72 82
73 args = parser.parse_args() 83 args = parser.parse_args()
74 84
@@ -90,6 +100,8 @@ To bind to all addresses, leave the ADDRESS empty, e.g. "--bind :8686" or
90 args.database, 100 args.database,
91 upstream=args.upstream, 101 upstream=args.upstream,
92 read_only=read_only, 102 read_only=read_only,
103 db_username=args.db_username,
104 db_password=args.db_password,
93 ) 105 )
94 server.serve_forever() 106 server.serve_forever()
95 return 0 107 return 0
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
index a10628f75a..7f0cf6ba96 100644
--- a/bitbake/lib/bb/asyncrpc/connection.py
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -7,6 +7,7 @@
7import asyncio 7import asyncio
8import itertools 8import itertools
9import json 9import json
10from datetime import datetime
10from .exceptions import ClientError, ConnectionClosedError 11from .exceptions import ClientError, ConnectionClosedError
11 12
12 13
@@ -30,6 +31,12 @@ def chunkify(msg, max_chunk):
30 yield "\n" 31 yield "\n"
31 32
32 33
34def json_serialize(obj):
35 if isinstance(obj, datetime):
36 return obj.isoformat()
37 raise TypeError("Type %s not serializeable" % type(obj))
38
39
33class StreamConnection(object): 40class StreamConnection(object):
34 def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): 41 def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
35 self.reader = reader 42 self.reader = reader
@@ -42,7 +49,7 @@ class StreamConnection(object):
42 return self.writer.get_extra_info("peername") 49 return self.writer.get_extra_info("peername")
43 50
44 async def send_message(self, msg): 51 async def send_message(self, msg):
45 for c in chunkify(json.dumps(msg), self.max_chunk): 52 for c in chunkify(json.dumps(msg, default=json_serialize), self.max_chunk):
46 self.writer.write(c.encode("utf-8")) 53 self.writer.write(c.encode("utf-8"))
47 await self.writer.drain() 54 await self.writer.drain()
48 55
@@ -105,7 +112,7 @@ class WebsocketConnection(object):
105 return ":".join(str(s) for s in self.socket.remote_address) 112 return ":".join(str(s) for s in self.socket.remote_address)
106 113
107 async def send_message(self, msg): 114 async def send_message(self, msg):
108 await self.send(json.dumps(msg)) 115 await self.send(json.dumps(msg, default=json_serialize))
109 116
110 async def recv_message(self): 117 async def recv_message(self):
111 m = await self.recv() 118 m = await self.recv()
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 90d8cff15f..9a8ee4e88b 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -35,15 +35,32 @@ def parse_address(addr):
35 return (ADDR_TYPE_TCP, (host, int(port))) 35 return (ADDR_TYPE_TCP, (host, int(port)))
36 36
37 37
38def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): 38def create_server(
39 addr,
40 dbname,
41 *,
42 sync=True,
43 upstream=None,
44 read_only=False,
45 db_username=None,
46 db_password=None
47):
39 def sqlite_engine(): 48 def sqlite_engine():
40 from .sqlite import DatabaseEngine 49 from .sqlite import DatabaseEngine
41 50
42 return DatabaseEngine(dbname, sync) 51 return DatabaseEngine(dbname, sync)
43 52
53 def sqlalchemy_engine():
54 from .sqlalchemy import DatabaseEngine
55
56 return DatabaseEngine(dbname, db_username, db_password)
57
44 from . import server 58 from . import server
45 59
46 db_engine = sqlite_engine() 60 if "://" in dbname:
61 db_engine = sqlalchemy_engine()
62 else:
63 db_engine = sqlite_engine()
47 64
48 s = server.Server(db_engine, upstream=upstream, read_only=read_only) 65 s = server.Server(db_engine, upstream=upstream, read_only=read_only)
49 66
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
new file mode 100644
index 0000000000..3216621f9d
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -0,0 +1,304 @@
1#! /usr/bin/env python3
2#
3# Copyright (C) 2023 Garmin Ltd.
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import logging
9from datetime import datetime
10
11from sqlalchemy.ext.asyncio import create_async_engine
12from sqlalchemy.pool import NullPool
13from sqlalchemy import (
14 MetaData,
15 Column,
16 Table,
17 Text,
18 Integer,
19 UniqueConstraint,
20 DateTime,
21 Index,
22 select,
23 insert,
24 exists,
25 literal,
26 and_,
27 delete,
28)
29import sqlalchemy.engine
30from sqlalchemy.orm import declarative_base
31from sqlalchemy.exc import IntegrityError
32
33logger = logging.getLogger("hashserv.sqlalchemy")
34
35Base = declarative_base()
36
37
38class UnihashesV2(Base):
39 __tablename__ = "unihashes_v2"
40 id = Column(Integer, primary_key=True, autoincrement=True)
41 method = Column(Text, nullable=False)
42 taskhash = Column(Text, nullable=False)
43 unihash = Column(Text, nullable=False)
44
45 __table_args__ = (
46 UniqueConstraint("method", "taskhash"),
47 Index("taskhash_lookup_v3", "method", "taskhash"),
48 )
49
50
51class OuthashesV2(Base):
52 __tablename__ = "outhashes_v2"
53 id = Column(Integer, primary_key=True, autoincrement=True)
54 method = Column(Text, nullable=False)
55 taskhash = Column(Text, nullable=False)
56 outhash = Column(Text, nullable=False)
57 created = Column(DateTime)
58 owner = Column(Text)
59 PN = Column(Text)
60 PV = Column(Text)
61 PR = Column(Text)
62 task = Column(Text)
63 outhash_siginfo = Column(Text)
64
65 __table_args__ = (
66 UniqueConstraint("method", "taskhash", "outhash"),
67 Index("outhash_lookup_v3", "method", "outhash"),
68 )
69
70
71class DatabaseEngine(object):
72 def __init__(self, url, username=None, password=None):
73 self.logger = logger
74 self.url = sqlalchemy.engine.make_url(url)
75
76 if username is not None:
77 self.url = self.url.set(username=username)
78
79 if password is not None:
80 self.url = self.url.set(password=password)
81
82 async def create(self):
83 self.logger.info("Using database %s", self.url)
84 self.engine = create_async_engine(self.url, poolclass=NullPool)
85
86 async with self.engine.begin() as conn:
87 # Create tables
88 logger.info("Creating tables...")
89 await conn.run_sync(Base.metadata.create_all)
90
91 def connect(self, logger):
92 return Database(self.engine, logger)
93
94
95def map_row(row):
96 if row is None:
97 return None
98 return dict(**row._mapping)
99
100
101class Database(object):
102 def __init__(self, engine, logger):
103 self.engine = engine
104 self.db = None
105 self.logger = logger
106
107 async def __aenter__(self):
108 self.db = await self.engine.connect()
109 return self
110
111 async def __aexit__(self, exc_type, exc_value, traceback):
112 await self.close()
113
114 async def close(self):
115 await self.db.close()
116 self.db = None
117
118 async def get_unihash_by_taskhash_full(self, method, taskhash):
119 statement = (
120 select(
121 OuthashesV2,
122 UnihashesV2.unihash.label("unihash"),
123 )
124 .join(
125 UnihashesV2,
126 and_(
127 UnihashesV2.method == OuthashesV2.method,
128 UnihashesV2.taskhash == OuthashesV2.taskhash,
129 ),
130 )
131 .where(
132 OuthashesV2.method == method,
133 OuthashesV2.taskhash == taskhash,
134 )
135 .order_by(
136 OuthashesV2.created.asc(),
137 )
138 .limit(1)
139 )
140 self.logger.debug("%s", statement)
141 async with self.db.begin():
142 result = await self.db.execute(statement)
143 return map_row(result.first())
144
145 async def get_unihash_by_outhash(self, method, outhash):
146 statement = (
147 select(OuthashesV2, UnihashesV2.unihash.label("unihash"))
148 .join(
149 UnihashesV2,
150 and_(
151 UnihashesV2.method == OuthashesV2.method,
152 UnihashesV2.taskhash == OuthashesV2.taskhash,
153 ),
154 )
155 .where(
156 OuthashesV2.method == method,
157 OuthashesV2.outhash == outhash,
158 )
159 .order_by(
160 OuthashesV2.created.asc(),
161 )
162 .limit(1)
163 )
164 self.logger.debug("%s", statement)
165 async with self.db.begin():
166 result = await self.db.execute(statement)
167 return map_row(result.first())
168
169 async def get_outhash(self, method, outhash):
170 statement = (
171 select(OuthashesV2)
172 .where(
173 OuthashesV2.method == method,
174 OuthashesV2.outhash == outhash,
175 )
176 .order_by(
177 OuthashesV2.created.asc(),
178 )
179 .limit(1)
180 )
181
182 self.logger.debug("%s", statement)
183 async with self.db.begin():
184 result = await self.db.execute(statement)
185 return map_row(result.first())
186
187 async def get_equivalent_for_outhash(self, method, outhash, taskhash):
188 statement = (
189 select(
190 OuthashesV2.taskhash.label("taskhash"),
191 UnihashesV2.unihash.label("unihash"),
192 )
193 .join(
194 UnihashesV2,
195 and_(
196 UnihashesV2.method == OuthashesV2.method,
197 UnihashesV2.taskhash == OuthashesV2.taskhash,
198 ),
199 )
200 .where(
201 OuthashesV2.method == method,
202 OuthashesV2.outhash == outhash,
203 OuthashesV2.taskhash != taskhash,
204 )
205 .order_by(
206 OuthashesV2.created.asc(),
207 )
208 .limit(1)
209 )
210 self.logger.debug("%s", statement)
211 async with self.db.begin():
212 result = await self.db.execute(statement)
213 return map_row(result.first())
214
215 async def get_equivalent(self, method, taskhash):
216 statement = select(
217 UnihashesV2.unihash,
218 UnihashesV2.method,
219 UnihashesV2.taskhash,
220 ).where(
221 UnihashesV2.method == method,
222 UnihashesV2.taskhash == taskhash,
223 )
224 self.logger.debug("%s", statement)
225 async with self.db.begin():
226 result = await self.db.execute(statement)
227 return map_row(result.first())
228
229 async def remove(self, condition):
230 async def do_remove(table):
231 where = {}
232 for c in table.__table__.columns:
233 if c.key in condition and condition[c.key] is not None:
234 where[c] = condition[c.key]
235
236 if where:
237 statement = delete(table).where(*[(k == v) for k, v in where.items()])
238 self.logger.debug("%s", statement)
239 async with self.db.begin():
240 result = await self.db.execute(statement)
241 return result.rowcount
242
243 return 0
244
245 count = 0
246 count += await do_remove(UnihashesV2)
247 count += await do_remove(OuthashesV2)
248
249 return count
250
251 async def clean_unused(self, oldest):
252 statement = delete(OuthashesV2).where(
253 OuthashesV2.created < oldest,
254 ~(
255 select(UnihashesV2.id)
256 .where(
257 UnihashesV2.method == OuthashesV2.method,
258 UnihashesV2.taskhash == OuthashesV2.taskhash,
259 )
260 .limit(1)
261 .exists()
262 ),
263 )
264 self.logger.debug("%s", statement)
265 async with self.db.begin():
266 result = await self.db.execute(statement)
267 return result.rowcount
268
269 async def insert_unihash(self, method, taskhash, unihash):
270 statement = insert(UnihashesV2).values(
271 method=method,
272 taskhash=taskhash,
273 unihash=unihash,
274 )
275 self.logger.debug("%s", statement)
276 try:
277 async with self.db.begin():
278 await self.db.execute(statement)
279 return True
280 except IntegrityError:
281 logger.debug(
282 "%s, %s, %s already in unihash database", method, taskhash, unihash
283 )
284 return False
285
286 async def insert_outhash(self, data):
287 outhash_columns = set(c.key for c in OuthashesV2.__table__.columns)
288
289 data = {k: v for k, v in data.items() if k in outhash_columns}
290
291 if "created" in data and not isinstance(data["created"], datetime):
292 data["created"] = datetime.fromisoformat(data["created"])
293
294 statement = insert(OuthashesV2).values(**data)
295 self.logger.debug("%s", statement)
296 try:
297 async with self.db.begin():
298 await self.db.execute(statement)
299 return True
300 except IntegrityError:
301 logger.debug(
302 "%s, %s already in outhash database", data["method"], data["outhash"]
303 )
304 return False
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 4c98a280a5..268b27006f 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -33,7 +33,7 @@ class HashEquivalenceTestSetup(object):
33 def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc): 33 def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc):
34 self.server_index += 1 34 self.server_index += 1
35 if dbpath is None: 35 if dbpath is None:
36 dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) 36 dbpath = self.make_dbpath()
37 37
38 def cleanup_server(server): 38 def cleanup_server(server):
39 if server.process.exitcode is not None: 39 if server.process.exitcode is not None:
@@ -53,6 +53,9 @@ class HashEquivalenceTestSetup(object):
53 53
54 return server 54 return server
55 55
56 def make_dbpath(self):
57 return os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
58
56 def start_client(self, server_address): 59 def start_client(self, server_address):
57 def cleanup_client(client): 60 def cleanup_client(client):
58 client.close() 61 client.close()
@@ -517,6 +520,20 @@ class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalen
517 return "ws://%s:0" % host 520 return "ws://%s:0" % host
518 521
519 522
523class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocketServer):
524 def setUp(self):
525 try:
526 import sqlalchemy
527 import aiosqlite
528 except ImportError as e:
529 self.skipTest(str(e))
530
531 super().setUp()
532
533 def make_dbpath(self):
534 return "sqlite+aiosqlite:///%s" % os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
535
536
520class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): 537class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
521 def start_test_server(self): 538 def start_test_server(self):
522 if 'BB_TEST_HASHSERV' not in os.environ: 539 if 'BB_TEST_HASHSERV' not in os.environ: