diff options
Diffstat (limited to 'bitbake/lib')
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/connection.py | 11 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/__init__.py | 21 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/sqlalchemy.py | 304 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 19 |
4 files changed, 350 insertions, 5 deletions
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py index a10628f75a..7f0cf6ba96 100644 --- a/bitbake/lib/bb/asyncrpc/connection.py +++ b/bitbake/lib/bb/asyncrpc/connection.py | |||
| @@ -7,6 +7,7 @@ | |||
| 7 | import asyncio | 7 | import asyncio |
| 8 | import itertools | 8 | import itertools |
| 9 | import json | 9 | import json |
| 10 | from datetime import datetime | ||
| 10 | from .exceptions import ClientError, ConnectionClosedError | 11 | from .exceptions import ClientError, ConnectionClosedError |
| 11 | 12 | ||
| 12 | 13 | ||
| @@ -30,6 +31,12 @@ def chunkify(msg, max_chunk): | |||
| 30 | yield "\n" | 31 | yield "\n" |
| 31 | 32 | ||
| 32 | 33 | ||
| 34 | def json_serialize(obj): | ||
| 35 | if isinstance(obj, datetime): | ||
| 36 | return obj.isoformat() | ||
| 37 | raise TypeError("Type %s not serializeable" % type(obj)) | ||
| 38 | |||
| 39 | |||
| 33 | class StreamConnection(object): | 40 | class StreamConnection(object): |
| 34 | def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): | 41 | def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): |
| 35 | self.reader = reader | 42 | self.reader = reader |
| @@ -42,7 +49,7 @@ class StreamConnection(object): | |||
| 42 | return self.writer.get_extra_info("peername") | 49 | return self.writer.get_extra_info("peername") |
| 43 | 50 | ||
| 44 | async def send_message(self, msg): | 51 | async def send_message(self, msg): |
| 45 | for c in chunkify(json.dumps(msg), self.max_chunk): | 52 | for c in chunkify(json.dumps(msg, default=json_serialize), self.max_chunk): |
| 46 | self.writer.write(c.encode("utf-8")) | 53 | self.writer.write(c.encode("utf-8")) |
| 47 | await self.writer.drain() | 54 | await self.writer.drain() |
| 48 | 55 | ||
| @@ -105,7 +112,7 @@ class WebsocketConnection(object): | |||
| 105 | return ":".join(str(s) for s in self.socket.remote_address) | 112 | return ":".join(str(s) for s in self.socket.remote_address) |
| 106 | 113 | ||
| 107 | async def send_message(self, msg): | 114 | async def send_message(self, msg): |
| 108 | await self.send(json.dumps(msg)) | 115 | await self.send(json.dumps(msg, default=json_serialize)) |
| 109 | 116 | ||
| 110 | async def recv_message(self): | 117 | async def recv_message(self): |
| 111 | m = await self.recv() | 118 | m = await self.recv() |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 90d8cff15f..9a8ee4e88b 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
| @@ -35,15 +35,32 @@ def parse_address(addr): | |||
| 35 | return (ADDR_TYPE_TCP, (host, int(port))) | 35 | return (ADDR_TYPE_TCP, (host, int(port))) |
| 36 | 36 | ||
| 37 | 37 | ||
| 38 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 38 | def create_server( |
| 39 | addr, | ||
| 40 | dbname, | ||
| 41 | *, | ||
| 42 | sync=True, | ||
| 43 | upstream=None, | ||
| 44 | read_only=False, | ||
| 45 | db_username=None, | ||
| 46 | db_password=None | ||
| 47 | ): | ||
| 39 | def sqlite_engine(): | 48 | def sqlite_engine(): |
| 40 | from .sqlite import DatabaseEngine | 49 | from .sqlite import DatabaseEngine |
| 41 | 50 | ||
| 42 | return DatabaseEngine(dbname, sync) | 51 | return DatabaseEngine(dbname, sync) |
| 43 | 52 | ||
| 53 | def sqlalchemy_engine(): | ||
| 54 | from .sqlalchemy import DatabaseEngine | ||
| 55 | |||
| 56 | return DatabaseEngine(dbname, db_username, db_password) | ||
| 57 | |||
| 44 | from . import server | 58 | from . import server |
| 45 | 59 | ||
| 46 | db_engine = sqlite_engine() | 60 | if "://" in dbname: |
| 61 | db_engine = sqlalchemy_engine() | ||
| 62 | else: | ||
| 63 | db_engine = sqlite_engine() | ||
| 47 | 64 | ||
| 48 | s = server.Server(db_engine, upstream=upstream, read_only=read_only) | 65 | s = server.Server(db_engine, upstream=upstream, read_only=read_only) |
| 49 | 66 | ||
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py new file mode 100644 index 0000000000..3216621f9d --- /dev/null +++ b/bitbake/lib/hashserv/sqlalchemy.py | |||
| @@ -0,0 +1,304 @@ | |||
| 1 | #! /usr/bin/env python3 | ||
| 2 | # | ||
| 3 | # Copyright (C) 2023 Garmin Ltd. | ||
| 4 | # | ||
| 5 | # SPDX-License-Identifier: GPL-2.0-only | ||
| 6 | # | ||
| 7 | |||
| 8 | import logging | ||
| 9 | from datetime import datetime | ||
| 10 | |||
| 11 | from sqlalchemy.ext.asyncio import create_async_engine | ||
| 12 | from sqlalchemy.pool import NullPool | ||
| 13 | from sqlalchemy import ( | ||
| 14 | MetaData, | ||
| 15 | Column, | ||
| 16 | Table, | ||
| 17 | Text, | ||
| 18 | Integer, | ||
| 19 | UniqueConstraint, | ||
| 20 | DateTime, | ||
| 21 | Index, | ||
| 22 | select, | ||
| 23 | insert, | ||
| 24 | exists, | ||
| 25 | literal, | ||
| 26 | and_, | ||
| 27 | delete, | ||
| 28 | ) | ||
| 29 | import sqlalchemy.engine | ||
| 30 | from sqlalchemy.orm import declarative_base | ||
| 31 | from sqlalchemy.exc import IntegrityError | ||
| 32 | |||
| 33 | logger = logging.getLogger("hashserv.sqlalchemy") | ||
| 34 | |||
| 35 | Base = declarative_base() | ||
| 36 | |||
| 37 | |||
| 38 | class UnihashesV2(Base): | ||
| 39 | __tablename__ = "unihashes_v2" | ||
| 40 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
| 41 | method = Column(Text, nullable=False) | ||
| 42 | taskhash = Column(Text, nullable=False) | ||
| 43 | unihash = Column(Text, nullable=False) | ||
| 44 | |||
| 45 | __table_args__ = ( | ||
| 46 | UniqueConstraint("method", "taskhash"), | ||
| 47 | Index("taskhash_lookup_v3", "method", "taskhash"), | ||
| 48 | ) | ||
| 49 | |||
| 50 | |||
| 51 | class OuthashesV2(Base): | ||
| 52 | __tablename__ = "outhashes_v2" | ||
| 53 | id = Column(Integer, primary_key=True, autoincrement=True) | ||
| 54 | method = Column(Text, nullable=False) | ||
| 55 | taskhash = Column(Text, nullable=False) | ||
| 56 | outhash = Column(Text, nullable=False) | ||
| 57 | created = Column(DateTime) | ||
| 58 | owner = Column(Text) | ||
| 59 | PN = Column(Text) | ||
| 60 | PV = Column(Text) | ||
| 61 | PR = Column(Text) | ||
| 62 | task = Column(Text) | ||
| 63 | outhash_siginfo = Column(Text) | ||
| 64 | |||
| 65 | __table_args__ = ( | ||
| 66 | UniqueConstraint("method", "taskhash", "outhash"), | ||
| 67 | Index("outhash_lookup_v3", "method", "outhash"), | ||
| 68 | ) | ||
| 69 | |||
| 70 | |||
| 71 | class DatabaseEngine(object): | ||
| 72 | def __init__(self, url, username=None, password=None): | ||
| 73 | self.logger = logger | ||
| 74 | self.url = sqlalchemy.engine.make_url(url) | ||
| 75 | |||
| 76 | if username is not None: | ||
| 77 | self.url = self.url.set(username=username) | ||
| 78 | |||
| 79 | if password is not None: | ||
| 80 | self.url = self.url.set(password=password) | ||
| 81 | |||
| 82 | async def create(self): | ||
| 83 | self.logger.info("Using database %s", self.url) | ||
| 84 | self.engine = create_async_engine(self.url, poolclass=NullPool) | ||
| 85 | |||
| 86 | async with self.engine.begin() as conn: | ||
| 87 | # Create tables | ||
| 88 | logger.info("Creating tables...") | ||
| 89 | await conn.run_sync(Base.metadata.create_all) | ||
| 90 | |||
| 91 | def connect(self, logger): | ||
| 92 | return Database(self.engine, logger) | ||
| 93 | |||
| 94 | |||
| 95 | def map_row(row): | ||
| 96 | if row is None: | ||
| 97 | return None | ||
| 98 | return dict(**row._mapping) | ||
| 99 | |||
| 100 | |||
| 101 | class Database(object): | ||
| 102 | def __init__(self, engine, logger): | ||
| 103 | self.engine = engine | ||
| 104 | self.db = None | ||
| 105 | self.logger = logger | ||
| 106 | |||
| 107 | async def __aenter__(self): | ||
| 108 | self.db = await self.engine.connect() | ||
| 109 | return self | ||
| 110 | |||
| 111 | async def __aexit__(self, exc_type, exc_value, traceback): | ||
| 112 | await self.close() | ||
| 113 | |||
| 114 | async def close(self): | ||
| 115 | await self.db.close() | ||
| 116 | self.db = None | ||
| 117 | |||
| 118 | async def get_unihash_by_taskhash_full(self, method, taskhash): | ||
| 119 | statement = ( | ||
| 120 | select( | ||
| 121 | OuthashesV2, | ||
| 122 | UnihashesV2.unihash.label("unihash"), | ||
| 123 | ) | ||
| 124 | .join( | ||
| 125 | UnihashesV2, | ||
| 126 | and_( | ||
| 127 | UnihashesV2.method == OuthashesV2.method, | ||
| 128 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
| 129 | ), | ||
| 130 | ) | ||
| 131 | .where( | ||
| 132 | OuthashesV2.method == method, | ||
| 133 | OuthashesV2.taskhash == taskhash, | ||
| 134 | ) | ||
| 135 | .order_by( | ||
| 136 | OuthashesV2.created.asc(), | ||
| 137 | ) | ||
| 138 | .limit(1) | ||
| 139 | ) | ||
| 140 | self.logger.debug("%s", statement) | ||
| 141 | async with self.db.begin(): | ||
| 142 | result = await self.db.execute(statement) | ||
| 143 | return map_row(result.first()) | ||
| 144 | |||
| 145 | async def get_unihash_by_outhash(self, method, outhash): | ||
| 146 | statement = ( | ||
| 147 | select(OuthashesV2, UnihashesV2.unihash.label("unihash")) | ||
| 148 | .join( | ||
| 149 | UnihashesV2, | ||
| 150 | and_( | ||
| 151 | UnihashesV2.method == OuthashesV2.method, | ||
| 152 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
| 153 | ), | ||
| 154 | ) | ||
| 155 | .where( | ||
| 156 | OuthashesV2.method == method, | ||
| 157 | OuthashesV2.outhash == outhash, | ||
| 158 | ) | ||
| 159 | .order_by( | ||
| 160 | OuthashesV2.created.asc(), | ||
| 161 | ) | ||
| 162 | .limit(1) | ||
| 163 | ) | ||
| 164 | self.logger.debug("%s", statement) | ||
| 165 | async with self.db.begin(): | ||
| 166 | result = await self.db.execute(statement) | ||
| 167 | return map_row(result.first()) | ||
| 168 | |||
| 169 | async def get_outhash(self, method, outhash): | ||
| 170 | statement = ( | ||
| 171 | select(OuthashesV2) | ||
| 172 | .where( | ||
| 173 | OuthashesV2.method == method, | ||
| 174 | OuthashesV2.outhash == outhash, | ||
| 175 | ) | ||
| 176 | .order_by( | ||
| 177 | OuthashesV2.created.asc(), | ||
| 178 | ) | ||
| 179 | .limit(1) | ||
| 180 | ) | ||
| 181 | |||
| 182 | self.logger.debug("%s", statement) | ||
| 183 | async with self.db.begin(): | ||
| 184 | result = await self.db.execute(statement) | ||
| 185 | return map_row(result.first()) | ||
| 186 | |||
| 187 | async def get_equivalent_for_outhash(self, method, outhash, taskhash): | ||
| 188 | statement = ( | ||
| 189 | select( | ||
| 190 | OuthashesV2.taskhash.label("taskhash"), | ||
| 191 | UnihashesV2.unihash.label("unihash"), | ||
| 192 | ) | ||
| 193 | .join( | ||
| 194 | UnihashesV2, | ||
| 195 | and_( | ||
| 196 | UnihashesV2.method == OuthashesV2.method, | ||
| 197 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
| 198 | ), | ||
| 199 | ) | ||
| 200 | .where( | ||
| 201 | OuthashesV2.method == method, | ||
| 202 | OuthashesV2.outhash == outhash, | ||
| 203 | OuthashesV2.taskhash != taskhash, | ||
| 204 | ) | ||
| 205 | .order_by( | ||
| 206 | OuthashesV2.created.asc(), | ||
| 207 | ) | ||
| 208 | .limit(1) | ||
| 209 | ) | ||
| 210 | self.logger.debug("%s", statement) | ||
| 211 | async with self.db.begin(): | ||
| 212 | result = await self.db.execute(statement) | ||
| 213 | return map_row(result.first()) | ||
| 214 | |||
| 215 | async def get_equivalent(self, method, taskhash): | ||
| 216 | statement = select( | ||
| 217 | UnihashesV2.unihash, | ||
| 218 | UnihashesV2.method, | ||
| 219 | UnihashesV2.taskhash, | ||
| 220 | ).where( | ||
| 221 | UnihashesV2.method == method, | ||
| 222 | UnihashesV2.taskhash == taskhash, | ||
| 223 | ) | ||
| 224 | self.logger.debug("%s", statement) | ||
| 225 | async with self.db.begin(): | ||
| 226 | result = await self.db.execute(statement) | ||
| 227 | return map_row(result.first()) | ||
| 228 | |||
| 229 | async def remove(self, condition): | ||
| 230 | async def do_remove(table): | ||
| 231 | where = {} | ||
| 232 | for c in table.__table__.columns: | ||
| 233 | if c.key in condition and condition[c.key] is not None: | ||
| 234 | where[c] = condition[c.key] | ||
| 235 | |||
| 236 | if where: | ||
| 237 | statement = delete(table).where(*[(k == v) for k, v in where.items()]) | ||
| 238 | self.logger.debug("%s", statement) | ||
| 239 | async with self.db.begin(): | ||
| 240 | result = await self.db.execute(statement) | ||
| 241 | return result.rowcount | ||
| 242 | |||
| 243 | return 0 | ||
| 244 | |||
| 245 | count = 0 | ||
| 246 | count += await do_remove(UnihashesV2) | ||
| 247 | count += await do_remove(OuthashesV2) | ||
| 248 | |||
| 249 | return count | ||
| 250 | |||
| 251 | async def clean_unused(self, oldest): | ||
| 252 | statement = delete(OuthashesV2).where( | ||
| 253 | OuthashesV2.created < oldest, | ||
| 254 | ~( | ||
| 255 | select(UnihashesV2.id) | ||
| 256 | .where( | ||
| 257 | UnihashesV2.method == OuthashesV2.method, | ||
| 258 | UnihashesV2.taskhash == OuthashesV2.taskhash, | ||
| 259 | ) | ||
| 260 | .limit(1) | ||
| 261 | .exists() | ||
| 262 | ), | ||
| 263 | ) | ||
| 264 | self.logger.debug("%s", statement) | ||
| 265 | async with self.db.begin(): | ||
| 266 | result = await self.db.execute(statement) | ||
| 267 | return result.rowcount | ||
| 268 | |||
| 269 | async def insert_unihash(self, method, taskhash, unihash): | ||
| 270 | statement = insert(UnihashesV2).values( | ||
| 271 | method=method, | ||
| 272 | taskhash=taskhash, | ||
| 273 | unihash=unihash, | ||
| 274 | ) | ||
| 275 | self.logger.debug("%s", statement) | ||
| 276 | try: | ||
| 277 | async with self.db.begin(): | ||
| 278 | await self.db.execute(statement) | ||
| 279 | return True | ||
| 280 | except IntegrityError: | ||
| 281 | logger.debug( | ||
| 282 | "%s, %s, %s already in unihash database", method, taskhash, unihash | ||
| 283 | ) | ||
| 284 | return False | ||
| 285 | |||
| 286 | async def insert_outhash(self, data): | ||
| 287 | outhash_columns = set(c.key for c in OuthashesV2.__table__.columns) | ||
| 288 | |||
| 289 | data = {k: v for k, v in data.items() if k in outhash_columns} | ||
| 290 | |||
| 291 | if "created" in data and not isinstance(data["created"], datetime): | ||
| 292 | data["created"] = datetime.fromisoformat(data["created"]) | ||
| 293 | |||
| 294 | statement = insert(OuthashesV2).values(**data) | ||
| 295 | self.logger.debug("%s", statement) | ||
| 296 | try: | ||
| 297 | async with self.db.begin(): | ||
| 298 | await self.db.execute(statement) | ||
| 299 | return True | ||
| 300 | except IntegrityError: | ||
| 301 | logger.debug( | ||
| 302 | "%s, %s already in outhash database", data["method"], data["outhash"] | ||
| 303 | ) | ||
| 304 | return False | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index 4c98a280a5..268b27006f 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
| @@ -33,7 +33,7 @@ class HashEquivalenceTestSetup(object): | |||
| 33 | def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc): | 33 | def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc): |
| 34 | self.server_index += 1 | 34 | self.server_index += 1 |
| 35 | if dbpath is None: | 35 | if dbpath is None: |
| 36 | dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) | 36 | dbpath = self.make_dbpath() |
| 37 | 37 | ||
| 38 | def cleanup_server(server): | 38 | def cleanup_server(server): |
| 39 | if server.process.exitcode is not None: | 39 | if server.process.exitcode is not None: |
| @@ -53,6 +53,9 @@ class HashEquivalenceTestSetup(object): | |||
| 53 | 53 | ||
| 54 | return server | 54 | return server |
| 55 | 55 | ||
| 56 | def make_dbpath(self): | ||
| 57 | return os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) | ||
| 58 | |||
| 56 | def start_client(self, server_address): | 59 | def start_client(self, server_address): |
| 57 | def cleanup_client(client): | 60 | def cleanup_client(client): |
| 58 | client.close() | 61 | client.close() |
| @@ -517,6 +520,20 @@ class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalen | |||
| 517 | return "ws://%s:0" % host | 520 | return "ws://%s:0" % host |
| 518 | 521 | ||
| 519 | 522 | ||
| 523 | class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocketServer): | ||
| 524 | def setUp(self): | ||
| 525 | try: | ||
| 526 | import sqlalchemy | ||
| 527 | import aiosqlite | ||
| 528 | except ImportError as e: | ||
| 529 | self.skipTest(str(e)) | ||
| 530 | |||
| 531 | super().setUp() | ||
| 532 | |||
| 533 | def make_dbpath(self): | ||
| 534 | return "sqlite+aiosqlite:///%s" % os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index) | ||
| 535 | |||
| 536 | |||
| 520 | class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | 537 | class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): |
| 521 | def start_test_server(self): | 538 | def start_test_server(self): |
| 522 | if 'BB_TEST_HASHSERV' not in os.environ: | 539 | if 'BB_TEST_HASHSERV' not in os.environ: |
