summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Opdenacker <michael.opdenacker@bootlin.com>2024-05-11 16:31:31 +0530
committerRichard Purdie <richard.purdie@linuxfoundation.org>2024-05-21 14:23:43 +0100
commit65757c9e200cb34b39a87f3a8c2a8f8665a81677 (patch)
treefe5d13e4c96f99f6e176637f9d494c57553f597e
parent4cbce9cdf7d22b0b4fe933867f931019540a6663 (diff)
downloadpoky-65757c9e200cb34b39a87f3a8c2a8f8665a81677.tar.gz
bitbake: prserv: enable database sharing
sqlite3 can allow multiple processes to access the database simultaneously, but it must be opened correctly. The key change is that the database is no longer opened in "exclusive" mode (defaulting to shared mode). In addition, the journal is set to "WAL" mode, as this is the most efficient for dealing with simultaneous access between different processes. In order to keep the database performance, synchronous mode is set to "off". The WAL journal will protect against incomplete transactions in any given client, however the database will not be protected against unexpected power loss from the OS (which is a fine trade off for performance, and also the same as the previous implementation). The use of a database cursor enabled to remove the _execute() wrapper. The cursor automatically makes sure that the query happens in an atomic transaction and commits when finished. This also removes the need for a "dirty" flag for the database and for explicit database syncing, which simplifies the code. (Bitbake rev: 385833243c495dc68ec26a963136c1ced3f272d0) Signed-off-by: Michael Opdenacker <michael.opdenacker@bootlin.com> Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Cc: Tim Orling <ticotimo@gmail.com> Cc: Thomas Petazzoni <thomas.petazzoni@bootlin.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--bitbake/lib/prserv/db.py322
-rw-r--r--bitbake/lib/prserv/serv.py9
2 files changed, 151 insertions, 180 deletions
diff --git a/bitbake/lib/prserv/db.py b/bitbake/lib/prserv/db.py
index b2520f3158..f430586d73 100644
--- a/bitbake/lib/prserv/db.py
+++ b/bitbake/lib/prserv/db.py
@@ -8,21 +8,13 @@ import logging
8import os.path 8import os.path
9import errno 9import errno
10import prserv 10import prserv
11import time 11import sqlite3
12 12
13from contextlib import closing
13from . import increase_revision, revision_greater, revision_smaller 14from . import increase_revision, revision_greater, revision_smaller
14 15
15try:
16 import sqlite3
17except ImportError:
18 from pysqlite2 import dbapi2 as sqlite3
19
20logger = logging.getLogger("BitBake.PRserv") 16logger = logging.getLogger("BitBake.PRserv")
21 17
22sqlversion = sqlite3.sqlite_version_info
23if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3):
24 raise Exception("sqlite3 version 3.3.0 or later is required.")
25
26# 18#
27# "No History" mode - for a given query tuple (version, pkgarch, checksum), 19# "No History" mode - for a given query tuple (version, pkgarch, checksum),
28# the returned value will be the largest among all the values of the same 20# the returned value will be the largest among all the values of the same
@@ -31,40 +23,28 @@ if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3):
31# "History" mode - Return a new higher value for previously unseen query 23# "History" mode - Return a new higher value for previously unseen query
32# tuple (version, pkgarch, checksum), otherwise return historical value. 24# tuple (version, pkgarch, checksum), otherwise return historical value.
33# Value can decrement if returning to a previous build. 25# Value can decrement if returning to a previous build.
34#
35 26
36class PRTable(object): 27class PRTable(object):
37 def __init__(self, conn, table, read_only): 28 def __init__(self, conn, table, read_only):
38 self.conn = conn 29 self.conn = conn
39 self.read_only = read_only 30 self.read_only = read_only
40 self.dirty = False
41 self.table = table 31 self.table = table
42 32
43 if self.read_only: 33 with closing(self.conn.cursor()) as cursor:
44 table_exists = self._execute( 34 if self.read_only:
45 "SELECT count(*) FROM sqlite_master \ 35 table_exists = cursor.execute(
46 WHERE type='table' AND name='%s'" % (self.table)) 36 "SELECT count(*) FROM sqlite_master \
47 if not table_exists: 37 WHERE type='table' AND name='%s'" % (self.table))
48 raise prserv.NotFoundError 38 if not table_exists:
49 else: 39 raise prserv.NotFoundError
50 self._execute("CREATE TABLE IF NOT EXISTS %s \ 40 else:
51 (version TEXT NOT NULL, \ 41 cursor.execute("CREATE TABLE IF NOT EXISTS %s \
52 pkgarch TEXT NOT NULL, \ 42 (version TEXT NOT NULL, \
53 checksum TEXT NOT NULL, \ 43 pkgarch TEXT NOT NULL, \
54 value TEXT, \ 44 checksum TEXT NOT NULL, \
55 PRIMARY KEY (version, pkgarch, checksum, value));" % self.table) 45 value TEXT, \
56 46 PRIMARY KEY (version, pkgarch, checksum, value));" % self.table)
57 def _execute(self, *query): 47 self.conn.commit()
58 """Execute a query, waiting to acquire a lock if necessary"""
59 start = time.time()
60 end = start + 20
61 while True:
62 try:
63 return self.conn.execute(*query)
64 except sqlite3.OperationalError as exc:
65 if "is locked" in str(exc) and end > time.time():
66 continue
67 raise exc
68 48
69 def _extremum_value(self, rows, is_max): 49 def _extremum_value(self, rows, is_max):
70 value = None 50 value = None
@@ -88,49 +68,42 @@ class PRTable(object):
88 def _min_value(self, rows): 68 def _min_value(self, rows):
89 return self._extremum_value(rows, False) 69 return self._extremum_value(rows, False)
90 70
91 def sync(self):
92 if not self.read_only:
93 self.conn.commit()
94 self._execute("BEGIN EXCLUSIVE TRANSACTION")
95
96 def sync_if_dirty(self):
97 if self.dirty:
98 self.sync()
99 self.dirty = False
100
101 def test_package(self, version, pkgarch): 71 def test_package(self, version, pkgarch):
102 """Returns whether the specified package version is found in the database for the specified architecture""" 72 """Returns whether the specified package version is found in the database for the specified architecture"""
103 73
104 # Just returns the value if found or None otherwise 74 # Just returns the value if found or None otherwise
105 data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=?;" % self.table, 75 with closing(self.conn.cursor()) as cursor:
106 (version, pkgarch)) 76 data=cursor.execute("SELECT value FROM %s WHERE version=? AND pkgarch=?;" % self.table,
107 row=data.fetchone() 77 (version, pkgarch))
108 if row is not None: 78 row=data.fetchone()
109 return True 79 if row is not None:
110 else: 80 return True
111 return False 81 else:
82 return False
112 83
113 def test_value(self, version, pkgarch, value): 84 def test_value(self, version, pkgarch, value):
114 """Returns whether the specified value is found in the database for the specified package and architecture""" 85 """Returns whether the specified value is found in the database for the specified package and architecture"""
115 86
116 # Just returns the value if found or None otherwise 87 # Just returns the value if found or None otherwise
117 data=self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? and value=?;" % self.table, 88 with closing(self.conn.cursor()) as cursor:
118 (version, pkgarch, value)) 89 data=cursor.execute("SELECT value FROM %s WHERE version=? AND pkgarch=? and value=?;" % self.table,
119 row=data.fetchone() 90 (version, pkgarch, value))
120 if row is not None: 91 row=data.fetchone()
121 return True 92 if row is not None:
122 else: 93 return True
123 return False 94 else:
95 return False
124 96
125 97
126 def find_package_max_value(self, version, pkgarch): 98 def find_package_max_value(self, version, pkgarch):
127 """Returns the greatest value for (version, pkgarch), or None if not found. Doesn't create a new value""" 99 """Returns the greatest value for (version, pkgarch), or None if not found. Doesn't create a new value"""
128 100
129 data = self._execute("SELECT value FROM %s where version=? AND pkgarch=?;" % (self.table), 101 with closing(self.conn.cursor()) as cursor:
130 (version, pkgarch)) 102 data = cursor.execute("SELECT value FROM %s where version=? AND pkgarch=?;" % (self.table),
131 rows = data.fetchall() 103 (version, pkgarch))
132 value = self._max_value(rows) 104 rows = data.fetchall()
133 return value 105 value = self._max_value(rows)
106 return value
134 107
135 def find_value(self, version, pkgarch, checksum, history=False): 108 def find_value(self, version, pkgarch, checksum, history=False):
136 """Returns the value for the specified checksum if found or None otherwise.""" 109 """Returns the value for the specified checksum if found or None otherwise."""
@@ -145,10 +118,11 @@ class PRTable(object):
145 """Returns the maximum (if is_max is True) or minimum (if is_max is False) value 118 """Returns the maximum (if is_max is True) or minimum (if is_max is False) value
146 for (version, pkgarch, checksum), or None if not found. Doesn't create a new value""" 119 for (version, pkgarch, checksum), or None if not found. Doesn't create a new value"""
147 120
148 data = self._execute("SELECT value FROM %s where version=? AND pkgarch=? AND checksum=?;" % (self.table), 121 with closing(self.conn.cursor()) as cursor:
149 (version, pkgarch, checksum)) 122 data = cursor.execute("SELECT value FROM %s where version=? AND pkgarch=? AND checksum=?;" % (self.table),
150 rows = data.fetchall() 123 (version, pkgarch, checksum))
151 return self._extremum_value(rows, is_max) 124 rows = data.fetchall()
125 return self._extremum_value(rows, is_max)
152 126
153 def find_max_value(self, version, pkgarch, checksum): 127 def find_max_value(self, version, pkgarch, checksum):
154 return self._find_extremum_value(version, pkgarch, checksum, True) 128 return self._find_extremum_value(version, pkgarch, checksum, True)
@@ -160,26 +134,27 @@ class PRTable(object):
160 """Take and increase the greatest "<base>.y" value for (version, pkgarch), or return "<base>.0" if not found. 134 """Take and increase the greatest "<base>.y" value for (version, pkgarch), or return "<base>.0" if not found.
161 This doesn't store a new value.""" 135 This doesn't store a new value."""
162 136
163 data = self._execute("SELECT value FROM %s where version=? AND pkgarch=? AND value LIKE '%s.%%';" % (self.table, base), 137 with closing(self.conn.cursor()) as cursor:
164 (version, pkgarch)) 138 data = cursor.execute("SELECT value FROM %s where version=? AND pkgarch=? AND value LIKE '%s.%%';" % (self.table, base),
165 rows = data.fetchall() 139 (version, pkgarch))
166 value = self._max_value(rows) 140 rows = data.fetchall()
141 value = self._max_value(rows)
167 142
168 if value is not None: 143 if value is not None:
169 return increase_revision(value) 144 return increase_revision(value)
170 else: 145 else:
171 return base + ".0" 146 return base + ".0"
172 147
173 def store_value(self, version, pkgarch, checksum, value): 148 def store_value(self, version, pkgarch, checksum, value):
174 """Store new value in the database""" 149 """Store new value in the database"""
175 150
176 try: 151 with closing(self.conn.cursor()) as cursor:
177 self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table), 152 try:
178 (version, pkgarch, checksum, value)) 153 cursor.execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
179 except sqlite3.IntegrityError as exc: 154 (version, pkgarch, checksum, value))
180 logger.error(str(exc)) 155 except sqlite3.IntegrityError as exc:
181 156 logger.error(str(exc))
182 self.dirty = True 157 self.conn.commit()
183 158
184 def _get_value(self, version, pkgarch, checksum, history): 159 def _get_value(self, version, pkgarch, checksum, history):
185 160
@@ -215,54 +190,56 @@ class PRTable(object):
215 return None 190 return None
216 191
217 val = None 192 val = None
218 data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, 193 with closing(self.conn.cursor()) as cursor:
194 data = cursor.execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
219 (version, pkgarch, checksum)) 195 (version, pkgarch, checksum))
220 row = data.fetchone() 196 row = data.fetchone()
221 if row is not None: 197 if row is not None:
222 val=row[0] 198 val=row[0]
223 else: 199 else:
224 #no value found, try to insert 200 #no value found, try to insert
225 try: 201 try:
226 self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table), 202 cursor.execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
227 (version, pkgarch, checksum, value)) 203 (version, pkgarch, checksum, value))
228 except sqlite3.IntegrityError as exc: 204 except sqlite3.IntegrityError as exc:
229 logger.error(str(exc)) 205 logger.error(str(exc))
230 206
231 self.dirty = True 207 self.conn.commit()
232 208
233 data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, 209 data = cursor.execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table,
234 (version, pkgarch, checksum)) 210 (version, pkgarch, checksum))
235 row = data.fetchone() 211 row = data.fetchone()
236 if row is not None: 212 if row is not None:
237 val = row[0] 213 val = row[0]
238 return val 214 return val
239 215
240 def _import_no_hist(self, version, pkgarch, checksum, value): 216 def _import_no_hist(self, version, pkgarch, checksum, value):
241 if self.read_only: 217 if self.read_only:
242 return None 218 return None
243 219
244 try: 220 with closing(self.conn.cursor()) as cursor:
245 #try to insert
246 self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
247 (version, pkgarch, checksum, value))
248 except sqlite3.IntegrityError as exc:
249 #already have the record, try to update
250 try: 221 try:
251 self._execute("UPDATE %s SET value=? WHERE version=? AND pkgarch=? AND checksum=? AND value<?" 222 #try to insert
252 % (self.table), 223 cursor.execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table),
253 (value, version, pkgarch, checksum, value)) 224 (version, pkgarch, checksum, value))
254 except sqlite3.IntegrityError as exc: 225 except sqlite3.IntegrityError as exc:
255 logger.error(str(exc)) 226 #already have the record, try to update
227 try:
228 cursor.execute("UPDATE %s SET value=? WHERE version=? AND pkgarch=? AND checksum=? AND value<?"
229 % (self.table),
230 (value, version, pkgarch, checksum, value))
231 except sqlite3.IntegrityError as exc:
232 logger.error(str(exc))
256 233
257 self.dirty = True 234 self.conn.commit()
258 235
259 data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=? AND value>=?;" % self.table, 236 data = cursor.execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=? AND value>=?;" % self.table,
260 (version, pkgarch, checksum, value)) 237 (version, pkgarch, checksum, value))
261 row=data.fetchone() 238 row=data.fetchone()
262 if row is not None: 239 if row is not None:
263 return row[0] 240 return row[0]
264 else: 241 else:
265 return None 242 return None
266 243
267 def importone(self, version, pkgarch, checksum, value, history=False): 244 def importone(self, version, pkgarch, checksum, value, history=False):
268 if history: 245 if history:
@@ -272,56 +249,57 @@ class PRTable(object):
272 249
273 def export(self, version, pkgarch, checksum, colinfo, history=False): 250 def export(self, version, pkgarch, checksum, colinfo, history=False):
274 metainfo = {} 251 metainfo = {}
275 #column info 252 with closing(self.conn.cursor()) as cursor:
276 if colinfo: 253 #column info
277 metainfo["tbl_name"] = self.table 254 if colinfo:
278 metainfo["core_ver"] = prserv.__version__ 255 metainfo["tbl_name"] = self.table
279 metainfo["col_info"] = [] 256 metainfo["core_ver"] = prserv.__version__
280 data = self._execute("PRAGMA table_info(%s);" % self.table) 257 metainfo["col_info"] = []
258 data = cursor.execute("PRAGMA table_info(%s);" % self.table)
259 for row in data:
260 col = {}
261 col["name"] = row["name"]
262 col["type"] = row["type"]
263 col["notnull"] = row["notnull"]
264 col["dflt_value"] = row["dflt_value"]
265 col["pk"] = row["pk"]
266 metainfo["col_info"].append(col)
267
268 #data info
269 datainfo = []
270
271 if history:
272 sqlstmt = "SELECT * FROM %s as T1 WHERE 1=1 " % self.table
273 else:
274 sqlstmt = "SELECT T1.version, T1.pkgarch, T1.checksum, T1.value FROM %s as T1, \
275 (SELECT version, pkgarch, max(value) as maxvalue FROM %s GROUP BY version, pkgarch) as T2 \
276 WHERE T1.version=T2.version AND T1.pkgarch=T2.pkgarch AND T1.value=T2.maxvalue " % (self.table, self.table)
277 sqlarg = []
278 where = ""
279 if version:
280 where += "AND T1.version=? "
281 sqlarg.append(str(version))
282 if pkgarch:
283 where += "AND T1.pkgarch=? "
284 sqlarg.append(str(pkgarch))
285 if checksum:
286 where += "AND T1.checksum=? "
287 sqlarg.append(str(checksum))
288
289 sqlstmt += where + ";"
290
291 if len(sqlarg):
292 data = cursor.execute(sqlstmt, tuple(sqlarg))
293 else:
294 data = cursor.execute(sqlstmt)
281 for row in data: 295 for row in data:
282 col = {} 296 if row["version"]:
283 col["name"] = row["name"] 297 col = {}
284 col["type"] = row["type"] 298 col["version"] = row["version"]
285 col["notnull"] = row["notnull"] 299 col["pkgarch"] = row["pkgarch"]
286 col["dflt_value"] = row["dflt_value"] 300 col["checksum"] = row["checksum"]
287 col["pk"] = row["pk"] 301 col["value"] = row["value"]
288 metainfo["col_info"].append(col) 302 datainfo.append(col)
289
290 #data info
291 datainfo = []
292
293 if history:
294 sqlstmt = "SELECT * FROM %s as T1 WHERE 1=1 " % self.table
295 else:
296 sqlstmt = "SELECT T1.version, T1.pkgarch, T1.checksum, T1.value FROM %s as T1, \
297 (SELECT version, pkgarch, max(value) as maxvalue FROM %s GROUP BY version, pkgarch) as T2 \
298 WHERE T1.version=T2.version AND T1.pkgarch=T2.pkgarch AND T1.value=T2.maxvalue " % (self.table, self.table)
299 sqlarg = []
300 where = ""
301 if version:
302 where += "AND T1.version=? "
303 sqlarg.append(str(version))
304 if pkgarch:
305 where += "AND T1.pkgarch=? "
306 sqlarg.append(str(pkgarch))
307 if checksum:
308 where += "AND T1.checksum=? "
309 sqlarg.append(str(checksum))
310
311 sqlstmt += where + ";"
312
313 if len(sqlarg):
314 data = self._execute(sqlstmt, tuple(sqlarg))
315 else:
316 data = self._execute(sqlstmt)
317 for row in data:
318 if row["version"]:
319 col = {}
320 col["version"] = row["version"]
321 col["pkgarch"] = row["pkgarch"]
322 col["checksum"] = row["checksum"]
323 col["value"] = row["value"]
324 datainfo.append(col)
325 return (metainfo, datainfo) 303 return (metainfo, datainfo)
326 304
327 def dump_db(self, fd): 305 def dump_db(self, fd):
@@ -345,14 +323,15 @@ class PRData(object):
345 raise e 323 raise e
346 uri = "file:%s%s" % (self.filename, "?mode=ro" if self.read_only else "") 324 uri = "file:%s%s" % (self.filename, "?mode=ro" if self.read_only else "")
347 logger.debug("Opening PRServ database '%s'" % (uri)) 325 logger.debug("Opening PRServ database '%s'" % (uri))
348 self.connection=sqlite3.connect(uri, uri=True, isolation_level="EXCLUSIVE", check_same_thread = False) 326 self.connection=sqlite3.connect(uri, uri=True)
349 self.connection.row_factory=sqlite3.Row 327 self.connection.row_factory=sqlite3.Row
350 if not self.read_only: 328 self.connection.execute("PRAGMA synchronous = OFF;")
351 self.connection.execute("pragma synchronous = off;") 329 self.connection.execute("PRAGMA journal_mode = WAL;")
352 self.connection.execute("PRAGMA journal_mode = MEMORY;") 330 self.connection.commit()
353 self._tables={} 331 self._tables={}
354 332
355 def disconnect(self): 333 def disconnect(self):
334 self.connection.commit()
356 self.connection.close() 335 self.connection.close()
357 336
358 def __getitem__(self, tblname): 337 def __getitem__(self, tblname):
@@ -370,3 +349,4 @@ class PRData(object):
370 del self._tables[tblname] 349 del self._tables[tblname]
371 logger.info("drop table %s" % (tblname)) 350 logger.info("drop table %s" % (tblname))
372 self.connection.execute("DROP TABLE IF EXISTS %s;" % tblname) 351 self.connection.execute("DROP TABLE IF EXISTS %s;" % tblname)
352 self.connection.commit()
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index 05573d06cc..3992056f88 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -42,10 +42,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
42 try: 42 try:
43 return await super().dispatch_message(msg) 43 return await super().dispatch_message(msg)
44 except: 44 except:
45 self.server.table.sync()
46 raise 45 raise
47 else:
48 self.server.table.sync_if_dirty()
49 46
50 async def handle_test_pr(self, request): 47 async def handle_test_pr(self, request):
51 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' 48 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
@@ -233,15 +230,9 @@ class PRServer(bb.asyncrpc.AsyncServer):
233 return tasks 230 return tasks
234 231
235 async def stop(self): 232 async def stop(self):
236 self.table.sync_if_dirty()
237 self.db.disconnect() 233 self.db.disconnect()
238 await super().stop() 234 await super().stop()
239 235
240 def signal_handler(self):
241 super().signal_handler()
242 if self.table:
243 self.table.sync()
244
245class PRServSingleton(object): 236class PRServSingleton(object):
246 def __init__(self, dbfile, logfile, host, port, upstream): 237 def __init__(self, dbfile, logfile, host, port, upstream):
247 self.dbfile = dbfile 238 self.dbfile = dbfile