author     Joshua Watt <JPEWhacker@gmail.com>                    2020-11-10 08:59:56 -0600
committer  Richard Purdie <richard.purdie@linuxfoundation.org>   2020-11-24 15:26:12 +0000
commit     96b548a79d87120655da3ac5501b8ad4726cf1a4 (patch)
tree       06938cfe533173ad02664dc2f187867a165c5871 /bitbake/lib/hashserv/server.py
parent     859f43e176dcaaa652e24a2289abd75e18c077cf (diff)
download   poky-96b548a79d87120655da3ac5501b8ad4726cf1a4.tar.gz
bitbake: bitbake: hashserve: Add support for readonly upstream
Adds support for an upstream server to be specified. The upstream server
will be queried for equivalent hashes whenever a miss is found in the
local server. If the upstream server returns a match, it is merged into
the local database. To keep the get-stream queries as fast as possible,
since they are the critical path when bitbake is preparing the run queue,
missing tasks provided by the upstream server are not pulled immediately;
instead, they are put into a queue to be backfilled by a worker task later.

(Bitbake rev: e6d6c0b39393e9bdf378c1eba141f815e26b724b)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
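For illustration, a minimal sketch of how the upstream might be wired in. This page shows only server.py, so the create_server signature with an upstream keyword is an assumption about the companion changes in this commit, and the addresses and database name are placeholders:

    import hashserv

    # Hypothetical wiring: start a local hash equivalence server that
    # falls back to a read-only upstream on misses.
    server = hashserv.create_server(
        'localhost:8687',                      # address of this local server
        './hashserv.db',                       # local sqlite database
        upstream='hashserv.example.com:8686',  # assumed upstream parameter
    )
    server.serve_forever()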
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r--  bitbake/lib/hashserv/server.py  149
1 file changed, 119 insertions(+), 30 deletions(-)
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 81050715ea..3ff4c51ccb 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
-from contextlib import closing
+from contextlib import closing, contextmanager
 from datetime import datetime
 import asyncio
 import json
@@ -12,8 +12,9 @@ import math
 import os
 import signal
 import socket
+import sys
 import time
-from . import chunkify, DEFAULT_MAX_CHUNK
+from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS
 
 logger = logging.getLogger('hashserv.server')
 
@@ -111,16 +112,40 @@ class Stats(object):
 class ClientError(Exception):
     pass
 
+def insert_task(cursor, data, ignore=False):
+    keys = sorted(data.keys())
+    query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % (
+        " OR IGNORE" if ignore else "",
+        ', '.join(keys),
+        ', '.join(':' + k for k in keys))
+    cursor.execute(query, data)
+
+async def copy_from_upstream(client, db, method, taskhash):
+    d = await client.get_taskhash(method, taskhash, True)
+    if d is not None:
+        # Filter out unknown columns
+        d = {k: v for k, v in d.items() if k in TABLE_COLUMNS}
+        keys = sorted(d.keys())
+
+
+        with closing(db.cursor()) as cursor:
+            insert_task(cursor, d)
+            db.commit()
+
+    return d
+
 class ServerClient(object):
     FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
     ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
 
-    def __init__(self, reader, writer, db, request_stats):
+    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream):
         self.reader = reader
         self.writer = writer
         self.db = db
         self.request_stats = request_stats
         self.max_chunk = DEFAULT_MAX_CHUNK
+        self.backfill_queue = backfill_queue
+        self.upstream = upstream
 
         self.handlers = {
             'get': self.handle_get,
@@ -130,10 +155,18 @@ class ServerClient(object):
             'get-stats': self.handle_get_stats,
             'reset-stats': self.handle_reset_stats,
             'chunk-stream': self.handle_chunk,
+            'backfill-wait': self.handle_backfill_wait,
         }
 
     async def process_requests(self):
+        if self.upstream is not None:
+            self.upstream_client = await create_async_client(self.upstream)
+        else:
+            self.upstream_client = None
+
         try:
+
+
             self.addr = self.writer.get_extra_info('peername')
             logger.debug('Client %r connected' % (self.addr,))
 
@@ -171,6 +204,9 @@
         except ClientError as e:
             logger.error(str(e))
         finally:
+            if self.upstream_client is not None:
+                await self.upstream_client.close()
+
             self.writer.close()
 
     async def dispatch_message(self, msg):
@@ -239,15 +275,19 @@
         if row is not None:
             logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
             d = {k: row[k] for k in row.keys()}
-
-            self.write_message(d)
+        elif self.upstream_client is not None:
+            d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash)
         else:
-            self.write_message(None)
+            d = None
+
+        self.write_message(d)
 
     async def handle_get_stream(self, request):
         self.write_message('ok')
 
         while True:
+            upstream = None
+
             l = await self.reader.readline()
             if not l:
                 return
@@ -272,6 +312,12 @@
             if row is not None:
                 msg = ('%s\n' % row['unihash']).encode('utf-8')
                 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
+            elif self.upstream_client is not None:
+                upstream = await self.upstream_client.get_unihash(method, taskhash)
+                if upstream:
+                    msg = ("%s\n" % upstream).encode("utf-8")
+                else:
+                    msg = "\n".encode("utf-8")
             else:
                 msg = '\n'.encode('utf-8')
 
@@ -282,6 +328,11 @@
 
             await self.writer.drain()
 
+            # Post to the backfill queue after writing the result to minimize
+            # the turn around time on a request
+            if upstream is not None:
+                await self.backfill_queue.put((method, taskhash))
+
     async def handle_report(self, data):
         with closing(self.db.cursor()) as cursor:
             cursor.execute('''
@@ -324,11 +375,7 @@
                 if k in data:
                     insert_data[k] = data[k]
 
-            cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
-                ', '.join(sorted(insert_data.keys())),
-                ', '.join(':' + k for k in sorted(insert_data.keys()))),
-                insert_data)
-
+            insert_task(cursor, insert_data)
             self.db.commit()
 
             logger.info('Adding taskhash %s with unihash %s',
@@ -358,11 +405,7 @@
                 if k in data:
                     insert_data[k] = data[k]
 
-            cursor.execute('''INSERT OR IGNORE INTO tasks_v2 (%s) VALUES (%s)''' % (
-                ', '.join(sorted(insert_data.keys())),
-                ', '.join(':' + k for k in sorted(insert_data.keys()))),
-                insert_data)
-
+            insert_task(cursor, insert_data, ignore=True)
             self.db.commit()
 
             # Fetch the unihash that will be reported for the taskhash. If the
@@ -394,6 +437,13 @@
             self.request_stats.reset()
         self.write_message(d)
 
+    async def handle_backfill_wait(self, request):
+        d = {
+            'tasks': self.backfill_queue.qsize(),
+        }
+        await self.backfill_queue.join()
+        self.write_message(d)
+
     def query_equivalent(self, method, taskhash, query):
         # This is part of the inner loop and must be as fast as possible
         try:
@@ -405,7 +455,7 @@
 
 
 class Server(object):
-    def __init__(self, db, loop=None):
+    def __init__(self, db, loop=None, upstream=None):
         self.request_stats = Stats()
         self.db = db
 
@@ -416,6 +466,8 @@
             self.loop = loop
             self.close_loop = False
 
+        self.upstream = upstream
+
         self._cleanup_socket = None
 
     def start_tcp_server(self, host, port):
@@ -458,7 +510,7 @@
     async def handle_client(self, reader, writer):
         # writer.transport.set_write_buffer_limits(0)
         try:
-            client = ServerClient(reader, writer, self.db, self.request_stats)
+            client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream)
             await client.process_requests()
         except Exception as e:
             import traceback
@@ -467,23 +519,60 @@
             writer.close()
         logger.info('Client disconnected')
 
+    @contextmanager
+    def _backfill_worker(self):
+        async def backfill_worker_task():
+            client = await create_async_client(self.upstream)
+            try:
+                while True:
+                    item = await self.backfill_queue.get()
+                    if item is None:
+                        self.backfill_queue.task_done()
+                        break
+                    method, taskhash = item
+                    await copy_from_upstream(client, self.db, method, taskhash)
+                    self.backfill_queue.task_done()
+            finally:
+                await client.close()
+
+        async def join_worker(worker):
+            await self.backfill_queue.put(None)
+            await worker
+
+        if self.upstream is not None:
+            worker = asyncio.ensure_future(backfill_worker_task())
+            try:
+                yield
+            finally:
+                self.loop.run_until_complete(join_worker(worker))
+        else:
+            yield
+
     def serve_forever(self):
         def signal_handler():
             self.loop.stop()
 
-        self.loop.add_signal_handler(signal.SIGTERM, signal_handler)
-
+        asyncio.set_event_loop(self.loop)
         try:
-            self.loop.run_forever()
-        except KeyboardInterrupt:
-            pass
-
-        self.server.close()
-        self.loop.run_until_complete(self.server.wait_closed())
-        logger.info('Server shutting down')
+            self.backfill_queue = asyncio.Queue()
+
+            self.loop.add_signal_handler(signal.SIGTERM, signal_handler)
+
+            with self._backfill_worker():
+                try:
+                    self.loop.run_forever()
+                except KeyboardInterrupt:
+                    pass
+
+                self.server.close()
+
+                self.loop.run_until_complete(self.server.wait_closed())
+                logger.info('Server shutting down')
+        finally:
+            if self.close_loop:
+                if sys.version_info >= (3, 6):
+                    self.loop.run_until_complete(self.loop.shutdown_asyncgens())
+                self.loop.close()
 
-        if self.close_loop:
-            self.loop.close()
-
         if self._cleanup_socket is not None:
             self._cleanup_socket()
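Note the ordering in handle_backfill_wait above: the reply samples backfill_queue.qsize() first, but is only written after backfill_queue.join() completes, so a client blocks until the worker has drained the queue and then learns how many tasks had been pending. A hedged client-side sketch of the new round trip (create_client exists in hashserv; get_unihash and backfill_wait are assumed to be the client-side counterparts added alongside this server change, and the argument values are placeholders):

    import hashserv

    client = hashserv.create_client('localhost:8687')

    # A miss on the local server may now be answered from the upstream;
    # the copied row is queued for backfill rather than written inline.
    unihash = client.get_unihash('dummy.method', 'deadbeef' * 8)

    # Block until the backfill worker has drained the queue.
    client.backfill_wait()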