diff options
 -rwxr-xr-x  bitbake/bin/bitbake-hashserv           |  12
 -rw-r--r--  bitbake/lib/bb/asyncrpc/connection.py  |  11
 -rw-r--r--  bitbake/lib/hashserv/__init__.py       |  21
 -rw-r--r--  bitbake/lib/hashserv/sqlalchemy.py     | 304
 -rw-r--r--  bitbake/lib/hashserv/tests.py          |  19
 5 files changed, 362 insertions(+), 5 deletions(-)
diff --git a/bitbake/bin/bitbake-hashserv b/bitbake/bin/bitbake-hashserv
index a916a90cb0..59b8b07f59 100755
--- a/bitbake/bin/bitbake-hashserv
+++ b/bitbake/bin/bitbake-hashserv
@@ -69,6 +69,16 @@ To bind to all addresses, leave the ADDRESS empty, e.g. "--bind :8686" or
69 | action="store_true", | 69 | action="store_true", |
70 | help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)", | 70 | help="Disallow write operations from clients ($HASHSERVER_READ_ONLY)", |
71 | ) | 71 | ) |
72 | parser.add_argument( | ||
73 | "--db-username", | ||
74 | default=os.environ.get("HASHSERVER_DB_USERNAME", None), | ||
75 | help="Database username ($HASHSERVER_DB_USERNAME)", | ||
76 | ) | ||
77 | parser.add_argument( | ||
78 | "--db-password", | ||
79 | default=os.environ.get("HASHSERVER_DB_PASSWORD", None), | ||
80 | help="Database password ($HASHSERVER_DB_PASSWORD)", | ||
81 | ) | ||
72 | 82 | ||
73 | args = parser.parse_args() | 83 | args = parser.parse_args() |
74 | 84 | ||
@@ -90,6 +100,8 @@ To bind to all addresses, leave the ADDRESS empty, e.g. "--bind :8686" or | |||
90 | args.database, | 100 | args.database, |
91 | upstream=args.upstream, | 101 | upstream=args.upstream, |
92 | read_only=read_only, | 102 | read_only=read_only, |
103 | db_username=args.db_username, | ||
104 | db_password=args.db_password, | ||
93 | ) | 105 | ) |
94 | server.serve_forever() | 106 | server.serve_forever() |
95 | return 0 | 107 | return 0 |
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
index a10628f75a..7f0cf6ba96 100644
--- a/bitbake/lib/bb/asyncrpc/connection.py
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -7,6 +7,7 @@
7 | import asyncio | 7 | import asyncio |
8 | import itertools | 8 | import itertools |
9 | import json | 9 | import json |
10 | from datetime import datetime | ||
10 | from .exceptions import ClientError, ConnectionClosedError | 11 | from .exceptions import ClientError, ConnectionClosedError |
11 | 12 | ||
12 | 13 | ||
@@ -30,6 +31,12 @@ def chunkify(msg, max_chunk):
30 | yield "\n" | 31 | yield "\n" |
31 | 32 | ||
32 | 33 | ||
34 | def json_serialize(obj): | ||
35 | if isinstance(obj, datetime): | ||
36 | return obj.isoformat() | ||
37 | raise TypeError("Type %s not serializeable" % type(obj)) | ||
38 | |||
39 | |||
33 | class StreamConnection(object): | 40 | class StreamConnection(object): |
34 | def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): | 41 | def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): |
35 | self.reader = reader | 42 | self.reader = reader |
@@ -42,7 +49,7 @@ class StreamConnection(object):
42 | return self.writer.get_extra_info("peername") | 49 | return self.writer.get_extra_info("peername") |
43 | 50 | ||
44 | async def send_message(self, msg): | 51 | async def send_message(self, msg): |
45 | for c in chunkify(json.dumps(msg), self.max_chunk): | 52 | for c in chunkify(json.dumps(msg, default=json_serialize), self.max_chunk): |
46 | self.writer.write(c.encode("utf-8")) | 53 | self.writer.write(c.encode("utf-8")) |
47 | await self.writer.drain() | 54 | await self.writer.drain() |
48 | 55 | ||
@@ -105,7 +112,7 @@ class WebsocketConnection(object):
105 | return ":".join(str(s) for s in self.socket.remote_address) | 112 | return ":".join(str(s) for s in self.socket.remote_address) |
106 | 113 | ||
107 | async def send_message(self, msg): | 114 | async def send_message(self, msg): |
108 | await self.send(json.dumps(msg)) | 115 | await self.send(json.dumps(msg, default=json_serialize)) |
109 | 116 | ||
110 | async def recv_message(self): | 117 | async def recv_message(self): |
111 | m = await self.recv() | 118 | m = await self.recv() |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 90d8cff15f..9a8ee4e88b 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -35,15 +35,32 @@ def parse_address(addr):
35 | return (ADDR_TYPE_TCP, (host, int(port))) | 35 | return (ADDR_TYPE_TCP, (host, int(port))) |
36 | 36 | ||
37 | 37 | ||
38 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 38 | def create_server( |
39 | addr, | ||
40 | dbname, | ||
41 | *, | ||
42 | sync=True, | ||
43 | upstream=None, | ||
44 | read_only=False, | ||
45 | db_username=None, | ||
46 | db_password=None | ||
47 | ): | ||
39 | def sqlite_engine(): | 48 | def sqlite_engine(): |
40 | from .sqlite import DatabaseEngine | 49 | from .sqlite import DatabaseEngine |
41 | 50 | ||
42 | return DatabaseEngine(dbname, sync) | 51 | return DatabaseEngine(dbname, sync) |
43 | 52 | ||
53 | def sqlalchemy_engine(): | ||
54 | from .sqlalchemy import DatabaseEngine | ||
55 | |||
56 | return DatabaseEngine(dbname, db_username, db_password) | ||
57 | |||
44 | from . import server | 58 | from . import server |
45 | 59 | ||
46 | db_engine = sqlite_engine() | 60 | if "://" in dbname: |
61 | db_engine = sqlalchemy_engine() | ||
62 | else: | ||
63 | db_engine = sqlite_engine() | ||
47 | 64 | ||
48 | s = server.Server(db_engine, upstream=upstream, read_only=read_only) | 65 | s = server.Server(db_engine, upstream=upstream, read_only=read_only) |
49 | 66 | ||
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
new file mode 100644
index 0000000000..3216621f9d
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -0,0 +1,304 @@
1 | #! /usr/bin/env python3 | ||
2 | # | ||
3 | # Copyright (C) 2023 Garmin Ltd. | ||
4 | # | ||
5 | # SPDX-License-Identifier: GPL-2.0-only | ||
6 | # | ||
7 | |||
8 | import logging | ||
9 | from datetime import datetime | ||
10 | |||
11 | from sqlalchemy.ext.asyncio import create_async_engine | ||
12 | from sqlalchemy.pool import NullPool | ||
13 | from sqlalchemy import ( | ||
14 | MetaData, | ||
15 | Column, | ||
16 | Table, | ||
17 | Text, | ||
18 | Integer, | ||
19 | UniqueConstraint, | ||
20 | DateTime, | ||
21 | Index, | ||
22 | select, | ||
23 | insert, | ||
24 | exists, | ||
25 | literal, | ||
26 | and_, | ||
27 | delete, | ||
28 | ) | ||
29 | import sqlalchemy.engine | ||
30 | from sqlalchemy.orm import declarative_base | ||
31 | from sqlalchemy.exc import IntegrityError | ||
32 | |||
33 | logger = logging.getLogger("hashserv.sqlalchemy") | ||
34 | |||
35 | Base = declarative_base() | ||
36 | |||
37 | |||
38 | class UnihashesV2(Base): | ||
39 | __tablename__ = "unihashes_v2" | ||
40 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
41 | method = Column(Text, nullable=False) | ||
42 | taskhash = Column(Text, nullable=False) | ||
43 | unihash = Column(Text, nullable=False) | ||
44 | |||
45 | __table_args__ = ( | ||
46 | UniqueConstraint("method", "taskhash"), | ||
47 | Index("taskhash_lookup_v3", "method", "taskhash"), | ||
48 | ) | ||
49 | |||
50 | |||
51 | class OuthashesV2(Base): | ||
52 | __tablename__ = "outhashes_v2" | ||
53 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
54 | method = Column(Text, nullable=False) | ||
55 | taskhash = Column(Text, nullable=False) | ||
56 | outhash = Column(Text, nullable=False) | ||
57 | created = Column(DateTime) | ||
58 | owner = Column(Text) | ||
59 | PN = Column(Text) | ||
60 | PV = Column(Text) | ||
61 | PR = Column(Text) | ||
62 | task = Column(Text) | ||
63 | outhash_siginfo = Column(Text) | ||
64 | |||
65 | __table_args__ = ( | ||
66 | UniqueConstraint("method", "taskhash", "outhash"), | ||
67 | Index("outhash_lookup_v3", "method", "outhash"), | ||
68 | ) | ||
69 | |||
70 | |||
71 | class DatabaseEngine(object): | ||
72 | def __init__(self, url, username=None, password=None): | ||
73 | self.logger = logger | ||
74 | self.url = sqlalchemy.engine.make_url(url) | ||
75 | |||
76 | if username is not None: | ||
77 | self.url = self.url.set(username=username) | ||
78 | |||
79 | if password is not None: | ||
80 | self.url = self.url.set(password=password) | ||
81 | |||
82 | async def create(self): | ||
83 | self.logger.info("Using database %s", self.url) | ||
84 | self.engine = create_async_engine(self.url, poolclass=NullPool) | ||
85 | |||
86 | async with self.engine.begin() as conn: | ||
87 | # Create tables | ||
88 | logger.info("Creating tables...") | ||
89 | await conn.run_sync(Base.metadata.create_all) | ||
90 | |||
91 | def connect(self, logger): | ||
92 | return Database(self.engine, logger) | ||
93 | |||
94 | |||
95 | def map_row(row): | ||
96 | if row is None: | ||
97 | return None | ||
98 | return dict(**row._mapping) | ||
99 | |||
100 | |||
101 | class Database(object): | ||
102 | def __init__(self, engine, logger): | ||
103 | self.engine = engine | ||
104 | self.db = None | ||
105 | self.logger = logger | ||
106 | |||
107 | async def __aenter__(self): | ||
108 | self.db = await self.engine.connect() | ||
109 | return self | ||
110 | |||
111 | async def __aexit__(self, exc_type, exc_value, traceback): | ||
112 | await self.close() | ||
113 | |||
114 | async def close(self): | ||
115 | await self.db.close() | ||
116 | self.db = None | ||
117 | |||
118 | async def get_unihash_by_taskhash_full(self, method, taskhash): | ||
119 | statement = ( | ||
120 | select( | ||
121 | OuthashesV2, | ||
122 | UnihashesV2.unihash.label("unihash"), | ||
123 | ) | ||
124 | .join( | ||
125 | UnihashesV2, | ||
126 | and_( | ||
127 | UnihashesV2.method == OuthashesV2.method, | ||
128 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
129 | ), | ||
130 | ) | ||
131 | .where( | ||
132 | OuthashesV2.method == method, | ||
133 | OuthashesV2.taskhash == taskhash, | ||
134 | ) | ||
135 | .order_by( | ||
136 | OuthashesV2.created.asc(), | ||
137 | ) | ||
138 | .limit(1) | ||
139 | ) | ||
140 | self.logger.debug("%s", statement) | ||
141 | async with self.db.begin(): | ||
142 | result = await self.db.execute(statement) | ||
143 | return map_row(result.first()) | ||
144 | |||
145 | async def get_unihash_by_outhash(self, method, outhash): | ||
146 | statement = ( | ||
147 | select(OuthashesV2, UnihashesV2.unihash.label("unihash")) | ||
148 | .join( | ||
149 | UnihashesV2, | ||
150 | and_( | ||
151 | UnihashesV2.method == OuthashesV2.method, | ||
152 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
153 | ), | ||
154 | ) | ||
155 | .where( | ||
156 | OuthashesV2.method == method, | ||
157 | OuthashesV2.outhash == outhash, | ||
158 | ) | ||
159 | .order_by( | ||
160 | OuthashesV2.created.asc(), | ||
161 | ) | ||
162 | .limit(1) | ||
163 | ) | ||
164 | self.logger.debug("%s", statement) | ||
165 | async with self.db.begin(): | ||
166 | result = await self.db.execute(statement) | ||
167 | return map_row(result.first()) | ||
168 | |||
169 | async def get_outhash(self, method, outhash): | ||
170 | statement = ( | ||
171 | select(OuthashesV2) | ||
172 | .where( | ||
173 | OuthashesV2.method == method, | ||
174 | OuthashesV2.outhash == outhash, | ||
175 | ) | ||
176 | .order_by( | ||
177 | OuthashesV2.created.asc(), | ||
178 | ) | ||
179 | .limit(1) | ||
180 | ) | ||
181 | |||
182 | self.logger.debug("%s", statement) | ||
183 | async with self.db.begin(): | ||
184 | result = await self.db.execute(statement) | ||
185 | return map_row(result.first()) | ||
186 | |||
187 | async def get_equivalent_for_outhash(self, method, outhash, taskhash): | ||
188 | statement = ( | ||
189 | select( | ||
190 | OuthashesV2.taskhash.label("taskhash"), | ||
191 | UnihashesV2.unihash.label("unihash"), | ||
192 | ) | ||
193 | .join( | ||
194 | UnihashesV2, | ||
195 | and_( | ||
196 | UnihashesV2.method == OuthashesV2.method, | ||
197 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
198 | ), | ||
199 | ) | ||
200 | .where( | ||
201 | OuthashesV2.method == method, | ||
202 | OuthashesV2.outhash == outhash, | ||
203 | OuthashesV2.taskhash != taskhash, | ||
204 | ) | ||
205 | .order_by( | ||
206 | OuthashesV2.created.asc(), | ||
207 | ) | ||
208 | .limit(1) | ||
209 | ) | ||
210 | self.logger.debug("%s", statement) | ||
211 | async with self.db.begin(): | ||
212 | result = await self.db.execute(statement) | ||
213 | return map_row(result.first()) | ||
214 | |||
215 | async def get_equivalent(self, method, taskhash): | ||
216 | statement = select( | ||
217 | UnihashesV2.unihash, | ||
218 | UnihashesV2.method, | ||
219 | UnihashesV2.taskhash, | ||
220 | ).where( | ||
221 | UnihashesV2.method == method, | ||
222 | UnihashesV2.taskhash == taskhash, | ||
223 | ) | ||
224 | self.logger.debug("%s", statement) | ||
225 | async with self.db.begin(): | ||
226 | result = await self.db.execute(statement) | ||
227 | return map_row(result.first()) | ||
228 | |||
229 | async def remove(self, condition): | ||
230 | async def do_remove(table): | ||
231 | where = {} | ||
232 | for c in table.__table__.columns: | ||
233 | if c.key in condition and condition[c.key] is not None: | ||
234 | where[c] = condition[c.key] | ||
235 | |||
236 | if where: | ||
237 | statement = delete(table).where(*[(k == v) for k, v in where.items()]) | ||
238 | self.logger.debug("%s", statement) | ||
239 | async with self.db.begin(): | ||
240 | result = await self.db.execute(statement) | ||
241 | return result.rowcount | ||
242 | |||
243 | return 0 | ||
244 | |||
245 | count = 0 | ||
246 | count += await do_remove(UnihashesV2) | ||
247 | count += await do_remove(OuthashesV2) | ||
248 | |||
249 | return count | ||
250 | |||
251 | async def clean_unused(self, oldest): | ||
252 | statement = delete(OuthashesV2).where( | ||
253 | OuthashesV2.created < oldest, | ||
254 | ~( | ||
255 | select(UnihashesV2.id) | ||
256 | .where( | ||
257 | UnihashesV2.method == OuthashesV2.method, | ||
258 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
259 | ) | ||
260 | .limit(1) | ||
261 | .exists() | ||
262 | ), | ||
263 | ) | ||
264 | self.logger.debug("%s", statement) | ||
265 | async with self.db.begin(): | ||
266 | result = await self.db.execute(statement) | ||
267 | return result.rowcount | ||
268 | |||
269 | async def insert_unihash(self, method, taskhash, unihash): | ||
270 | statement = insert(UnihashesV2).values( | ||
271 | method=method, | ||
272 | taskhash=taskhash, | ||
273 | unihash=unihash, | ||
274 | ) | ||
275 | self.logger.debug("%s", statement) | ||
276 | try: | ||
277 | async with self.db.begin(): | ||
278 | await self.db.execute(statement) | ||
279 | return True | ||
280 | except IntegrityError: | ||
281 | logger.debug( | ||
282 | "%s, %s, %s already in unihash database", method, taskhash, unihash | ||
283 | ) | ||
284 | return False | ||
285 | |||
286 | async def insert_outhash(self, data): | ||
287 | outhash_columns = set(c.key for c in OuthashesV2.__table__.columns) | ||
288 | |||
289 | data = {k: v for k, v in data.items() if k in outhash_columns} | ||
290 | |||
291 | if "created" in data and not isinstance(data["created"], datetime): | ||
292 | data["created"] = datetime.fromisoformat(data["created"]) | ||
293 | |||
294 | statement = insert(OuthashesV2).values(**data) | ||
295 | self.logger.debug("%s", statement) | ||
296 | try: | ||
297 | async with self.db.begin(): | ||
298 | await self.db.execute(statement) | ||
299 | return True | ||
300 | except IntegrityError: | ||
301 | logger.debug( | ||
302 | "%s, %s already in outhash database", data["method"], data["outhash"] | ||
303 | ) | ||
304 | return False | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index 4c98a280a5..268b27006f 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -33,7 +33,7 @@ class HashEquivalenceTestSetup(object):
33 | def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc): | 33 | def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc): |
34 | self.server_index += 1 | 34 | self.server_index += 1 |
35 | if dbpath is None: | 35 | if dbpath is None: |
36 | dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) | 36 | dbpath = self.make_dbpath() |
37 | 37 | ||
38 | def cleanup_server(server): | 38 | def cleanup_server(server): |
39 | if server.process.exitcode is not None: | 39 | if server.process.exitcode is not None: |
@@ -53,6 +53,9 @@ class HashEquivalenceTestSetup(object):
53 | 53 | ||
54 | return server | 54 | return server |
55 | 55 | ||
56 | def make_dbpath(self): | ||
57 | return os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) | ||
58 | |||
56 | def start_client(self, server_address): | 59 | def start_client(self, server_address): |
57 | def cleanup_client(client): | 60 | def cleanup_client(client): |
58 | client.close() | 61 | client.close() |
@@ -517,6 +520,20 @@ class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalen
517 | return "ws://%s:0" % host | 520 | return "ws://%s:0" % host |
518 | 521 | ||
519 | 522 | ||
523 | class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocketServer): | ||
524 | def setUp(self): | ||
525 | try: | ||
526 | import sqlalchemy | ||
527 | import aiosqlite | ||
528 | except ImportError as e: | ||
529 | self.skipTest(str(e)) | ||
530 | |||
531 | super().setUp() | ||
532 | |||
533 | def make_dbpath(self): | ||
534 | return "sqlite+aiosqlite:///%s" % os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) | ||
535 | |||
536 | |||
520 | class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | 537 | class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): |
521 | def start_test_server(self): | 538 | def start_test_server(self): |
522 | if 'BB_TEST_HASHSERV' not in os.environ: | 539 | if 'BB_TEST_HASHSERV' not in os.environ: |