 bitbake/lib/hashserv/__init__.py |  90
 bitbake/lib/hashserv/server.py   | 491
 bitbake/lib/hashserv/sqlite.py   | 259
 3 files changed, 439 insertions(+), 401 deletions(-)
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 56b9c6bc82..90d8cff15f 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -6,7 +6,6 @@
 import asyncio
 from contextlib import closing
 import re
-import sqlite3
 import itertools
 import json
 from urllib.parse import urlparse
@@ -19,92 +18,34 @@ ADDR_TYPE_UNIX = 0
 ADDR_TYPE_TCP = 1
 ADDR_TYPE_WS = 2
 
-UNIHASH_TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL", "UNIQUE"),
-    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
-    ("unihash", "TEXT NOT NULL", ""),
-)
-
-UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
-
-OUTHASH_TABLE_DEFINITION = (
-    ("method", "TEXT NOT NULL", "UNIQUE"),
-    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
-    ("outhash", "TEXT NOT NULL", "UNIQUE"),
-    ("created", "DATETIME", ""),
-
-    # Optional fields
-    ("owner", "TEXT", ""),
-    ("PN", "TEXT", ""),
-    ("PV", "TEXT", ""),
-    ("PR", "TEXT", ""),
-    ("task", "TEXT", ""),
-    ("outhash_siginfo", "TEXT", ""),
-)
-
-OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
-
-def _make_table(cursor, name, definition):
-    cursor.execute('''
-        CREATE TABLE IF NOT EXISTS {name} (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            {fields}
-            UNIQUE({unique})
-            )
-        '''.format(
-            name=name,
-            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
-            unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags)
-        ))
-
-
-def setup_database(database, sync=True):
-    db = sqlite3.connect(database)
-    db.row_factory = sqlite3.Row
-
-    with closing(db.cursor()) as cursor:
-        _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
-        _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
-
-        cursor.execute('PRAGMA journal_mode = WAL')
-        cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))
-
-        # Drop old indexes
-        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
-        cursor.execute('DROP INDEX IF EXISTS outhash_lookup')
-        cursor.execute('DROP INDEX IF EXISTS taskhash_lookup_v2')
-        cursor.execute('DROP INDEX IF EXISTS outhash_lookup_v2')
-
-        # TODO: Upgrade from tasks_v2?
-        cursor.execute('DROP TABLE IF EXISTS tasks_v2')
-
-        # Create new indexes
-        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)')
-        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)')
-
-    return db
-
 
 def parse_address(addr):
     if addr.startswith(UNIX_PREFIX):
-        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))
+        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
     elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX):
         return (ADDR_TYPE_WS, (addr,))
     else:
-        m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
+        m = re.match(r"\[(?P<host>[^\]]*)\]:(?P<port>\d+)$", addr)
         if m is not None:
-            host = m.group('host')
-            port = m.group('port')
+            host = m.group("host")
+            port = m.group("port")
         else:
-            host, port = addr.split(':')
+            host, port = addr.split(":")
 
         return (ADDR_TYPE_TCP, (host, int(port)))
 
 
 def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
+    def sqlite_engine():
+        from .sqlite import DatabaseEngine
+
+        return DatabaseEngine(dbname, sync)
+
     from . import server
-    db = setup_database(dbname, sync=sync)
-    s = server.Server(db, upstream=upstream, read_only=read_only)
+
+    db_engine = sqlite_engine()
+
+    s = server.Server(db_engine, upstream=upstream, read_only=read_only)
 
     (typ, a) = parse_address(addr)
     if typ == ADDR_TYPE_UNIX:
@@ -120,6 +61,7 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
 
 def create_client(addr):
     from . import client
+
     c = client.Client()
 
     (typ, a) = parse_address(addr)
@@ -132,8 +74,10 @@ def create_client(addr):
 
     return c
 
+
 async def create_async_client(addr):
     from . import client
+
     c = client.AsyncClient()
 
     (typ, a) = parse_address(addr)
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index e6a3f40577..84cf4f2283 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -3,18 +3,16 @@
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
-from contextlib import closing, contextmanager
 from datetime import datetime, timedelta
-import enum
 import asyncio
 import logging
 import math
 import time
-from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
+from . import create_async_client
 import bb.asyncrpc
 
 
-logger = logging.getLogger('hashserv.server')
+logger = logging.getLogger("hashserv.server")
 
 
 class Measurement(object):
@@ -104,229 +102,136 @@ class Stats(object):
         return math.sqrt(self.s / (self.num - 1))
 
     def todict(self):
-        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}
-
-
-@enum.unique
-class Resolve(enum.Enum):
-    FAIL = enum.auto()
-    IGNORE = enum.auto()
-    REPLACE = enum.auto()
-
-
-def insert_table(cursor, table, data, on_conflict):
-    resolve = {
-        Resolve.FAIL: "",
-        Resolve.IGNORE: " OR IGNORE",
-        Resolve.REPLACE: " OR REPLACE",
-    }[on_conflict]
-
-    keys = sorted(data.keys())
-    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
-        resolve=resolve,
-        table=table,
-        fields=", ".join(keys),
-        values=", ".join(":" + k for k in keys),
-    )
-    prevrowid = cursor.lastrowid
-    cursor.execute(query, data)
-    logging.debug(
-        "Inserting %r into %s, %s",
-        data,
-        table,
-        on_conflict
-    )
-    return (cursor.lastrowid, cursor.lastrowid != prevrowid)
-
-def insert_unihash(cursor, data, on_conflict):
-    return insert_table(cursor, "unihashes_v2", data, on_conflict)
-
-def insert_outhash(cursor, data, on_conflict):
-    return insert_table(cursor, "outhashes_v2", data, on_conflict)
-
-async def copy_unihash_from_upstream(client, db, method, taskhash):
-    d = await client.get_taskhash(method, taskhash)
-    if d is not None:
-        with closing(db.cursor()) as cursor:
-            insert_unihash(
-                cursor,
-                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
-                Resolve.IGNORE,
-            )
-            db.commit()
-    return d
-
-
-class ServerCursor(object):
-    def __init__(self, db, cursor, upstream):
-        self.db = db
-        self.cursor = cursor
-        self.upstream = upstream
+        return {
+            k: getattr(self, k)
+            for k in ("num", "total_time", "max_time", "average", "stdev")
+        }
 
 
 class ServerClient(bb.asyncrpc.AsyncServerConnection):
-    def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
-        super().__init__(socket, 'OEHASHEQUIV', logger)
-        self.db = db
+    def __init__(
+        self,
+        socket,
+        db_engine,
+        request_stats,
+        backfill_queue,
+        upstream,
+        read_only,
+    ):
+        super().__init__(socket, "OEHASHEQUIV", logger)
+        self.db_engine = db_engine
         self.request_stats = request_stats
         self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
         self.backfill_queue = backfill_queue
         self.upstream = upstream
 
-        self.handlers.update({
-            'get': self.handle_get,
-            'get-outhash': self.handle_get_outhash,
-            'get-stream': self.handle_get_stream,
-            'get-stats': self.handle_get_stats,
-        })
+        self.handlers.update(
+            {
+                "get": self.handle_get,
+                "get-outhash": self.handle_get_outhash,
+                "get-stream": self.handle_get_stream,
+                "get-stats": self.handle_get_stats,
+            }
+        )
 
         if not read_only:
-            self.handlers.update({
-                'report': self.handle_report,
-                'report-equiv': self.handle_equivreport,
-                'reset-stats': self.handle_reset_stats,
-                'backfill-wait': self.handle_backfill_wait,
-                'remove': self.handle_remove,
-                'clean-unused': self.handle_clean_unused,
-            })
+            self.handlers.update(
+                {
+                    "report": self.handle_report,
+                    "report-equiv": self.handle_equivreport,
+                    "reset-stats": self.handle_reset_stats,
+                    "backfill-wait": self.handle_backfill_wait,
+                    "remove": self.handle_remove,
+                    "clean-unused": self.handle_clean_unused,
+                }
+            )
 
     def validate_proto_version(self):
-        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))
+        return self.proto_version > (1, 0) and self.proto_version <= (1, 1)
 
     async def process_requests(self):
-        if self.upstream is not None:
-            self.upstream_client = await create_async_client(self.upstream)
-        else:
-            self.upstream_client = None
-
-        await super().process_requests()
-
-        if self.upstream_client is not None:
-            await self.upstream_client.close()
+        async with self.db_engine.connect(self.logger) as db:
+            self.db = db
+            if self.upstream is not None:
+                self.upstream_client = await create_async_client(self.upstream)
+            else:
+                self.upstream_client = None
+
+            try:
+                await super().process_requests()
+            finally:
+                if self.upstream_client is not None:
+                    await self.upstream_client.close()
 
     async def dispatch_message(self, msg):
         for k in self.handlers.keys():
             if k in msg:
-                self.logger.debug('Handling %s' % k)
-                if 'stream' in k:
+                self.logger.debug("Handling %s" % k)
+                if "stream" in k:
                     return await self.handlers[k](msg[k])
                 else:
-                    with self.request_stats.start_sample() as self.request_sample, \
-                            self.request_sample.measure():
+                    with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
                         return await self.handlers[k](msg[k])
 
         raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
 
     async def handle_get(self, request):
-        method = request['method']
-        taskhash = request['taskhash']
-        fetch_all = request.get('all', False)
+        method = request["method"]
+        taskhash = request["taskhash"]
+        fetch_all = request.get("all", False)
 
-        with closing(self.db.cursor()) as cursor:
-            return await self.get_unihash(cursor, method, taskhash, fetch_all)
+        return await self.get_unihash(method, taskhash, fetch_all)
 
-    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
+    async def get_unihash(self, method, taskhash, fetch_all=False):
         d = None
 
         if fetch_all:
-            cursor.execute(
-                '''
-                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
-                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
-                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
-                ORDER BY outhashes_v2.created ASC
-                LIMIT 1
-                ''',
-                {
-                    'method': method,
-                    'taskhash': taskhash,
-                }
-
-            )
-            row = cursor.fetchone()
-
+            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
             if row is not None:
                 d = {k: row[k] for k in row.keys()}
             elif self.upstream_client is not None:
                 d = await self.upstream_client.get_taskhash(method, taskhash, True)
-                self.update_unified(cursor, d)
-                self.db.commit()
+                await self.update_unified(d)
         else:
-            row = self.query_equivalent(cursor, method, taskhash)
+            row = await self.db.get_equivalent(method, taskhash)
 
             if row is not None:
                 d = {k: row[k] for k in row.keys()}
             elif self.upstream_client is not None:
                 d = await self.upstream_client.get_taskhash(method, taskhash)
-                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
-                insert_unihash(cursor, d, Resolve.IGNORE)
-                self.db.commit()
+                await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
 
         return d
 
     async def handle_get_outhash(self, request):
-        method = request['method']
-        outhash = request['outhash']
-        taskhash = request['taskhash']
+        method = request["method"]
+        outhash = request["outhash"]
+        taskhash = request["taskhash"]
         with_unihash = request.get("with_unihash", True)
 
-        with closing(self.db.cursor()) as cursor:
-            return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
+        return await self.get_outhash(method, outhash, taskhash, with_unihash)
 
-    async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
+    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
         d = None
         if with_unihash:
-            cursor.execute(
-                '''
-                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
-                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
-                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
-                ORDER BY outhashes_v2.created ASC
-                LIMIT 1
-                ''',
-                {
-                    'method': method,
-                    'outhash': outhash,
-                }
-            )
+            row = await self.db.get_unihash_by_outhash(method, outhash)
         else:
-            cursor.execute(
-                """
-                SELECT * FROM outhashes_v2
-                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
-                ORDER BY outhashes_v2.created ASC
-                LIMIT 1
-                """,
-                {
-                    'method': method,
-                    'outhash': outhash,
-                }
-            )
-        row = cursor.fetchone()
+            row = await self.db.get_outhash(method, outhash)
 
         if row is not None:
             d = {k: row[k] for k in row.keys()}
         elif self.upstream_client is not None:
             d = await self.upstream_client.get_outhash(method, outhash, taskhash)
-            self.update_unified(cursor, d)
-            self.db.commit()
+            await self.update_unified(d)
 
         return d
 
-    def update_unified(self, cursor, data):
+    async def update_unified(self, data):
         if data is None:
             return
 
-        insert_unihash(
-            cursor,
-            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
-            Resolve.IGNORE
-        )
-        insert_outhash(
-            cursor,
-            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
-            Resolve.IGNORE
-        )
+        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
+        await self.db.insert_outhash(data)
 
     async def handle_get_stream(self, request):
         await self.socket.send_message("ok")
@@ -347,20 +252,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 request_measure = self.request_sample.measure()
                 request_measure.start()
 
-                if l == 'END':
+                if l == "END":
                     break
 
                 (method, taskhash) = l.split()
-                #self.logger.debug('Looking up %s %s' % (method, taskhash))
-                cursor = self.db.cursor()
-                try:
-                    row = self.query_equivalent(cursor, method, taskhash)
-                finally:
-                    cursor.close()
+                # self.logger.debug('Looking up %s %s' % (method, taskhash))
+                row = await self.db.get_equivalent(method, taskhash)
 
                 if row is not None:
-                    msg = row['unihash']
-                    #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
+                    msg = row["unihash"]
+                    # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                 elif self.upstream_client is not None:
                     upstream = await self.upstream_client.get_unihash(method, taskhash)
                     if upstream:
@@ -384,118 +285,81 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         return self.NO_RESPONSE
 
     async def handle_report(self, data):
-        with closing(self.db.cursor()) as cursor:
-            outhash_data = {
-                'method': data['method'],
-                'outhash': data['outhash'],
-                'taskhash': data['taskhash'],
-                'created': datetime.now()
-            }
+        outhash_data = {
+            "method": data["method"],
+            "outhash": data["outhash"],
+            "taskhash": data["taskhash"],
+            "created": datetime.now(),
+        }
 
-            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
-                if k in data:
-                    outhash_data[k] = data[k]
-
-            # Insert the new entry, unless it already exists
-            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)
-
-            if inserted:
-                # If this row is new, check if it is equivalent to another
-                # output hash
-                cursor.execute(
-                    '''
-                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
-                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
-                    -- Select any matching output hash except the one we just inserted
-                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
-                    -- Pick the oldest hash
-                    ORDER BY outhashes_v2.created ASC
-                    LIMIT 1
-                    ''',
-                    {
-                        'method': data['method'],
-                        'outhash': data['outhash'],
-                        'taskhash': data['taskhash'],
-                    }
-                )
-                row = cursor.fetchone()
+        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
+            if k in data:
+                outhash_data[k] = data[k]
 
-            if row is not None:
-                # A matching output hash was found. Set our taskhash to the
-                # same unihash since they are equivalent
-                unihash = row['unihash']
-                resolve = Resolve.IGNORE
-            else:
-                # No matching output hash was found. This is probably the
-                # first outhash to be added.
-                unihash = data['unihash']
-                resolve = Resolve.IGNORE
-
-                # Query upstream to see if it has a unihash we can use
-                if self.upstream_client is not None:
-                    upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
-                    if upstream_data is not None:
-                        unihash = upstream_data['unihash']
-
-
-                insert_unihash(
-                    cursor,
-                    {
-                        'method': data['method'],
-                        'taskhash': data['taskhash'],
-                        'unihash': unihash,
-                    },
-                    resolve
-                )
-
-                unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
-                if unihash_data is not None:
-                    unihash = unihash_data['unihash']
-                else:
-                    unihash = data['unihash']
-
-            self.db.commit()
+        # Insert the new entry, unless it already exists
+        if await self.db.insert_outhash(outhash_data):
+            # If this row is new, check if it is equivalent to another
+            # output hash
+            row = await self.db.get_equivalent_for_outhash(
+                data["method"], data["outhash"], data["taskhash"]
+            )
 
-        d = {
-            'taskhash': data['taskhash'],
-            'method': data['method'],
-            'unihash': unihash,
-        }
+            if row is not None:
+                # A matching output hash was found. Set our taskhash to the
+                # same unihash since they are equivalent
+                unihash = row["unihash"]
+            else:
+                # No matching output hash was found. This is probably the
+                # first outhash to be added.
+                unihash = data["unihash"]
+
+                # Query upstream to see if it has a unihash we can use
+                if self.upstream_client is not None:
+                    upstream_data = await self.upstream_client.get_outhash(
+                        data["method"], data["outhash"], data["taskhash"]
+                    )
+                    if upstream_data is not None:
+                        unihash = upstream_data["unihash"]
+
+            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)
+
+        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
+        if unihash_data is not None:
+            unihash = unihash_data["unihash"]
+        else:
+            unihash = data["unihash"]
 
-        return d
+        return {
+            "taskhash": data["taskhash"],
+            "method": data["method"],
+            "unihash": unihash,
+        }
 
     async def handle_equivreport(self, data):
-        with closing(self.db.cursor()) as cursor:
-            insert_data = {
-                'method': data['method'],
-                'taskhash': data['taskhash'],
-                'unihash': data['unihash'],
-            }
-            insert_unihash(cursor, insert_data, Resolve.IGNORE)
-            self.db.commit()
-
-            # Fetch the unihash that will be reported for the taskhash. If the
-            # unihash matches, it means this row was inserted (or the mapping
-            # was already valid)
-            row = self.query_equivalent(cursor, data['method'], data['taskhash'])
-
-            if row['unihash'] == data['unihash']:
-                self.logger.info('Adding taskhash equivalence for %s with unihash %s',
-                    data['taskhash'], row['unihash'])
-
-            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
-
-            return d
+        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
+
+        # Fetch the unihash that will be reported for the taskhash. If the
+        # unihash matches, it means this row was inserted (or the mapping
+        # was already valid)
+        row = await self.db.get_equivalent(data["method"], data["taskhash"])
+
+        if row["unihash"] == data["unihash"]:
+            self.logger.info(
+                "Adding taskhash equivalence for %s with unihash %s",
+                data["taskhash"],
+                row["unihash"],
+            )
 
+        return {k: row[k] for k in ("taskhash", "method", "unihash")}
 
     async def handle_get_stats(self, request):
         return {
-            'requests': self.request_stats.todict(),
+            "requests": self.request_stats.todict(),
         }
 
     async def handle_reset_stats(self, request):
         d = {
-            'requests': self.request_stats.todict(),
+            "requests": self.request_stats.todict(),
         }
 
         self.request_stats.reset()
@@ -503,7 +367,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
     async def handle_backfill_wait(self, request):
         d = {
-            'tasks': self.backfill_queue.qsize(),
+            "tasks": self.backfill_queue.qsize(),
         }
         await self.backfill_queue.join()
         return d
@@ -513,92 +377,63 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         if not isinstance(condition, dict):
             raise TypeError("Bad condition type %s" % type(condition))
 
-        def do_remove(columns, table_name, cursor):
-            nonlocal condition
-            where = {}
-            for c in columns:
-                if c in condition and condition[c] is not None:
-                    where[c] = condition[c]
-
-            if where:
-                query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys())
-                cursor.execute(query, where)
-                return cursor.rowcount
-
-            return 0
-
-        count = 0
-        with closing(self.db.cursor()) as cursor:
-            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
-            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
-            self.db.commit()
-
-        return {"count": count}
+        return {"count": await self.db.remove(condition)}
 
     async def handle_clean_unused(self, request):
         max_age = request["max_age_seconds"]
-        with closing(self.db.cursor()) as cursor:
-            cursor.execute(
-                """
-                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
-                    SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1
-                )
-                """,
-                {
-                    "oldest": datetime.now() - timedelta(seconds=-max_age)
-                }
-            )
-            count = cursor.rowcount
-
-        return {"count": count}
-
-    def query_equivalent(self, cursor, method, taskhash):
-        # This is part of the inner loop and must be as fast as possible
-        cursor.execute(
-            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
-            {
-                'method': method,
-                'taskhash': taskhash,
-            }
-        )
-        return cursor.fetchone()
+        oldest = datetime.now() - timedelta(seconds=-max_age)
+        return {"count": await self.db.clean_unused(oldest)}
 
 
 class Server(bb.asyncrpc.AsyncServer):
-    def __init__(self, db, upstream=None, read_only=False):
+    def __init__(self, db_engine, upstream=None, read_only=False):
         if upstream and read_only:
-            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")
+            raise bb.asyncrpc.ServerError(
+                "Read-only hashserv cannot pull from an upstream server"
+            )
 
         super().__init__(logger)
 
         self.request_stats = Stats()
-        self.db = db
+        self.db_engine = db_engine
         self.upstream = upstream
         self.read_only = read_only
         self.backfill_queue = None
 
     def accept_client(self, socket):
-        return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
+        return ServerClient(
+            socket,
+            self.db_engine,
+            self.request_stats,
+            self.backfill_queue,
+            self.upstream,
+            self.read_only,
+        )
 
     async def backfill_worker_task(self):
-        client = await create_async_client(self.upstream)
-        try:
+        async with await create_async_client(
+            self.upstream
+        ) as client, self.db_engine.connect(logger) as db:
             while True:
                 item = await self.backfill_queue.get()
                 if item is None:
                     self.backfill_queue.task_done()
                     break
+
                 method, taskhash = item
-                await copy_unihash_from_upstream(client, self.db, method, taskhash)
+                d = await client.get_taskhash(method, taskhash)
+                if d is not None:
+                    await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
                 self.backfill_queue.task_done()
-        finally:
-            await client.close()
 
     def start(self):
         tasks = super().start()
         if self.upstream:
            self.backfill_queue = asyncio.Queue()
            tasks += [self.backfill_worker_task()]
+
+        self.loop.run_until_complete(self.db_engine.create())
+
        return tasks
 
     async def stop(self):
diff --git a/bitbake/lib/hashserv/sqlite.py b/bitbake/lib/hashserv/sqlite.py
new file mode 100644
index 0000000000..6809c53706
--- /dev/null
+++ b/bitbake/lib/hashserv/sqlite.py
@@ -0,0 +1,259 @@
+#! /usr/bin/env python3
+#
+# Copyright (C) 2023 Garmin Ltd.
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+import sqlite3
+import logging
+from contextlib import closing
+
+logger = logging.getLogger("hashserv.sqlite")
+
+UNIHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("unihash", "TEXT NOT NULL", ""),
+)
+
+UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)
+
+OUTHASH_TABLE_DEFINITION = (
+    ("method", "TEXT NOT NULL", "UNIQUE"),
+    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
+    ("outhash", "TEXT NOT NULL", "UNIQUE"),
+    ("created", "DATETIME", ""),
+    # Optional fields
+    ("owner", "TEXT", ""),
+    ("PN", "TEXT", ""),
+    ("PV", "TEXT", ""),
+    ("PR", "TEXT", ""),
+    ("task", "TEXT", ""),
+    ("outhash_siginfo", "TEXT", ""),
+)
+
+OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
+
+
+def _make_table(cursor, name, definition):
+    cursor.execute(
+        """
+        CREATE TABLE IF NOT EXISTS {name} (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            {fields}
+            UNIQUE({unique})
+            )
+        """.format(
+            name=name,
+            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
+            unique=", ".join(
+                name for name, _, flags in definition if "UNIQUE" in flags
+            ),
+        )
+    )
+
+
+class DatabaseEngine(object):
+    def __init__(self, dbname, sync):
+        self.dbname = dbname
+        self.logger = logger
+        self.sync = sync
+
+    async def create(self):
+        db = sqlite3.connect(self.dbname)
+        db.row_factory = sqlite3.Row
+
+        with closing(db.cursor()) as cursor:
+            _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
+            _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
+
+            cursor.execute("PRAGMA journal_mode = WAL")
+            cursor.execute(
+                "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
+            )
+
+            # Drop old indexes
+            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
+            cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
+            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
+            cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
+
+            # TODO: Upgrade from tasks_v2?
+            cursor.execute("DROP TABLE IF EXISTS tasks_v2")
+
+            # Create new indexes
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)"
+            )
+            cursor.execute(
+                "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
+            )
+
+    def connect(self, logger):
+        return Database(logger, self.dbname)
+
+
+class Database(object):
+    def __init__(self, logger, dbname, sync=True):
+        self.dbname = dbname
+        self.logger = logger
+
+        self.db = sqlite3.connect(self.dbname)
+        self.db.row_factory = sqlite3.Row
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        await self.close()
+
+    async def close(self):
+        self.db.close()
+
+    async def get_unihash_by_taskhash_full(self, method, taskhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "taskhash": taskhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_unihash_by_outhash(self, method, outhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "outhash": outhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_outhash(self, method, outhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT * FROM outhashes_v2
+                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "outhash": outhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
+                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
+                -- Select any matching output hash except the one we just inserted
+                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
+                -- Pick the oldest hash
+                ORDER BY outhashes_v2.created ASC
+                LIMIT 1
+                """,
+                {
+                    "method": method,
+                    "outhash": outhash,
+                    "taskhash": taskhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def get_equivalent(self, method, taskhash):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                "SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash",
+                {
+                    "method": method,
+                    "taskhash": taskhash,
+                },
+            )
+            return cursor.fetchone()
+
+    async def remove(self, condition):
+        def do_remove(columns, table_name, cursor):
+            where = {}
+            for c in columns:
+                if c in condition and condition[c] is not None:
+                    where[c] = condition[c]
+
+            if where:
+                query = ("DELETE FROM %s WHERE " % table_name) + " AND ".join(
+                    "%s=:%s" % (k, k) for k in where.keys()
+                )
+                cursor.execute(query, where)
+                return cursor.rowcount
+
+            return 0
+
+        count = 0
+        with closing(self.db.cursor()) as cursor:
+            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
+            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
+            self.db.commit()
+
+        return count
+
+    async def clean_unused(self, oldest):
+        with closing(self.db.cursor()) as cursor:
+            cursor.execute(
+                """
+                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
+                    SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1
+                )
+                """,
+                {
+                    "oldest": oldest,
+                },
+            )
+            return cursor.rowcount
+
+    async def insert_unihash(self, method, taskhash, unihash):
+        with closing(self.db.cursor()) as cursor:
+            prevrowid = cursor.lastrowid
+            cursor.execute(
+                """
+                INSERT OR IGNORE INTO unihashes_v2 (method, taskhash, unihash) VALUES(:method, :taskhash, :unihash)
+                """,
+                {
+                    "method": method,
+                    "taskhash": taskhash,
+                    "unihash": unihash,
+                },
+            )
+            self.db.commit()
+            return cursor.lastrowid != prevrowid
+
+    async def insert_outhash(self, data):
+        data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
+        keys = sorted(data.keys())
+        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
+            fields=", ".join(keys),
+            values=", ".join(":" + k for k in keys),
+        )
+        with closing(self.db.cursor()) as cursor:
+            prevrowid = cursor.lastrowid
+            cursor.execute(query, data)
+            self.db.commit()
+            return cursor.lastrowid != prevrowid
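For orientation, here is a minimal usage sketch of the new engine API introduced above. It is not part of the patch; it assumes bitbake/lib is on sys.path, and the database path and hash values are illustrative only.

```python
import asyncio
import logging

from hashserv.sqlite import DatabaseEngine


async def main():
    log = logging.getLogger("hashserv.example")
    engine = DatabaseEngine("hashes.db", True)  # illustrative database path, sync=True
    await engine.create()  # create tables and indexes, as Server.start() does

    # Database implements the async context manager protocol the server relies on
    async with engine.connect(log) as db:
        inserted = await db.insert_unihash("method", "taskhash", "unihash")
        row = await db.get_equivalent("method", "taskhash")
        print(inserted, row["unihash"] if row else None)


asyncio.run(main())
```

This mirrors how server.py consumes the abstraction: `Server.start()` runs `db_engine.create()`, and each `ServerClient.process_requests()` opens a per-connection `Database` via `async with self.db_engine.connect(self.logger)`.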