diff options
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r-- | bitbake/lib/hashserv/server.py | 149 |
1 files changed, 119 insertions, 30 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 81050715ea..3ff4c51ccb 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -3,7 +3,7 @@ | |||
3 | # SPDX-License-Identifier: GPL-2.0-only | 3 | # SPDX-License-Identifier: GPL-2.0-only |
4 | # | 4 | # |
5 | 5 | ||
6 | from contextlib import closing | 6 | from contextlib import closing, contextmanager |
7 | from datetime import datetime | 7 | from datetime import datetime |
8 | import asyncio | 8 | import asyncio |
9 | import json | 9 | import json |
@@ -12,8 +12,9 @@ import math | |||
12 | import os | 12 | import os |
13 | import signal | 13 | import signal |
14 | import socket | 14 | import socket |
15 | import sys | ||
15 | import time | 16 | import time |
16 | from . import chunkify, DEFAULT_MAX_CHUNK | 17 | from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS |
17 | 18 | ||
18 | logger = logging.getLogger('hashserv.server') | 19 | logger = logging.getLogger('hashserv.server') |
19 | 20 | ||
@@ -111,16 +112,40 @@ class Stats(object): | |||
111 | class ClientError(Exception): | 112 | class ClientError(Exception): |
112 | pass | 113 | pass |
113 | 114 | ||
115 | def insert_task(cursor, data, ignore=False): | ||
116 | keys = sorted(data.keys()) | ||
117 | query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % ( | ||
118 | " OR IGNORE" if ignore else "", | ||
119 | ', '.join(keys), | ||
120 | ', '.join(':' + k for k in keys)) | ||
121 | cursor.execute(query, data) | ||
122 | |||
123 | async def copy_from_upstream(client, db, method, taskhash): | ||
124 | d = await client.get_taskhash(method, taskhash, True) | ||
125 | if d is not None: | ||
126 | # Filter out unknown columns | ||
127 | d = {k: v for k, v in d.items() if k in TABLE_COLUMNS} | ||
128 | keys = sorted(d.keys()) | ||
129 | |||
130 | |||
131 | with closing(db.cursor()) as cursor: | ||
132 | insert_task(cursor, d) | ||
133 | db.commit() | ||
134 | |||
135 | return d | ||
136 | |||
114 | class ServerClient(object): | 137 | class ServerClient(object): |
115 | FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' | 138 | FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' |
116 | ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' | 139 | ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' |
117 | 140 | ||
118 | def __init__(self, reader, writer, db, request_stats): | 141 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream): |
119 | self.reader = reader | 142 | self.reader = reader |
120 | self.writer = writer | 143 | self.writer = writer |
121 | self.db = db | 144 | self.db = db |
122 | self.request_stats = request_stats | 145 | self.request_stats = request_stats |
123 | self.max_chunk = DEFAULT_MAX_CHUNK | 146 | self.max_chunk = DEFAULT_MAX_CHUNK |
147 | self.backfill_queue = backfill_queue | ||
148 | self.upstream = upstream | ||
124 | 149 | ||
125 | self.handlers = { | 150 | self.handlers = { |
126 | 'get': self.handle_get, | 151 | 'get': self.handle_get, |
@@ -130,10 +155,18 @@ class ServerClient(object): | |||
130 | 'get-stats': self.handle_get_stats, | 155 | 'get-stats': self.handle_get_stats, |
131 | 'reset-stats': self.handle_reset_stats, | 156 | 'reset-stats': self.handle_reset_stats, |
132 | 'chunk-stream': self.handle_chunk, | 157 | 'chunk-stream': self.handle_chunk, |
158 | 'backfill-wait': self.handle_backfill_wait, | ||
133 | } | 159 | } |
134 | 160 | ||
135 | async def process_requests(self): | 161 | async def process_requests(self): |
162 | if self.upstream is not None: | ||
163 | self.upstream_client = await create_async_client(self.upstream) | ||
164 | else: | ||
165 | self.upstream_client = None | ||
166 | |||
136 | try: | 167 | try: |
168 | |||
169 | |||
137 | self.addr = self.writer.get_extra_info('peername') | 170 | self.addr = self.writer.get_extra_info('peername') |
138 | logger.debug('Client %r connected' % (self.addr,)) | 171 | logger.debug('Client %r connected' % (self.addr,)) |
139 | 172 | ||
@@ -171,6 +204,9 @@ class ServerClient(object): | |||
171 | except ClientError as e: | 204 | except ClientError as e: |
172 | logger.error(str(e)) | 205 | logger.error(str(e)) |
173 | finally: | 206 | finally: |
207 | if self.upstream_client is not None: | ||
208 | await self.upstream_client.close() | ||
209 | |||
174 | self.writer.close() | 210 | self.writer.close() |
175 | 211 | ||
176 | async def dispatch_message(self, msg): | 212 | async def dispatch_message(self, msg): |
@@ -239,15 +275,19 @@ class ServerClient(object): | |||
239 | if row is not None: | 275 | if row is not None: |
240 | logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 276 | logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
241 | d = {k: row[k] for k in row.keys()} | 277 | d = {k: row[k] for k in row.keys()} |
242 | 278 | elif self.upstream_client is not None: | |
243 | self.write_message(d) | 279 | d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash) |
244 | else: | 280 | else: |
245 | self.write_message(None) | 281 | d = None |
282 | |||
283 | self.write_message(d) | ||
246 | 284 | ||
247 | async def handle_get_stream(self, request): | 285 | async def handle_get_stream(self, request): |
248 | self.write_message('ok') | 286 | self.write_message('ok') |
249 | 287 | ||
250 | while True: | 288 | while True: |
289 | upstream = None | ||
290 | |||
251 | l = await self.reader.readline() | 291 | l = await self.reader.readline() |
252 | if not l: | 292 | if not l: |
253 | return | 293 | return |
@@ -272,6 +312,12 @@ class ServerClient(object): | |||
272 | if row is not None: | 312 | if row is not None: |
273 | msg = ('%s\n' % row['unihash']).encode('utf-8') | 313 | msg = ('%s\n' % row['unihash']).encode('utf-8') |
274 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 314 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
315 | elif self.upstream_client is not None: | ||
316 | upstream = await self.upstream_client.get_unihash(method, taskhash) | ||
317 | if upstream: | ||
318 | msg = ("%s\n" % upstream).encode("utf-8") | ||
319 | else: | ||
320 | msg = "\n".encode("utf-8") | ||
275 | else: | 321 | else: |
276 | msg = '\n'.encode('utf-8') | 322 | msg = '\n'.encode('utf-8') |
277 | 323 | ||
@@ -282,6 +328,11 @@ class ServerClient(object): | |||
282 | 328 | ||
283 | await self.writer.drain() | 329 | await self.writer.drain() |
284 | 330 | ||
331 | # Post to the backfill queue after writing the result to minimize | ||
332 | # the turn around time on a request | ||
333 | if upstream is not None: | ||
334 | await self.backfill_queue.put((method, taskhash)) | ||
335 | |||
285 | async def handle_report(self, data): | 336 | async def handle_report(self, data): |
286 | with closing(self.db.cursor()) as cursor: | 337 | with closing(self.db.cursor()) as cursor: |
287 | cursor.execute(''' | 338 | cursor.execute(''' |
@@ -324,11 +375,7 @@ class ServerClient(object): | |||
324 | if k in data: | 375 | if k in data: |
325 | insert_data[k] = data[k] | 376 | insert_data[k] = data[k] |
326 | 377 | ||
327 | cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % ( | 378 | insert_task(cursor, insert_data) |
328 | ', '.join(sorted(insert_data.keys())), | ||
329 | ', '.join(':' + k for k in sorted(insert_data.keys()))), | ||
330 | insert_data) | ||
331 | |||
332 | self.db.commit() | 379 | self.db.commit() |
333 | 380 | ||
334 | logger.info('Adding taskhash %s with unihash %s', | 381 | logger.info('Adding taskhash %s with unihash %s', |
@@ -358,11 +405,7 @@ class ServerClient(object): | |||
358 | if k in data: | 405 | if k in data: |
359 | insert_data[k] = data[k] | 406 | insert_data[k] = data[k] |
360 | 407 | ||
361 | cursor.execute('''INSERT OR IGNORE INTO tasks_v2 (%s) VALUES (%s)''' % ( | 408 | insert_task(cursor, insert_data, ignore=True) |
362 | ', '.join(sorted(insert_data.keys())), | ||
363 | ', '.join(':' + k for k in sorted(insert_data.keys()))), | ||
364 | insert_data) | ||
365 | |||
366 | self.db.commit() | 409 | self.db.commit() |
367 | 410 | ||
368 | # Fetch the unihash that will be reported for the taskhash. If the | 411 | # Fetch the unihash that will be reported for the taskhash. If the |
@@ -394,6 +437,13 @@ class ServerClient(object): | |||
394 | self.request_stats.reset() | 437 | self.request_stats.reset() |
395 | self.write_message(d) | 438 | self.write_message(d) |
396 | 439 | ||
440 | async def handle_backfill_wait(self, request): | ||
441 | d = { | ||
442 | 'tasks': self.backfill_queue.qsize(), | ||
443 | } | ||
444 | await self.backfill_queue.join() | ||
445 | self.write_message(d) | ||
446 | |||
397 | def query_equivalent(self, method, taskhash, query): | 447 | def query_equivalent(self, method, taskhash, query): |
398 | # This is part of the inner loop and must be as fast as possible | 448 | # This is part of the inner loop and must be as fast as possible |
399 | try: | 449 | try: |
@@ -405,7 +455,7 @@ class ServerClient(object): | |||
405 | 455 | ||
406 | 456 | ||
407 | class Server(object): | 457 | class Server(object): |
408 | def __init__(self, db, loop=None): | 458 | def __init__(self, db, loop=None, upstream=None): |
409 | self.request_stats = Stats() | 459 | self.request_stats = Stats() |
410 | self.db = db | 460 | self.db = db |
411 | 461 | ||
@@ -416,6 +466,8 @@ class Server(object): | |||
416 | self.loop = loop | 466 | self.loop = loop |
417 | self.close_loop = False | 467 | self.close_loop = False |
418 | 468 | ||
469 | self.upstream = upstream | ||
470 | |||
419 | self._cleanup_socket = None | 471 | self._cleanup_socket = None |
420 | 472 | ||
421 | def start_tcp_server(self, host, port): | 473 | def start_tcp_server(self, host, port): |
@@ -458,7 +510,7 @@ class Server(object): | |||
458 | async def handle_client(self, reader, writer): | 510 | async def handle_client(self, reader, writer): |
459 | # writer.transport.set_write_buffer_limits(0) | 511 | # writer.transport.set_write_buffer_limits(0) |
460 | try: | 512 | try: |
461 | client = ServerClient(reader, writer, self.db, self.request_stats) | 513 | client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream) |
462 | await client.process_requests() | 514 | await client.process_requests() |
463 | except Exception as e: | 515 | except Exception as e: |
464 | import traceback | 516 | import traceback |
@@ -467,23 +519,60 @@ class Server(object): | |||
467 | writer.close() | 519 | writer.close() |
468 | logger.info('Client disconnected') | 520 | logger.info('Client disconnected') |
469 | 521 | ||
522 | @contextmanager | ||
523 | def _backfill_worker(self): | ||
524 | async def backfill_worker_task(): | ||
525 | client = await create_async_client(self.upstream) | ||
526 | try: | ||
527 | while True: | ||
528 | item = await self.backfill_queue.get() | ||
529 | if item is None: | ||
530 | self.backfill_queue.task_done() | ||
531 | break | ||
532 | method, taskhash = item | ||
533 | await copy_from_upstream(client, self.db, method, taskhash) | ||
534 | self.backfill_queue.task_done() | ||
535 | finally: | ||
536 | await client.close() | ||
537 | |||
538 | async def join_worker(worker): | ||
539 | await self.backfill_queue.put(None) | ||
540 | await worker | ||
541 | |||
542 | if self.upstream is not None: | ||
543 | worker = asyncio.ensure_future(backfill_worker_task()) | ||
544 | try: | ||
545 | yield | ||
546 | finally: | ||
547 | self.loop.run_until_complete(join_worker(worker)) | ||
548 | else: | ||
549 | yield | ||
550 | |||
470 | def serve_forever(self): | 551 | def serve_forever(self): |
471 | def signal_handler(): | 552 | def signal_handler(): |
472 | self.loop.stop() | 553 | self.loop.stop() |
473 | 554 | ||
474 | self.loop.add_signal_handler(signal.SIGTERM, signal_handler) | 555 | asyncio.set_event_loop(self.loop) |
475 | |||
476 | try: | 556 | try: |
477 | self.loop.run_forever() | 557 | self.backfill_queue = asyncio.Queue() |
478 | except KeyboardInterrupt: | 558 | |
479 | pass | 559 | self.loop.add_signal_handler(signal.SIGTERM, signal_handler) |
480 | 560 | ||
481 | self.server.close() | 561 | with self._backfill_worker(): |
482 | self.loop.run_until_complete(self.server.wait_closed()) | 562 | try: |
483 | logger.info('Server shutting down') | 563 | self.loop.run_forever() |
564 | except KeyboardInterrupt: | ||
565 | pass | ||
484 | 566 | ||
485 | if self.close_loop: | 567 | self.server.close() |
486 | self.loop.close() | 568 | |
569 | self.loop.run_until_complete(self.server.wait_closed()) | ||
570 | logger.info('Server shutting down') | ||
571 | finally: | ||
572 | if self.close_loop: | ||
573 | if sys.version_info >= (3, 6): | ||
574 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
575 | self.loop.close() | ||
487 | 576 | ||
488 | if self._cleanup_socket is not None: | 577 | if self._cleanup_socket is not None: |
489 | self._cleanup_socket() | 578 | self._cleanup_socket() |