| field | value | date |
|---|---|---|
| author | Joshua Watt <JPEWhacker@gmail.com> | 2023-11-03 08:26:25 -0600 |
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2023-11-09 17:33:02 +0000 |
| commit | baa3e5391daf41b6dd6e914a112abb00d3517da1 (patch) | |
| tree | 8280b21caf78db2783f586250776c4813f24cb87 /bitbake/lib/hashserv/server.py | |
| parent | e90fccfefd7693d8cdfa731fa7e170c8bd4b1a1b (diff) | |
| download | poky-baa3e5391daf41b6dd6e914a112abb00d3517da1.tar.gz | |
bitbake: hashserv: Abstract database
Abstracts the way the database backend is accessed by the hash
equivalence server to make it possible to use other backends
(Bitbake rev: 04b53deacf857488408bc82b9890b1e19874b5f1)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 491 |

1 file changed, 163 insertions, 328 deletions
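The refactor removes all direct sqlite access from server.py (cursors, hand-written SQL, explicit commits) and replaces the shared `db` handle with a `db_engine` object that the server treats as an opaque backend. Judging only from the call sites visible in the diff below, that object must provide roughly the interface sketched here; the class names, type hints, and comments are illustrative assumptions, not the actual API added elsewhere in the series.

```python
# Hypothetical sketch of the backend interface implied by the refactored
# server's call sites. Method names and arguments come from the diff below;
# everything else (class names, annotations) is an assumption for illustration.
from datetime import datetime
from typing import Any, AsyncContextManager, Mapping, Optional, Protocol


class DBConnection(Protocol):
    # Per-client handle yielded by the engine's connect() context manager.

    async def get_unihash_by_taskhash_full(
        self, method: str, taskhash: str
    ) -> Optional[Mapping[str, Any]]:
        ...  # full row for a taskhash, including outhash data (fetch_all queries)

    async def get_equivalent(
        self, method: str, taskhash: str
    ) -> Optional[Mapping[str, Any]]:
        ...  # taskhash/method/unihash row; used by get and the streaming path

    async def get_unihash_by_outhash(
        self, method: str, outhash: str
    ) -> Optional[Mapping[str, Any]]:
        ...

    async def get_outhash(
        self, method: str, outhash: str
    ) -> Optional[Mapping[str, Any]]:
        ...

    async def get_equivalent_for_outhash(
        self, method: str, outhash: str, taskhash: str
    ) -> Optional[Mapping[str, Any]]:
        ...  # oldest matching outhash from a different taskhash, if any

    async def insert_unihash(self, method: str, taskhash: str, unihash: str):
        ...  # return value is not inspected in the server code shown

    async def insert_outhash(self, data: Mapping[str, Any]):
        ...  # truthy when a new row was created (checked in handle_report)

    async def remove(self, condition: Mapping[str, Any]) -> int:
        ...  # number of rows deleted

    async def clean_unused(self, oldest: datetime) -> int:
        ...  # number of stale outhash rows removed


class DBEngine(Protocol):
    # Object passed to Server(); owns schema setup and connection handling.

    async def create(self) -> None:
        ...  # awaited once in Server.start() before clients are accepted

    def connect(self, logger) -> AsyncContextManager[DBConnection]:
        ...  # async context manager: one connection per client or worker
```

Each client coroutine now opens its own connection with `async with self.db_engine.connect(self.logger) as db:` inside `process_requests()`, and the backfill worker opens a separate one, in place of the single sqlite connection previously shared by the whole server; `db_engine.create()` is awaited once in `Server.start()` before any client is accepted.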
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index e6a3f40577..84cf4f2283 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -3,18 +3,16 @@
| 3 | # SPDX-License-Identifier: GPL-2.0-only | 3 | # SPDX-License-Identifier: GPL-2.0-only |
| 4 | # | 4 | # |
| 5 | 5 | ||
| 6 | from contextlib import closing, contextmanager | ||
| 7 | from datetime import datetime, timedelta | 6 | from datetime import datetime, timedelta |
| 8 | import enum | ||
| 9 | import asyncio | 7 | import asyncio |
| 10 | import logging | 8 | import logging |
| 11 | import math | 9 | import math |
| 12 | import time | 10 | import time |
| 13 | from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS | 11 | from . import create_async_client |
| 14 | import bb.asyncrpc | 12 | import bb.asyncrpc |
| 15 | 13 | ||
| 16 | 14 | ||
| 17 | logger = logging.getLogger('hashserv.server') | 15 | logger = logging.getLogger("hashserv.server") |
| 18 | 16 | ||
| 19 | 17 | ||
| 20 | class Measurement(object): | 18 | class Measurement(object): |
@@ -104,229 +102,136 @@ class Stats(object):
| 104 | return math.sqrt(self.s / (self.num - 1)) | 102 | return math.sqrt(self.s / (self.num - 1)) |
| 105 | 103 | ||
| 106 | def todict(self): | 104 | def todict(self): |
| 107 | return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} | 105 | return { |
| 108 | 106 | k: getattr(self, k) | |
| 109 | 107 | for k in ("num", "total_time", "max_time", "average", "stdev") | |
| 110 | @enum.unique | 108 | } |
| 111 | class Resolve(enum.Enum): | ||
| 112 | FAIL = enum.auto() | ||
| 113 | IGNORE = enum.auto() | ||
| 114 | REPLACE = enum.auto() | ||
| 115 | |||
| 116 | |||
| 117 | def insert_table(cursor, table, data, on_conflict): | ||
| 118 | resolve = { | ||
| 119 | Resolve.FAIL: "", | ||
| 120 | Resolve.IGNORE: " OR IGNORE", | ||
| 121 | Resolve.REPLACE: " OR REPLACE", | ||
| 122 | }[on_conflict] | ||
| 123 | |||
| 124 | keys = sorted(data.keys()) | ||
| 125 | query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format( | ||
| 126 | resolve=resolve, | ||
| 127 | table=table, | ||
| 128 | fields=", ".join(keys), | ||
| 129 | values=", ".join(":" + k for k in keys), | ||
| 130 | ) | ||
| 131 | prevrowid = cursor.lastrowid | ||
| 132 | cursor.execute(query, data) | ||
| 133 | logging.debug( | ||
| 134 | "Inserting %r into %s, %s", | ||
| 135 | data, | ||
| 136 | table, | ||
| 137 | on_conflict | ||
| 138 | ) | ||
| 139 | return (cursor.lastrowid, cursor.lastrowid != prevrowid) | ||
| 140 | |||
| 141 | def insert_unihash(cursor, data, on_conflict): | ||
| 142 | return insert_table(cursor, "unihashes_v2", data, on_conflict) | ||
| 143 | |||
| 144 | def insert_outhash(cursor, data, on_conflict): | ||
| 145 | return insert_table(cursor, "outhashes_v2", data, on_conflict) | ||
| 146 | |||
| 147 | async def copy_unihash_from_upstream(client, db, method, taskhash): | ||
| 148 | d = await client.get_taskhash(method, taskhash) | ||
| 149 | if d is not None: | ||
| 150 | with closing(db.cursor()) as cursor: | ||
| 151 | insert_unihash( | ||
| 152 | cursor, | ||
| 153 | {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}, | ||
| 154 | Resolve.IGNORE, | ||
| 155 | ) | ||
| 156 | db.commit() | ||
| 157 | return d | ||
| 158 | |||
| 159 | |||
| 160 | class ServerCursor(object): | ||
| 161 | def __init__(self, db, cursor, upstream): | ||
| 162 | self.db = db | ||
| 163 | self.cursor = cursor | ||
| 164 | self.upstream = upstream | ||
| 165 | 109 | ||
| 166 | 110 | ||
| 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 111 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
| 168 | def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): | 112 | def __init__( |
| 169 | super().__init__(socket, 'OEHASHEQUIV', logger) | 113 | self, |
| 170 | self.db = db | 114 | socket, |
| 115 | db_engine, | ||
| 116 | request_stats, | ||
| 117 | backfill_queue, | ||
| 118 | upstream, | ||
| 119 | read_only, | ||
| 120 | ): | ||
| 121 | super().__init__(socket, "OEHASHEQUIV", logger) | ||
| 122 | self.db_engine = db_engine | ||
| 171 | self.request_stats = request_stats | 123 | self.request_stats = request_stats |
| 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 124 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
| 173 | self.backfill_queue = backfill_queue | 125 | self.backfill_queue = backfill_queue |
| 174 | self.upstream = upstream | 126 | self.upstream = upstream |
| 175 | 127 | ||
| 176 | self.handlers.update({ | 128 | self.handlers.update( |
| 177 | 'get': self.handle_get, | 129 | { |
| 178 | 'get-outhash': self.handle_get_outhash, | 130 | "get": self.handle_get, |
| 179 | 'get-stream': self.handle_get_stream, | 131 | "get-outhash": self.handle_get_outhash, |
| 180 | 'get-stats': self.handle_get_stats, | 132 | "get-stream": self.handle_get_stream, |
| 181 | }) | 133 | "get-stats": self.handle_get_stats, |
| 134 | } | ||
| 135 | ) | ||
| 182 | 136 | ||
| 183 | if not read_only: | 137 | if not read_only: |
| 184 | self.handlers.update({ | 138 | self.handlers.update( |
| 185 | 'report': self.handle_report, | 139 | { |
| 186 | 'report-equiv': self.handle_equivreport, | 140 | "report": self.handle_report, |
| 187 | 'reset-stats': self.handle_reset_stats, | 141 | "report-equiv": self.handle_equivreport, |
| 188 | 'backfill-wait': self.handle_backfill_wait, | 142 | "reset-stats": self.handle_reset_stats, |
| 189 | 'remove': self.handle_remove, | 143 | "backfill-wait": self.handle_backfill_wait, |
| 190 | 'clean-unused': self.handle_clean_unused, | 144 | "remove": self.handle_remove, |
| 191 | }) | 145 | "clean-unused": self.handle_clean_unused, |
| 146 | } | ||
| 147 | ) | ||
| 192 | 148 | ||
| 193 | def validate_proto_version(self): | 149 | def validate_proto_version(self): |
| 194 | return (self.proto_version > (1, 0) and self.proto_version <= (1, 1)) | 150 | return self.proto_version > (1, 0) and self.proto_version <= (1, 1) |
| 195 | 151 | ||
| 196 | async def process_requests(self): | 152 | async def process_requests(self): |
| 197 | if self.upstream is not None: | 153 | async with self.db_engine.connect(self.logger) as db: |
| 198 | self.upstream_client = await create_async_client(self.upstream) | 154 | self.db = db |
| 199 | else: | 155 | if self.upstream is not None: |
| 200 | self.upstream_client = None | 156 | self.upstream_client = await create_async_client(self.upstream) |
| 201 | 157 | else: | |
| 202 | await super().process_requests() | 158 | self.upstream_client = None |
| 203 | 159 | ||
| 204 | if self.upstream_client is not None: | 160 | try: |
| 205 | await self.upstream_client.close() | 161 | await super().process_requests() |
| 162 | finally: | ||
| 163 | if self.upstream_client is not None: | ||
| 164 | await self.upstream_client.close() | ||
| 206 | 165 | ||
| 207 | async def dispatch_message(self, msg): | 166 | async def dispatch_message(self, msg): |
| 208 | for k in self.handlers.keys(): | 167 | for k in self.handlers.keys(): |
| 209 | if k in msg: | 168 | if k in msg: |
| 210 | self.logger.debug('Handling %s' % k) | 169 | self.logger.debug("Handling %s" % k) |
| 211 | if 'stream' in k: | 170 | if "stream" in k: |
| 212 | return await self.handlers[k](msg[k]) | 171 | return await self.handlers[k](msg[k]) |
| 213 | else: | 172 | else: |
| 214 | with self.request_stats.start_sample() as self.request_sample, \ | 173 | with self.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): |
| 215 | self.request_sample.measure(): | ||
| 216 | return await self.handlers[k](msg[k]) | 174 | return await self.handlers[k](msg[k]) |
| 217 | 175 | ||
| 218 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 176 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
| 219 | 177 | ||
| 220 | async def handle_get(self, request): | 178 | async def handle_get(self, request): |
| 221 | method = request['method'] | 179 | method = request["method"] |
| 222 | taskhash = request['taskhash'] | 180 | taskhash = request["taskhash"] |
| 223 | fetch_all = request.get('all', False) | 181 | fetch_all = request.get("all", False) |
| 224 | 182 | ||
| 225 | with closing(self.db.cursor()) as cursor: | 183 | return await self.get_unihash(method, taskhash, fetch_all) |
| 226 | return await self.get_unihash(cursor, method, taskhash, fetch_all) | ||
| 227 | 184 | ||
| 228 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): | 185 | async def get_unihash(self, method, taskhash, fetch_all=False): |
| 229 | d = None | 186 | d = None |
| 230 | 187 | ||
| 231 | if fetch_all: | 188 | if fetch_all: |
| 232 | cursor.execute( | 189 | row = await self.db.get_unihash_by_taskhash_full(method, taskhash) |
| 233 | ''' | ||
| 234 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
| 235 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
| 236 | WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash | ||
| 237 | ORDER BY outhashes_v2.created ASC | ||
| 238 | LIMIT 1 | ||
| 239 | ''', | ||
| 240 | { | ||
| 241 | 'method': method, | ||
| 242 | 'taskhash': taskhash, | ||
| 243 | } | ||
| 244 | |||
| 245 | ) | ||
| 246 | row = cursor.fetchone() | ||
| 247 | |||
| 248 | if row is not None: | 190 | if row is not None: |
| 249 | d = {k: row[k] for k in row.keys()} | 191 | d = {k: row[k] for k in row.keys()} |
| 250 | elif self.upstream_client is not None: | 192 | elif self.upstream_client is not None: |
| 251 | d = await self.upstream_client.get_taskhash(method, taskhash, True) | 193 | d = await self.upstream_client.get_taskhash(method, taskhash, True) |
| 252 | self.update_unified(cursor, d) | 194 | await self.update_unified(d) |
| 253 | self.db.commit() | ||
| 254 | else: | 195 | else: |
| 255 | row = self.query_equivalent(cursor, method, taskhash) | 196 | row = await self.db.get_equivalent(method, taskhash) |
| 256 | 197 | ||
| 257 | if row is not None: | 198 | if row is not None: |
| 258 | d = {k: row[k] for k in row.keys()} | 199 | d = {k: row[k] for k in row.keys()} |
| 259 | elif self.upstream_client is not None: | 200 | elif self.upstream_client is not None: |
| 260 | d = await self.upstream_client.get_taskhash(method, taskhash) | 201 | d = await self.upstream_client.get_taskhash(method, taskhash) |
| 261 | d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS} | 202 | await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) |
| 262 | insert_unihash(cursor, d, Resolve.IGNORE) | ||
| 263 | self.db.commit() | ||
| 264 | 203 | ||
| 265 | return d | 204 | return d |
| 266 | 205 | ||
| 267 | async def handle_get_outhash(self, request): | 206 | async def handle_get_outhash(self, request): |
| 268 | method = request['method'] | 207 | method = request["method"] |
| 269 | outhash = request['outhash'] | 208 | outhash = request["outhash"] |
| 270 | taskhash = request['taskhash'] | 209 | taskhash = request["taskhash"] |
| 271 | with_unihash = request.get("with_unihash", True) | 210 | with_unihash = request.get("with_unihash", True) |
| 272 | 211 | ||
| 273 | with closing(self.db.cursor()) as cursor: | 212 | return await self.get_outhash(method, outhash, taskhash, with_unihash) |
| 274 | return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) | ||
| 275 | 213 | ||
| 276 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): | 214 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
| 277 | d = None | 215 | d = None |
| 278 | if with_unihash: | 216 | if with_unihash: |
| 279 | cursor.execute( | 217 | row = await self.db.get_unihash_by_outhash(method, outhash) |
| 280 | ''' | ||
| 281 | SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
| 282 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
| 283 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | ||
| 284 | ORDER BY outhashes_v2.created ASC | ||
| 285 | LIMIT 1 | ||
| 286 | ''', | ||
| 287 | { | ||
| 288 | 'method': method, | ||
| 289 | 'outhash': outhash, | ||
| 290 | } | ||
| 291 | ) | ||
| 292 | else: | 218 | else: |
| 293 | cursor.execute( | 219 | row = await self.db.get_outhash(method, outhash) |
| 294 | """ | ||
| 295 | SELECT * FROM outhashes_v2 | ||
| 296 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash | ||
| 297 | ORDER BY outhashes_v2.created ASC | ||
| 298 | LIMIT 1 | ||
| 299 | """, | ||
| 300 | { | ||
| 301 | 'method': method, | ||
| 302 | 'outhash': outhash, | ||
| 303 | } | ||
| 304 | ) | ||
| 305 | row = cursor.fetchone() | ||
| 306 | 220 | ||
| 307 | if row is not None: | 221 | if row is not None: |
| 308 | d = {k: row[k] for k in row.keys()} | 222 | d = {k: row[k] for k in row.keys()} |
| 309 | elif self.upstream_client is not None: | 223 | elif self.upstream_client is not None: |
| 310 | d = await self.upstream_client.get_outhash(method, outhash, taskhash) | 224 | d = await self.upstream_client.get_outhash(method, outhash, taskhash) |
| 311 | self.update_unified(cursor, d) | 225 | await self.update_unified(d) |
| 312 | self.db.commit() | ||
| 313 | 226 | ||
| 314 | return d | 227 | return d |
| 315 | 228 | ||
| 316 | def update_unified(self, cursor, data): | 229 | async def update_unified(self, data): |
| 317 | if data is None: | 230 | if data is None: |
| 318 | return | 231 | return |
| 319 | 232 | ||
| 320 | insert_unihash( | 233 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
| 321 | cursor, | 234 | await self.db.insert_outhash(data) |
| 322 | {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}, | ||
| 323 | Resolve.IGNORE | ||
| 324 | ) | ||
| 325 | insert_outhash( | ||
| 326 | cursor, | ||
| 327 | {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}, | ||
| 328 | Resolve.IGNORE | ||
| 329 | ) | ||
| 330 | 235 | ||
| 331 | async def handle_get_stream(self, request): | 236 | async def handle_get_stream(self, request): |
| 332 | await self.socket.send_message("ok") | 237 | await self.socket.send_message("ok") |
@@ -347,20 +252,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
| 347 | request_measure = self.request_sample.measure() | 252 | request_measure = self.request_sample.measure() |
| 348 | request_measure.start() | 253 | request_measure.start() |
| 349 | 254 | ||
| 350 | if l == 'END': | 255 | if l == "END": |
| 351 | break | 256 | break |
| 352 | 257 | ||
| 353 | (method, taskhash) = l.split() | 258 | (method, taskhash) = l.split() |
| 354 | #self.logger.debug('Looking up %s %s' % (method, taskhash)) | 259 | # self.logger.debug('Looking up %s %s' % (method, taskhash)) |
| 355 | cursor = self.db.cursor() | 260 | row = await self.db.get_equivalent(method, taskhash) |
| 356 | try: | ||
| 357 | row = self.query_equivalent(cursor, method, taskhash) | ||
| 358 | finally: | ||
| 359 | cursor.close() | ||
| 360 | 261 | ||
| 361 | if row is not None: | 262 | if row is not None: |
| 362 | msg = row['unihash'] | 263 | msg = row["unihash"] |
| 363 | #self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 264 | # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
| 364 | elif self.upstream_client is not None: | 265 | elif self.upstream_client is not None: |
| 365 | upstream = await self.upstream_client.get_unihash(method, taskhash) | 266 | upstream = await self.upstream_client.get_unihash(method, taskhash) |
| 366 | if upstream: | 267 | if upstream: |
@@ -384,118 +285,81 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
| 384 | return self.NO_RESPONSE | 285 | return self.NO_RESPONSE |
| 385 | 286 | ||
| 386 | async def handle_report(self, data): | 287 | async def handle_report(self, data): |
| 387 | with closing(self.db.cursor()) as cursor: | 288 | outhash_data = { |
| 388 | outhash_data = { | 289 | "method": data["method"], |
| 389 | 'method': data['method'], | 290 | "outhash": data["outhash"], |
| 390 | 'outhash': data['outhash'], | 291 | "taskhash": data["taskhash"], |
| 391 | 'taskhash': data['taskhash'], | 292 | "created": datetime.now(), |
| 392 | 'created': datetime.now() | 293 | } |
| 393 | } | ||
| 394 | 294 | ||
| 395 | for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): | 295 | for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"): |
| 396 | if k in data: | 296 | if k in data: |
| 397 | outhash_data[k] = data[k] | 297 | outhash_data[k] = data[k] |
| 398 | |||
| 399 | # Insert the new entry, unless it already exists | ||
| 400 | (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE) | ||
| 401 | |||
| 402 | if inserted: | ||
| 403 | # If this row is new, check if it is equivalent to another | ||
| 404 | # output hash | ||
| 405 | cursor.execute( | ||
| 406 | ''' | ||
| 407 | SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 | ||
| 408 | INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash | ||
| 409 | -- Select any matching output hash except the one we just inserted | ||
| 410 | WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash | ||
| 411 | -- Pick the oldest hash | ||
| 412 | ORDER BY outhashes_v2.created ASC | ||
| 413 | LIMIT 1 | ||
| 414 | ''', | ||
| 415 | { | ||
| 416 | 'method': data['method'], | ||
| 417 | 'outhash': data['outhash'], | ||
| 418 | 'taskhash': data['taskhash'], | ||
| 419 | } | ||
| 420 | ) | ||
| 421 | row = cursor.fetchone() | ||
| 422 | 298 | ||
| 423 | if row is not None: | 299 | # Insert the new entry, unless it already exists |
| 424 | # A matching output hash was found. Set our taskhash to the | 300 | if await self.db.insert_outhash(outhash_data): |
| 425 | # same unihash since they are equivalent | 301 | # If this row is new, check if it is equivalent to another |
| 426 | unihash = row['unihash'] | 302 | # output hash |
| 427 | resolve = Resolve.IGNORE | 303 | row = await self.db.get_equivalent_for_outhash( |
| 428 | else: | 304 | data["method"], data["outhash"], data["taskhash"] |
| 429 | # No matching output hash was found. This is probably the | 305 | ) |
| 430 | # first outhash to be added. | ||
| 431 | unihash = data['unihash'] | ||
| 432 | resolve = Resolve.IGNORE | ||
| 433 | |||
| 434 | # Query upstream to see if it has a unihash we can use | ||
| 435 | if self.upstream_client is not None: | ||
| 436 | upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash']) | ||
| 437 | if upstream_data is not None: | ||
| 438 | unihash = upstream_data['unihash'] | ||
| 439 | |||
| 440 | |||
| 441 | insert_unihash( | ||
| 442 | cursor, | ||
| 443 | { | ||
| 444 | 'method': data['method'], | ||
| 445 | 'taskhash': data['taskhash'], | ||
| 446 | 'unihash': unihash, | ||
| 447 | }, | ||
| 448 | resolve | ||
| 449 | ) | ||
| 450 | |||
| 451 | unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash']) | ||
| 452 | if unihash_data is not None: | ||
| 453 | unihash = unihash_data['unihash'] | ||
| 454 | else: | ||
| 455 | unihash = data['unihash'] | ||
| 456 | |||
| 457 | self.db.commit() | ||
| 458 | 306 | ||
| 459 | d = { | 307 | if row is not None: |
| 460 | 'taskhash': data['taskhash'], | 308 | # A matching output hash was found. Set our taskhash to the |
| 461 | 'method': data['method'], | 309 | # same unihash since they are equivalent |
| 462 | 'unihash': unihash, | 310 | unihash = row["unihash"] |
| 463 | } | 311 | else: |
| 312 | # No matching output hash was found. This is probably the | ||
| 313 | # first outhash to be added. | ||
| 314 | unihash = data["unihash"] | ||
| 315 | |||
| 316 | # Query upstream to see if it has a unihash we can use | ||
| 317 | if self.upstream_client is not None: | ||
| 318 | upstream_data = await self.upstream_client.get_outhash( | ||
| 319 | data["method"], data["outhash"], data["taskhash"] | ||
| 320 | ) | ||
| 321 | if upstream_data is not None: | ||
| 322 | unihash = upstream_data["unihash"] | ||
| 323 | |||
| 324 | await self.db.insert_unihash(data["method"], data["taskhash"], unihash) | ||
| 325 | |||
| 326 | unihash_data = await self.get_unihash(data["method"], data["taskhash"]) | ||
| 327 | if unihash_data is not None: | ||
| 328 | unihash = unihash_data["unihash"] | ||
| 329 | else: | ||
| 330 | unihash = data["unihash"] | ||
| 464 | 331 | ||
| 465 | return d | 332 | return { |
| 333 | "taskhash": data["taskhash"], | ||
| 334 | "method": data["method"], | ||
| 335 | "unihash": unihash, | ||
| 336 | } | ||
| 466 | 337 | ||
| 467 | async def handle_equivreport(self, data): | 338 | async def handle_equivreport(self, data): |
| 468 | with closing(self.db.cursor()) as cursor: | 339 | await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) |
| 469 | insert_data = { | 340 | |
| 470 | 'method': data['method'], | 341 | # Fetch the unihash that will be reported for the taskhash. If the |
| 471 | 'taskhash': data['taskhash'], | 342 | # unihash matches, it means this row was inserted (or the mapping |
| 472 | 'unihash': data['unihash'], | 343 | # was already valid) |
| 473 | } | 344 | row = await self.db.get_equivalent(data["method"], data["taskhash"]) |
| 474 | insert_unihash(cursor, insert_data, Resolve.IGNORE) | 345 | |
| 475 | self.db.commit() | 346 | if row["unihash"] == data["unihash"]: |
| 476 | 347 | self.logger.info( | |
| 477 | # Fetch the unihash that will be reported for the taskhash. If the | 348 | "Adding taskhash equivalence for %s with unihash %s", |
| 478 | # unihash matches, it means this row was inserted (or the mapping | 349 | data["taskhash"], |
| 479 | # was already valid) | 350 | row["unihash"], |
| 480 | row = self.query_equivalent(cursor, data['method'], data['taskhash']) | 351 | ) |
| 481 | |||
| 482 | if row['unihash'] == data['unihash']: | ||
| 483 | self.logger.info('Adding taskhash equivalence for %s with unihash %s', | ||
| 484 | data['taskhash'], row['unihash']) | ||
| 485 | |||
| 486 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | ||
| 487 | |||
| 488 | return d | ||
| 489 | 352 | ||
| 353 | return {k: row[k] for k in ("taskhash", "method", "unihash")} | ||
| 490 | 354 | ||
| 491 | async def handle_get_stats(self, request): | 355 | async def handle_get_stats(self, request): |
| 492 | return { | 356 | return { |
| 493 | 'requests': self.request_stats.todict(), | 357 | "requests": self.request_stats.todict(), |
| 494 | } | 358 | } |
| 495 | 359 | ||
| 496 | async def handle_reset_stats(self, request): | 360 | async def handle_reset_stats(self, request): |
| 497 | d = { | 361 | d = { |
| 498 | 'requests': self.request_stats.todict(), | 362 | "requests": self.request_stats.todict(), |
| 499 | } | 363 | } |
| 500 | 364 | ||
| 501 | self.request_stats.reset() | 365 | self.request_stats.reset() |
@@ -503,7 +367,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
| 503 | 367 | ||
| 504 | async def handle_backfill_wait(self, request): | 368 | async def handle_backfill_wait(self, request): |
| 505 | d = { | 369 | d = { |
| 506 | 'tasks': self.backfill_queue.qsize(), | 370 | "tasks": self.backfill_queue.qsize(), |
| 507 | } | 371 | } |
| 508 | await self.backfill_queue.join() | 372 | await self.backfill_queue.join() |
| 509 | return d | 373 | return d |
@@ -513,92 +377,63 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
| 513 | if not isinstance(condition, dict): | 377 | if not isinstance(condition, dict): |
| 514 | raise TypeError("Bad condition type %s" % type(condition)) | 378 | raise TypeError("Bad condition type %s" % type(condition)) |
| 515 | 379 | ||
| 516 | def do_remove(columns, table_name, cursor): | 380 | return {"count": await self.db.remove(condition)} |
| 517 | nonlocal condition | ||
| 518 | where = {} | ||
| 519 | for c in columns: | ||
| 520 | if c in condition and condition[c] is not None: | ||
| 521 | where[c] = condition[c] | ||
| 522 | |||
| 523 | if where: | ||
| 524 | query = ('DELETE FROM %s WHERE ' % table_name) + ' AND '.join("%s=:%s" % (k, k) for k in where.keys()) | ||
| 525 | cursor.execute(query, where) | ||
| 526 | return cursor.rowcount | ||
| 527 | |||
| 528 | return 0 | ||
| 529 | |||
| 530 | count = 0 | ||
| 531 | with closing(self.db.cursor()) as cursor: | ||
| 532 | count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor) | ||
| 533 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | ||
| 534 | self.db.commit() | ||
| 535 | |||
| 536 | return {"count": count} | ||
| 537 | 381 | ||
| 538 | async def handle_clean_unused(self, request): | 382 | async def handle_clean_unused(self, request): |
| 539 | max_age = request["max_age_seconds"] | 383 | max_age = request["max_age_seconds"] |
| 540 | with closing(self.db.cursor()) as cursor: | 384 | oldest = datetime.now() - timedelta(seconds=-max_age) |
| 541 | cursor.execute( | 385 | return {"count": await self.db.clean_unused(oldest)} |
| 542 | """ | ||
| 543 | DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS ( | ||
| 544 | SELECT unihashes_v2.id FROM unihashes_v2 WHERE unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash LIMIT 1 | ||
| 545 | ) | ||
| 546 | """, | ||
| 547 | { | ||
| 548 | "oldest": datetime.now() - timedelta(seconds=-max_age) | ||
| 549 | } | ||
| 550 | ) | ||
| 551 | count = cursor.rowcount | ||
| 552 | |||
| 553 | return {"count": count} | ||
| 554 | |||
| 555 | def query_equivalent(self, cursor, method, taskhash): | ||
| 556 | # This is part of the inner loop and must be as fast as possible | ||
| 557 | cursor.execute( | ||
| 558 | 'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash', | ||
| 559 | { | ||
| 560 | 'method': method, | ||
| 561 | 'taskhash': taskhash, | ||
| 562 | } | ||
| 563 | ) | ||
| 564 | return cursor.fetchone() | ||
| 565 | 386 | ||
| 566 | 387 | ||
| 567 | class Server(bb.asyncrpc.AsyncServer): | 388 | class Server(bb.asyncrpc.AsyncServer): |
| 568 | def __init__(self, db, upstream=None, read_only=False): | 389 | def __init__(self, db_engine, upstream=None, read_only=False): |
| 569 | if upstream and read_only: | 390 | if upstream and read_only: |
| 570 | raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") | 391 | raise bb.asyncrpc.ServerError( |
| 392 | "Read-only hashserv cannot pull from an upstream server" | ||
| 393 | ) | ||
| 571 | 394 | ||
| 572 | super().__init__(logger) | 395 | super().__init__(logger) |
| 573 | 396 | ||
| 574 | self.request_stats = Stats() | 397 | self.request_stats = Stats() |
| 575 | self.db = db | 398 | self.db_engine = db_engine |
| 576 | self.upstream = upstream | 399 | self.upstream = upstream |
| 577 | self.read_only = read_only | 400 | self.read_only = read_only |
| 578 | self.backfill_queue = None | 401 | self.backfill_queue = None |
| 579 | 402 | ||
| 580 | def accept_client(self, socket): | 403 | def accept_client(self, socket): |
| 581 | return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) | 404 | return ServerClient( |
| 405 | socket, | ||
| 406 | self.db_engine, | ||
| 407 | self.request_stats, | ||
| 408 | self.backfill_queue, | ||
| 409 | self.upstream, | ||
| 410 | self.read_only, | ||
| 411 | ) | ||
| 582 | 412 | ||
| 583 | async def backfill_worker_task(self): | 413 | async def backfill_worker_task(self): |
| 584 | client = await create_async_client(self.upstream) | 414 | async with await create_async_client( |
| 585 | try: | 415 | self.upstream |
| 416 | ) as client, self.db_engine.connect(logger) as db: | ||
| 586 | while True: | 417 | while True: |
| 587 | item = await self.backfill_queue.get() | 418 | item = await self.backfill_queue.get() |
| 588 | if item is None: | 419 | if item is None: |
| 589 | self.backfill_queue.task_done() | 420 | self.backfill_queue.task_done() |
| 590 | break | 421 | break |
| 422 | |||
| 591 | method, taskhash = item | 423 | method, taskhash = item |
| 592 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | 424 | d = await client.get_taskhash(method, taskhash) |
| 425 | if d is not None: | ||
| 426 | await db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) | ||
| 593 | self.backfill_queue.task_done() | 427 | self.backfill_queue.task_done() |
| 594 | finally: | ||
| 595 | await client.close() | ||
| 596 | 428 | ||
| 597 | def start(self): | 429 | def start(self): |
| 598 | tasks = super().start() | 430 | tasks = super().start() |
| 599 | if self.upstream: | 431 | if self.upstream: |
| 600 | self.backfill_queue = asyncio.Queue() | 432 | self.backfill_queue = asyncio.Queue() |
| 601 | tasks += [self.backfill_worker_task()] | 433 | tasks += [self.backfill_worker_task()] |
| 434 | |||
| 435 | self.loop.run_until_complete(self.db_engine.create()) | ||
| 436 | |||
| 602 | return tasks | 437 | return tasks |
| 603 | 438 | ||
| 604 | async def stop(self): | 439 | async def stop(self): |
