summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/sqlalchemy.py
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:26 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:33:02 +0000
commitcfbb1d2cc01565610ba06a755e10425ff2076d9b (patch)
tree0e703d7210246573d50e1b07fd505c0001a9ae17 /bitbake/lib/hashserv/sqlalchemy.py
parentbaa3e5391daf41b6dd6e914a112abb00d3517da1 (diff)
downloadpoky-cfbb1d2cc01565610ba06a755e10425ff2076d9b.tar.gz
bitbake: hashserv: Add SQLalchemy backend
Adds an SQLAlchemy backend to the server. While this database backend is slower than the more direct sqlite backend, it easily supports just about any SQL server, which is useful for large scale deployments. (Bitbake rev: e0b73466dd7478c77c82f46879246c1b68b228c0) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/sqlalchemy.py')
-rw-r--r--bitbake/lib/hashserv/sqlalchemy.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/bitbake/lib/hashserv/sqlalchemy.py b/bitbake/lib/hashserv/sqlalchemy.py
new file mode 100644
index 0000000000..3216621f9d
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlalchemy.py
@@ -0,0 +1,304 @@
1#! /usr/bin/env python3
2#
3# Copyright (C) 2023 Garmin Ltd.
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import logging
9from datetime import datetime
10
11from sqlalchemy.ext.asyncio import create_async_engine
12from sqlalchemy.pool import NullPool
13from sqlalchemy import (
14 MetaData,
15 Column,
16 Table,
17 Text,
18 Integer,
19 UniqueConstraint,
20 DateTime,
21 Index,
22 select,
23 insert,
24 exists,
25 literal,
26 and_,
27 delete,
28)
29import sqlalchemy.engine
30from sqlalchemy.orm import declarative_base
31from sqlalchemy.exc import IntegrityError
32
33logger = logging.getLogger("hashserv.sqlalchemy")
34
35Base = declarative_base()
36
37
38class UnihashesV2(Base):
39 __tablename__ = "unihashes_v2"
40 id = Column(Integer, primary_key=True, autoincrement=True)
41 method = Column(Text, nullable=False)
42 taskhash = Column(Text, nullable=False)
43 unihash = Column(Text, nullable=False)
44
45 __table_args__ = (
46 UniqueConstraint("method", "taskhash"),
47 Index("taskhash_lookup_v3", "method", "taskhash"),
48 )
49
50
51class OuthashesV2(Base):
52 __tablename__ = "outhashes_v2"
53 id = Column(Integer, primary_key=True, autoincrement=True)
54 method = Column(Text, nullable=False)
55 taskhash = Column(Text, nullable=False)
56 outhash = Column(Text, nullable=False)
57 created = Column(DateTime)
58 owner = Column(Text)
59 PN = Column(Text)
60 PV = Column(Text)
61 PR = Column(Text)
62 task = Column(Text)
63 outhash_siginfo = Column(Text)
64
65 __table_args__ = (
66 UniqueConstraint("method", "taskhash", "outhash"),
67 Index("outhash_lookup_v3", "method", "outhash"),
68 )
69
70
71class DatabaseEngine(object):
72 def __init__(self, url, username=None, password=None):
73 self.logger = logger
74 self.url = sqlalchemy.engine.make_url(url)
75
76 if username is not None:
77 self.url = self.url.set(username=username)
78
79 if password is not None:
80 self.url = self.url.set(password=password)
81
82 async def create(self):
83 self.logger.info("Using database %s", self.url)
84 self.engine = create_async_engine(self.url, poolclass=NullPool)
85
86 async with self.engine.begin() as conn:
87 # Create tables
88 logger.info("Creating tables...")
89 await conn.run_sync(Base.metadata.create_all)
90
91 def connect(self, logger):
92 return Database(self.engine, logger)
93
94
95def map_row(row):
96 if row is None:
97 return None
98 return dict(**row._mapping)
99
100
101class Database(object):
102 def __init__(self, engine, logger):
103 self.engine = engine
104 self.db = None
105 self.logger = logger
106
107 async def __aenter__(self):
108 self.db = await self.engine.connect()
109 return self
110
111 async def __aexit__(self, exc_type, exc_value, traceback):
112 await self.close()
113
114 async def close(self):
115 await self.db.close()
116 self.db = None
117
118 async def get_unihash_by_taskhash_full(self, method, taskhash):
119 statement = (
120 select(
121 OuthashesV2,
122 UnihashesV2.unihash.label("unihash"),
123 )
124 .join(
125 UnihashesV2,
126 and_(
127 UnihashesV2.method == OuthashesV2.method,
128 UnihashesV2.taskhash == OuthashesV2.taskhash,
129 ),
130 )
131 .where(
132 OuthashesV2.method == method,
133 OuthashesV2.taskhash == taskhash,
134 )
135 .order_by(
136 OuthashesV2.created.asc(),
137 )
138 .limit(1)
139 )
140 self.logger.debug("%s", statement)
141 async with self.db.begin():
142 result = await self.db.execute(statement)
143 return map_row(result.first())
144
145 async def get_unihash_by_outhash(self, method, outhash):
146 statement = (
147 select(OuthashesV2, UnihashesV2.unihash.label("unihash"))
148 .join(
149 UnihashesV2,
150 and_(
151 UnihashesV2.method == OuthashesV2.method,
152 UnihashesV2.taskhash == OuthashesV2.taskhash,
153 ),
154 )
155 .where(
156 OuthashesV2.method == method,
157 OuthashesV2.outhash == outhash,
158 )
159 .order_by(
160 OuthashesV2.created.asc(),
161 )
162 .limit(1)
163 )
164 self.logger.debug("%s", statement)
165 async with self.db.begin():
166 result = await self.db.execute(statement)
167 return map_row(result.first())
168
169 async def get_outhash(self, method, outhash):
170 statement = (
171 select(OuthashesV2)
172 .where(
173 OuthashesV2.method == method,
174 OuthashesV2.outhash == outhash,
175 )
176 .order_by(
177 OuthashesV2.created.asc(),
178 )
179 .limit(1)
180 )
181
182 self.logger.debug("%s", statement)
183 async with self.db.begin():
184 result = await self.db.execute(statement)
185 return map_row(result.first())
186
187 async def get_equivalent_for_outhash(self, method, outhash, taskhash):
188 statement = (
189 select(
190 OuthashesV2.taskhash.label("taskhash"),
191 UnihashesV2.unihash.label("unihash"),
192 )
193 .join(
194 UnihashesV2,
195 and_(
196 UnihashesV2.method == OuthashesV2.method,
197 UnihashesV2.taskhash == OuthashesV2.taskhash,
198 ),
199 )
200 .where(
201 OuthashesV2.method == method,
202 OuthashesV2.outhash == outhash,
203 OuthashesV2.taskhash != taskhash,
204 )
205 .order_by(
206 OuthashesV2.created.asc(),
207 )
208 .limit(1)
209 )
210 self.logger.debug("%s", statement)
211 async with self.db.begin():
212 result = await self.db.execute(statement)
213 return map_row(result.first())
214
215 async def get_equivalent(self, method, taskhash):
216 statement = select(
217 UnihashesV2.unihash,
218 UnihashesV2.method,
219 UnihashesV2.taskhash,
220 ).where(
221 UnihashesV2.method == method,
222 UnihashesV2.taskhash == taskhash,
223 )
224 self.logger.debug("%s", statement)
225 async with self.db.begin():
226 result = await self.db.execute(statement)
227 return map_row(result.first())
228
229 async def remove(self, condition):
230 async def do_remove(table):
231 where = {}
232 for c in table.__table__.columns:
233 if c.key in condition and condition[c.key] is not None:
234 where[c] = condition[c.key]
235
236 if where:
237 statement = delete(table).where(*[(k == v) for k, v in where.items()])
238 self.logger.debug("%s", statement)
239 async with self.db.begin():
240 result = await self.db.execute(statement)
241 return result.rowcount
242
243 return 0
244
245 count = 0
246 count += await do_remove(UnihashesV2)
247 count += await do_remove(OuthashesV2)
248
249 return count
250
251 async def clean_unused(self, oldest):
252 statement = delete(OuthashesV2).where(
253 OuthashesV2.created < oldest,
254 ~(
255 select(UnihashesV2.id)
256 .where(
257 UnihashesV2.method == OuthashesV2.method,
258 UnihashesV2.taskhash == OuthashesV2.taskhash,
259 )
260 .limit(1)
261 .exists()
262 ),
263 )
264 self.logger.debug("%s", statement)
265 async with self.db.begin():
266 result = await self.db.execute(statement)
267 return result.rowcount
268
269 async def insert_unihash(self, method, taskhash, unihash):
270 statement = insert(UnihashesV2).values(
271 method=method,
272 taskhash=taskhash,
273 unihash=unihash,
274 )
275 self.logger.debug("%s", statement)
276 try:
277 async with self.db.begin():
278 await self.db.execute(statement)
279 return True
280 except IntegrityError:
281 logger.debug(
282 "%s, %s, %s already in unihash database", method, taskhash, unihash
283 )
284 return False
285
286 async def insert_outhash(self, data):
287 outhash_columns = set(c.key for c in OuthashesV2.__table__.columns)
288
289 data = {k: v for k, v in data.items() if k in outhash_columns}
290
291 if "created" in data and not isinstance(data["created"], datetime):
292 data["created"] = datetime.fromisoformat(data["created"])
293
294 statement = insert(OuthashesV2).values(**data)
295 self.logger.debug("%s", statement)
296 try:
297 async with self.db.begin():
298 await self.db.execute(statement)
299 return True
300 except IntegrityError:
301 logger.debug(
302 "%s, %s already in outhash database", data["method"], data["outhash"]
303 )
304 return False