author     Joshua Watt <JPEWhacker@gmail.com>                   2023-11-03 08:26:25 -0600
committer  Richard Purdie <richard.purdie@linuxfoundation.org>  2023-11-09 17:33:02 +0000
commit     baa3e5391daf41b6dd6e914a112abb00d3517da1 (patch)
tree       8280b21caf78db2783f586250776c4813f24cb87 /bitbake/lib/hashserv/server.py
parent     e90fccfefd7693d8cdfa731fa7e170c8bd4b1a1b (diff)
download   poky-baa3e5391daf41b6dd6e914a112abb00d3517da1.tar.gz
bitbake: hashserv: Abstract database
Abstracts the way the database backend is accessed by the hash
equivalence server to make it possible to use other backends.
(Bitbake rev: 04b53deacf857488408bc82b9890b1e19874b5f1)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r-- | bitbake/lib/hashserv/server.py | 491
1 file changed, 163 insertions, 328 deletions
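
The patch below rewires ServerClient and Server to talk to an abstract "database engine" instead of an sqlite3 connection, so all SQL disappears from server.py. As an orientation aid, here is a minimal sketch of the interface the new server code appears to expect. It is inferred purely from the calls visible in this diff; the class names, exact signatures, and return conventions are assumptions, and the concrete sqlite implementation is introduced in a separate patch in this series.

# Minimal sketch (not part of this patch) of the database interface that the
# rewritten server.py calls into. Names and signatures are inferred from the
# diff below and are assumptions, not the actual implementation.

class DatabaseEngine:
    async def create(self):
        # Create or migrate the schema; awaited once from Server.start().
        raise NotImplementedError

    def connect(self, logger):
        # Return an async context manager yielding a Database, as used in
        # "async with self.db_engine.connect(self.logger) as db".
        raise NotImplementedError


class Database:
    async def get_equivalent(self, method, taskhash):
        # Row with "taskhash", "method" and "unihash", or None.
        raise NotImplementedError

    async def get_unihash_by_taskhash_full(self, method, taskhash):
        # Full outhash row joined with its unihash for a taskhash, or None.
        raise NotImplementedError

    async def get_unihash_by_outhash(self, method, outhash):
        # Oldest outhash row joined with its unihash, or None.
        raise NotImplementedError

    async def get_outhash(self, method, outhash):
        # Outhash row without the unihash join, or None.
        raise NotImplementedError

    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
        # A different taskhash that produced the same outhash, or None.
        raise NotImplementedError

    async def insert_unihash(self, method, taskhash, unihash):
        # Insert the taskhash -> unihash mapping unless it already exists.
        raise NotImplementedError

    async def insert_outhash(self, data):
        # Insert an outhash row; return True if a new row was added.
        raise NotImplementedError

    async def remove(self, condition):
        # Delete rows matching the condition dict; return the count removed.
        raise NotImplementedError

    async def clean_unused(self, oldest):
        # Delete outhash rows older than `oldest` with no unihash; return count.
        raise NotImplementedError

With this split, adding a new backend means implementing these coroutines; the server code in the diff below no longer contains any SQL of its own.
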
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index e6a3f40577..84cf4f2283 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -3,18 +3,16 @@ | |||
3 | # SPDX-License-Identifier: GPL-2.0-only | 3 | # SPDX-License-Identifier: GPL-2.0-only |
4 | # | 4 | # |
5 | 5 | ||
6 | from contextlib import closing, contextmanager | ||
7 | from datetime import datetime, timedelta | 6 | from datetime import datetime, timedelta |
8 | import enum | ||
9 | import asyncio | 7 | import asyncio |
10 | import logging | 8 | import logging |
11 | import math | 9 | import math |
12 | import time | 10 | import time |
13 | from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS | 11 | from . import create_async_client |
14 | import bb.asyncrpc | 12 | import bb.asyncrpc |
15 | 13 | ||
16 | 14 | ||
17 | logger = logging.getLogger('hashserv.server') | 15 | logger = logging.getLogger("hashserv.server") |
18 | 16 | ||
19 | 17 | ||
20 | class Measurement(object): | 18 | class Measurement(object): |
@@ -104,229 +102,136 @@ class Stats(object): | |||
104 | return math.sqrt(self.s / (self.num - 1)) | 102 | return math.sqrt(self.s / (self.num - 1)) |
105 | 103 | ||
106 | def todict(self): | 104 | def todict(self): |
107 | return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} | 105 | return { |
108 | 106 | k: getattr(self, k) | |
109 | 107 | for k in ("num", "total_time", "max_time", "average", "stdev") | |
110 | @enum.unique | 108 | } |
111 | class Resolve(enum.Enum): | ||
112 | FAIL = enum.auto() | ||
113 | IGNORE = enum.auto() | ||
114 | REPLACE = enum.auto() | ||
115 | |||
116 | |||
117 | def insert_table(cursor, table, data, on_conflict): | ||
118 | resolve = { | ||
119 | Resolve.FAIL: "", | ||
120 | Resolve.IGNORE: " OR IGNORE", | ||
121 | Resolve.REPLACE: " OR REPLACE", | ||
122 | }[on_conflict] | ||
123 | |||
124 | keys = sorted(data.keys()) | ||
125 | query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format( | ||
126 | resolve=resolve, | ||
127 | table=table, | ||
128 | fields=", ".join(keys), | ||
129 | values=", ".join(":" + k for k in keys), | ||
130 | ) | ||
131 | prevrowid = cursor.lastrowid | ||
132 | cursor.execute(query, data) | ||
133 | logging.debug( | ||
134 | "Inserting %r into %s, %s", | ||
135 | data, | ||
136 | table, | ||
137 | on_conflict | ||
138 | ) | ||
139 | return (cursor.lastrowid, cursor.lastrowid != prevrowid) | ||
140 | |||
141 | def insert_unihash(cursor, data, on_conflict): | ||
142 | return insert_table(cursor, "unihashes_v2", data, on_conflict) | ||
143 | |||
144 | def insert_outhash(cursor, data, on_conflict): | ||
145 | return insert_table(cursor, "outhashes_v2", data, on_conflict) | ||
146 | |||
147 | async def copy_unihash_from_upstream(client, db, method, taskhash): | ||
148 | d = await client.get_taskhash(method, taskhash) | ||
149 | if d is not None: | ||
150 | with closing(db.cursor()) as cursor: | ||
151 | insert_unihash( | ||
152 | cursor, | ||
153 | {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}, | ||
154 | Resolve.IGNORE, | ||
155 | ) | ||
156 | db.commit() | ||
157 | return d | ||
158 | |||
159 | |||
160 | class ServerCursor(object): | ||
161 | def __init__(self, db, cursor, upstream): | ||
162 | self.db = db | ||
163 | self.cursor = cursor | ||
164 | self.upstream = upstream | ||
165 | 109 | ||
166 | 110 | ||
167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 111 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
168 | def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): | 112 | def __init__( |
169 | super().__init__(socket, 'OEHASHEQUIV', logger) | 113 | self, |
170 | self.db = db | 114 | socket, |
115 | db_engine, | ||
116 | request_stats, | ||
117 | backfill_queue, | ||
118 | upstream, | ||
119 | read_only, | ||
120 | ): | ||
121 | super().__init__(socket, "OEHASHEQUIV", logger) | ||
122 | self.db_engine = db_engine | ||
171 | self.request_stats = request_stats | 123 | self.request_stats = request_stats |
172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 124 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
173 | self.backfill_queue = backfill_queue | 125 | self.backfill_queue = backfill_queue |
174 | self.upstream = upstream | 126 | self.upstream = upstream |
175 | 127 | ||
176 | self.handlers.update({ | 128 | self.handlers.update( |
177 | 'get': self.handle_get, | 129 | { |
178 | 'get-outhash': self.handle_get_outhash, | 130 | "get": self.handle_get, |
179 | 'get-stream': self.handle_get_stream, | 131 | "get-outhash": self.handle_get_outhash, |
180 | 'get-stats': self.handle_get_stats, | 132 | "get-stream": self.handle_get_stream, |
181 | }) | 133 | "get-stats": self.handle_get_stats, |
134 | } | ||
135 | ) | ||
182 | 136 | ||
183 | if not read_only: | 137 | if not read_only: |
184 | self.handlers.update({ | 138 | self.handlers.update( |
185 | 'report': self.handle_report, | 139 | { |
186 | 'report-equiv': self.handle_equivreport, | 140 | "report": self.handle_report, |
187 | 'reset-stats': self.handle_reset_stats, | 141 | "report-equiv": self.handle_equivreport, |
188 | 'backfill-wait': self.handle_backfill_wait, | 142 | "reset-stats": self.handle_reset_stats, |
189 | 'remove': self.handle_remove, | 143 | "backfill-wait": self.handle_backfill_wait, |
190 | 'clean-unused': self.handle_clean_unused, | 144 | "remove": self.handle_remove, |
191 | }) | 145 | "clean-unused": self.handle_clean_unused, |
146 | } | ||
147 | ) | ||
192 | 148 | ||
193 | def validate_proto_version(self): | 149 | def validate_proto_version(self): |
194 | return (self.proto_version > (1, 0) and self.proto_version <= (1, 1)) | 150 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
195 | 151 | ||
196 | async def process_requests(self): | 152 | async def process_requests(self): |
197 | if self.upstream is not None: | 153 | async with self.db_engine.connect(self.logger) as db: |
198 | self.upstream_client = await create_async_client(self.upstream) | 154 | self.db = db |
199 | else: | 155 | if self.upstream is not None: |
200 | self.upstream_client = None | 156 | self.upstream_client = await create_async_client(self.upstream) |
201 | 157 | else: | |
202 | await super().process_requests() | 158 | self.upstream_client = None |
203 | 159 | ||
204 | if self.upstream_client is not None: | 160 | try: |
205 | await self.upstream_client.close() | 161 | await super().process_requests() |
162 | finally: | ||
163 | if self.upstream_client is not None: | ||
164 | await self.upstream_client.close() | ||
206 | 165 | ||
207 | async def dispatch_message(self, msg): | 166 | async def dispatch_message(self, msg): |
208 | for k in self.handlers.keys(): | 167 | for k in self.handlers.keys(): |
209 | if k in msg: | 168 | if k in msg: |
210 | self.logger.debug('Handling %s' % k) | 169 | self.logger.debug("Handling %s" % k) |
211 | if 'stream' in k: | 170 | if "stream" in k: |
212 | return await self.handlers[k](msg[k]) | 171 | return await self.handlers[k](msg[k]) |
213 | else: | 172 | else: |
214 | with self.request_stats.start_sample() as self.request_sample, \ | 173 | with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): |
215 | self.request_sample.measure(): | ||
216 | return await self.handlers[k](msg[k]) | 174 | return await self.handlers[k](msg[k]) |
217 | 175 | ||
218 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 176 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
219 | 177 | ||
220 | async def handle_get(self, request): | 178 | async def handle_get(self, request): |
221 | method = request['method'] | 179 | method = request["method"] |
222 | taskhash = request['taskhash'] | 180 | taskhash = request["taskhash"] |
223 | fetch_all = request.get('all', False) | 181 | fetch_all = request.get("all", False) |
224 | 182 | ||
225 | with closing(self.db.cursor()) as cursor: | 183 | return await self.get_unihash(method, taskhash, fetch_all) |
226 | return await self.get_unihash(cursor, method, taskhash, fetch_all) | ||
227 | 184 | ||
228 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): | 185 | async def get_unihash(self, method, taskhash, fetch_all=False): |
229 | d = None | 186 | d = None |
230 | 187 | ||
231 | if fetch_all: | 188 | if fetch_all: |
232 | cursor.execute( | 189 | row = await self.db.get_unihash_by_taskhash_full(method, taskhash) |
233 | ''' | ||
234 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
235 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
236 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash | ||
237 | ORDER BY outhashes_v2.created ASC | ||
238 | LIMIT 1 | ||
239 | ''', | ||
240 | { | ||
241 | 'method': method, | ||
242 | 'taskhash': taskhash, | ||
243 | } | ||
244 | |||
245 | ) | ||
246 | row = cursor.fetchone() | ||
247 | |||
248 | if row is not None: | 190 | if row is not None: |
249 | d = {k: row[k] for k in row.keys()} | 191 | d = {k: row[k] for k in row.keys()} |
250 | elif self.upstream_client is not None: | 192 | elif self.upstream_client is not None: |
251 | d = await self.upstream_client.get_taskhash(method, taskhash, True) | 193 | d = await self.upstream_client.get_taskhash(method, taskhash, True) |
252 | self.update_unified(cursor, d) | 194 | await self.update_unified(d) |
253 | self.db.commit() | ||
254 | else: | 195 | else: |
255 | row = self.query_equivalent(cursor, method, taskhash) | 196 | row = await self.db.get_equivalent(method, taskhash) |
256 | 197 | ||
257 | if row is not None: | 198 | if row is not None: |
258 | d = {k: row[k] for k in row.keys()} | 199 | d = {k: row[k] for k in row.keys()} |
259 | elif self.upstream_client is not None: | 200 | elif self.upstream_client is not None: |
260 | d = await self.upstream_client.get_taskhash(method, taskhash) | 201 | d = await self.upstream_client.get_taskhash(method, taskhash) |
261 | d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS} | 202 | await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) |
262 | insert_unihash(cursor, d, Resolve.IGNORE) | ||
263 | self.db.commit() | ||
264 | 203 | ||
265 | return d | 204 | return d |
266 | 205 | ||
267 | async def handle_get_outhash(self, request): | 206 | async def handle_get_outhash(self, request): |
268 | method = request['method'] | 207 | method = request["method"] |
269 | outhash = request['outhash'] | 208 | outhash = request["outhash"] |
270 | taskhash = request['taskhash'] | 209 | taskhash = request["taskhash"] |
271 | with_unihash = request.get("with_unihash", True) | 210 | with_unihash = request.get("with_unihash", True) |
272 | 211 | ||
273 | with closing(self.db.cursor()) as cursor: | 212 | return await self.get_outhash(method, outhash, taskhash, with_unihash) |
274 | return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) | ||
275 | 213 | ||
276 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): | 214 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
277 | d = None | 215 | d = None |
278 | if with_unihash: | 216 | if with_unihash: |
279 | cursor.execute( | 217 | row = await self.db.get_unihash_by_outhash(method, outhash) |
280 | ''' | ||
281 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
282 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
283 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | ||
284 | ORDER BY outhashes_v2.created ASC | ||
285 | LIMIT 1 | ||
286 | ''', | ||
287 | { | ||
288 | 'method': method, | ||
289 | 'outhash': outhash, | ||
290 | } | ||
291 | ) | ||
292 | else: | 218 | else: |
293 | cursor.execute( | 219 | row = await self.db.get_outhash(method, outhash) |
294 | """ | ||
295 | SELECT * FROM outhashes_v2 | ||
296 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | ||
297 | ORDER BY outhashes_v2.created ASC | ||
298 | LIMIT 1 | ||
299 | """, | ||
300 | { | ||
301 | 'method': method, | ||
302 | 'outhash': outhash, | ||
303 | } | ||
304 | ) | ||
305 | row = cursor.fetchone() | ||
306 | 220 | ||
307 | if row is not None: | 221 | if row is not None: |
308 | d = {k: row[k] for k in row.keys()} | 222 | d = {k: row[k] for k in row.keys()} |
309 | elif self.upstream_client is not None: | 223 | elif self.upstream_client is not None: |
310 | d = await self.upstream_client.get_outhash(method, outhash, taskhash) | 224 | d = await self.upstream_client.get_outhash(method, outhash, taskhash) |
311 | self.update_unified(cursor, d) | 225 | await self.update_unified(d) |
312 | self.db.commit() | ||
313 | 226 | ||
314 | return d | 227 | return d |
315 | 228 | ||
316 | def update_unified(self, cursor, data): | 229 | async def update_unified(self, data): |
317 | if data is None: | 230 | if data is None: |
318 | return | 231 | return |
319 | 232 | ||
320 | insert_unihash( | 233 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
321 | cursor, | 234 | await self.db.insert_outhash(data) |
322 | {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}, | ||
323 | Resolve.IGNORE | ||
324 | ) | ||
325 | insert_outhash( | ||
326 | cursor, | ||
327 | {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}, | ||
328 | Resolve.IGNORE | ||
329 | ) | ||
330 | 235 | ||
331 | async def handle_get_stream(self, request): | 236 | async def handle_get_stream(self, request): |
332 | await self.socket.send_message("ok") | 237 | await self.socket.send_message("ok") |
@@ -347,20 +252,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
347 | request_measure = self.request_sample.measure() | 252 | request_measure = self.request_sample.measure() |
348 | request_measure.start() | 253 | request_measure.start() |
349 | 254 | ||
350 | if l == 'END': | 255 | if l == "END": |
351 | break | 256 | break |
352 | 257 | ||
353 | (method, taskhash) = l.split() | 258 | (method, taskhash) = l.split() |
354 | #self.logger.debug('Looking up %s %s' % (method, taskhash)) | 259 | # self.logger.debug('Looking up %s %s' % (method, taskhash)) |
355 | cursor = self.db.cursor() | 260 | row = await self.db.get_equivalent(method, taskhash) |
356 | try: | ||
357 | row = self.query_equivalent(cursor, method, taskhash) | ||
358 | finally: | ||
359 | cursor.close() | ||
360 | 261 | ||
361 | if row is not None: | 262 | if row is not None: |
362 | msg = row['unihash'] | 263 | msg = row["unihash"] |
363 | #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 264 | # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
364 | elif self.upstream_client is not None: | 265 | elif self.upstream_client is not None: |
365 | upstream = await self.upstream_client.get_unihash(method, taskhash) | 266 | upstream = await self.upstream_client.get_unihash(method, taskhash) |
366 | if upstream: | 267 | if upstream: |
@@ -384,118 +285,81 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
384 | return self.NO_RESPONSE | 285 | return self.NO_RESPONSE |
385 | 286 | ||
386 | async def handle_report(self, data): | 287 | async def handle_report(self, data): |
387 | with closing(self.db.cursor()) as cursor: | 288 | outhash_data = { |
388 | outhash_data = { | 289 | "method": data["method"], |
389 | 'method': data['method'], | 290 | "outhash": data["outhash"], |
390 | 'outhash': data['outhash'], | 291 | "taskhash": data["taskhash"], |
391 | 'taskhash': data['taskhash'], | 292 | "created": datetime.now(), |
392 | 'created': datetime.now() | 293 | } |
393 | } | ||
394 | 294 | ||
395 | for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): | 295 | for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"): |
396 | if k in data: | 296 | if k in data: |
397 | outhash_data[k] = data[k] | 297 | outhash_data[k] = data[k] |
398 | |||
399 | # Insert the new entry, unless it already exists | ||
400 | (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE) | ||
401 | |||
402 | if inserted: | ||
403 | # If this row is new, check if it is equivalent to another | ||
404 | # output hash | ||
405 | cursor.execute( | ||
406 | ''' | ||
407 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
408 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
409 | -- Select any matching output hash except the one we just inserted | ||
410 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash | ||
411 | -- Pick the oldest hash | ||
412 | ORDER BY outhashes_v2.created ASC | ||
413 | LIMIT 1 | ||
414 | ''', | ||
415 | { | ||
416 | 'method': data['method'], | ||
417 | 'outhash': data['outhash'], | ||
418 | 'taskhash': data['taskhash'], | ||
419 | } | ||
420 | ) | ||
421 | row = cursor.fetchone() | ||
422 | 298 | ||
423 | if row is not None: | 299 | # Insert the new entry, unless it already exists |
424 | # A matching output hash was found. Set our taskhash to the | 300 | if await self.db.insert_outhash(outhash_data): |
425 | # same unihash since they are equivalent | 301 | # If this row is new, check if it is equivalent to another |
426 | unihash = row['unihash'] | 302 | # output hash |
427 | resolve = Resolve.IGNORE | 303 | row = await self.db.get_equivalent_for_outhash( |
428 | else: | 304 | data["method"], data["outhash"], data["taskhash"] |
429 | # No matching output hash was found. This is probably the | 305 | ) |
430 | # first outhash to be added. | ||
431 | unihash = data['unihash'] | ||
432 | resolve = Resolve.IGNORE | ||
433 | |||
434 | # Query upstream to see if it has a unihash we can use | ||
435 | if self.upstream_client is not None: | ||
436 | upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash']) | ||
437 | if upstream_data is not None: | ||
438 | unihash = upstream_data['unihash'] | ||
439 | |||
440 | |||
441 | insert_unihash( | ||
442 | cursor, | ||
443 | { | ||
444 | 'method': data['method'], | ||
445 | 'taskhash': data['taskhash'], | ||
446 | 'unihash': unihash, | ||
447 | }, | ||
448 | resolve | ||
449 | ) | ||
450 | |||
451 | unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash']) | ||
452 | if unihash_data is not None: | ||
453 | unihash = unihash_data['unihash'] | ||
454 | else: | ||
455 | unihash = data['unihash'] | ||
456 | |||
457 | self.db.commit() | ||
458 | 306 | ||
459 | d = { | 307 | if row is not None: |
460 | 'taskhash': data['taskhash'], | 308 | # A matching output hash was found. Set our taskhash to the |
461 | 'method': data['method'], | 309 | # same unihash since they are equivalent |
462 | 'unihash': unihash, | 310 | unihash = row["unihash"] |
463 | } | 311 | else: |
312 | # No matching output hash was found. This is probably the | ||
313 | # first outhash to be added. | ||
314 | unihash = data["unihash"] | ||
315 | |||
316 | # Query upstream to see if it has a unihash we can use | ||
317 | if self.upstream_client is not None: | ||
318 | upstream_data = await self.upstream_client.get_outhash( | ||
319 | data["method"], data["outhash"], data["taskhash"] | ||
320 | ) | ||
321 | if upstream_data is not None: | ||
322 | unihash = upstream_data["unihash"] | ||
323 | |||
324 | await self.db.insert_unihash(data["method"], data["taskhash"], unihash) | ||
325 | |||
326 | unihash_data = await self.get_unihash(data["method"], data["taskhash"]) | ||
327 | if unihash_data is not None: | ||
328 | unihash = unihash_data["unihash"] | ||
329 | else: | ||
330 | unihash = data["unihash"] | ||
464 | 331 | ||
465 | return d | 332 | return { |
333 | "taskhash": data["taskhash"], | ||
334 | "method": data["method"], | ||
335 | "unihash": unihash, | ||
336 | } | ||
466 | 337 | ||
467 | async def handle_equivreport(self, data): | 338 | async def handle_equivreport(self, data): |
468 | with closing(self.db.cursor()) as cursor: | 339 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
469 | insert_data = { | 340 | |
470 | 'method': data['method'], | 341 | # Fetch the unihash that will be reported for the taskhash. If the |
471 | 'taskhash': data['taskhash'], | 342 | # unihash matches, it means this row was inserted (or the mapping |
472 | 'unihash': data['unihash'], | 343 | # was already valid) |
473 | } | 344 | row = await self.db.get_equivalent(data["method"], data["taskhash"]) |
474 | insert_unihash(cursor, insert_data, Resolve.IGNORE) | 345 | |
475 | self.db.commit() | 346 | if row["unihash"] == data["unihash"]: |
476 | 347 | self.logger.info( | |
477 | # Fetch the unihash that will be reported for the taskhash. If the | 348 | "Adding taskhash equivalence for %s with unihash %s", |
478 | # unihash matches, it means this row was inserted (or the mapping | 349 | data["taskhash"], |
479 | # was already valid) | 350 | row["unihash"], |
480 | row = self.query_equivalent(cursor, data['method'], data['taskhash']) | 351 | ) |
481 | |||
482 | if row['unihash'] == data['unihash']: | ||
483 | self.logger.info('Adding taskhash equivalence for %s with unihash %s', | ||
484 | data['taskhash'], row['unihash']) | ||
485 | |||
486 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | ||
487 | |||
488 | return d | ||
489 | 352 | ||
353 | return {k: row[k] for k in ("taskhash", "method", "unihash")} | ||
490 | 354 | ||
491 | async def handle_get_stats(self, request): | 355 | async def handle_get_stats(self, request): |
492 | return { | 356 | return { |
493 | 'requests': self.request_stats.todict(), | 357 | "requests": self.request_stats.todict(), |
494 | } | 358 | } |
495 | 359 | ||
496 | async def handle_reset_stats(self, request): | 360 | async def handle_reset_stats(self, request): |
497 | d = { | 361 | d = { |
498 | 'requests': self.request_stats.todict(), | 362 | "requests": self.request_stats.todict(), |
499 | } | 363 | } |
500 | 364 | ||
501 | self.request_stats.reset() | 365 | self.request_stats.reset() |
@@ -503,7 +367,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
503 | 367 | ||
504 | async def handle_backfill_wait(self, request): | 368 | async def handle_backfill_wait(self, request): |
505 | d = { | 369 | d = { |
506 | 'tasks': self.backfill_queue.qsize(), | 370 | "tasks": self.backfill_queue.qsize(), |
507 | } | 371 | } |
508 | await self.backfill_queue.join() | 372 | await self.backfill_queue.join() |
509 | return d | 373 | return d |
@@ -513,92 +377,63 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
513 | if not isinstance(condition, dict): | 377 | if not isinstance(condition, dict): |
514 | raise TypeError("Bad condition type %s" % type(condition)) | 378 | raise TypeError("Bad condition type %s" % type(condition)) |
515 | 379 | ||
516 | def do_remove(columns, table_name, cursor): | 380 | return {"count": await self.db.remove(condition)} |
517 | nonlocal condition | ||
518 | where = {} | ||
519 | for c in columns: | ||
520 | if c in condition and condition[c] is not None: | ||
521 | where[c] = condition[c] | ||
522 | |||
523 | if where: | ||
524 | query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys()) | ||
525 | cursor.execute(query, where) | ||
526 | return cursor.rowcount | ||
527 | |||
528 | return 0 | ||
529 | |||
530 | count = 0 | ||
531 | with closing(self.db.cursor()) as cursor: | ||
532 | count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) | ||
533 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | ||
534 | self.db.commit() | ||
535 | |||
536 | return {"count": count} | ||
537 | 381 | ||
538 | async def handle_clean_unused(self, request): | 382 | async def handle_clean_unused(self, request): |
539 | max_age = request["max_age_seconds"] | 383 | max_age = request["max_age_seconds"] |
540 | with closing(self.db.cursor()) as cursor: | 384 | oldest = datetime.now() - timedelta(seconds=-max_age) |
541 | cursor.execute( | 385 | return {"count": await self.db.clean_unused(oldest)} |
542 | """ | ||
543 | DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( | ||
544 | SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 | ||
545 | ) | ||
546 | """, | ||
547 | { | ||
548 | "oldest": datetime.now() - timedelta(seconds=-max_age) | ||
549 | } | ||
550 | ) | ||
551 | count = cursor.rowcount | ||
552 | |||
553 | return {"count": count} | ||
554 | |||
555 | def query_equivalent(self, cursor, method, taskhash): | ||
556 | # This is part of the inner loop and must be as fast as possible | ||
557 | cursor.execute( | ||
558 | 'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash', | ||
559 | { | ||
560 | 'method': method, | ||
561 | 'taskhash': taskhash, | ||
562 | } | ||
563 | ) | ||
564 | return cursor.fetchone() | ||
565 | 386 | ||
566 | 387 | ||
567 | class Server(bb.asyncrpc.AsyncServer): | 388 | class Server(bb.asyncrpc.AsyncServer): |
568 | def __init__(self, db, upstream=None, read_only=False): | 389 | def __init__(self, db_engine, upstream=None, read_only=False): |
569 | if upstream and read_only: | 390 | if upstream and read_only: |
570 | raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") | 391 | raise bb.asyncrpc.ServerError( |
392 | "Read-only hashserv cannot pull from an upstream server" | ||
393 | ) | ||
571 | 394 | ||
572 | super().__init__(logger) | 395 | super().__init__(logger) |
573 | 396 | ||
574 | self.request_stats = Stats() | 397 | self.request_stats = Stats() |
575 | self.db = db | 398 | self.db_engine = db_engine |
576 | self.upstream = upstream | 399 | self.upstream = upstream |
577 | self.read_only = read_only | 400 | self.read_only = read_only |
578 | self.backfill_queue = None | 401 | self.backfill_queue = None |
579 | 402 | ||
580 | def accept_client(self, socket): | 403 | def accept_client(self, socket): |
581 | return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) | 404 | return ServerClient( |
405 | socket, | ||
406 | self.db_engine, | ||
407 | self.request_stats, | ||
408 | self.backfill_queue, | ||
409 | self.upstream, | ||
410 | self.read_only, | ||
411 | ) | ||
582 | 412 | ||
583 | async def backfill_worker_task(self): | 413 | async def backfill_worker_task(self): |
584 | client = await create_async_client(self.upstream) | 414 | async with await create_async_client( |
585 | try: | 415 | self.upstream |
416 | ) as client, self.db_engine.connect(logger) as db: | ||
586 | while True: | 417 | while True: |
587 | item = await self.backfill_queue.get() | 418 | item = await self.backfill_queue.get() |
588 | if item is None: | 419 | if item is None: |
589 | self.backfill_queue.task_done() | 420 | self.backfill_queue.task_done() |
590 | break | 421 | break |
422 | |||
591 | method, taskhash = item | 423 | method, taskhash = item |
592 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | 424 | d = await client.get_taskhash(method, taskhash) |
425 | if d is not None: | ||
426 | await db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) | ||
593 | self.backfill_queue.task_done() | 427 | self.backfill_queue.task_done() |
594 | finally: | ||
595 | await client.close() | ||
596 | 428 | ||
597 | def start(self): | 429 | def start(self): |
598 | tasks = super().start() | 430 | tasks = super().start() |
599 | if self.upstream: | 431 | if self.upstream: |
600 | self.backfill_queue = asyncio.Queue() | 432 | self.backfill_queue = asyncio.Queue() |
601 | tasks += [self.backfill_worker_task()] | 433 | tasks += [self.backfill_worker_task()] |
434 | |||
435 | self.loop.run_until_complete(self.db_engine.create()) | ||
436 | |||
602 | return tasks | 437 | return tasks |
603 | 438 | ||
604 | async def stop(self): | 439 | async def stop(self): |