summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r--bitbake/lib/hashserv/server.py116
1 files changed, 50 insertions, 66 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 45bf476bfe..13b754805b 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -165,8 +165,8 @@ class ServerCursor(object):
165 165
166 166
167class ServerClient(bb.asyncrpc.AsyncServerConnection): 167class ServerClient(bb.asyncrpc.AsyncServerConnection):
168 def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): 168 def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
169 super().__init__(reader, writer, 'OEHASHEQUIV', logger) 169 super().__init__(socket, 'OEHASHEQUIV', logger)
170 self.db = db 170 self.db = db
171 self.request_stats = request_stats 171 self.request_stats = request_stats
172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK 172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
@@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
209 if k in msg: 209 if k in msg:
210 logger.debug('Handling %s' % k) 210 logger.debug('Handling %s' % k)
211 if 'stream' in k: 211 if 'stream' in k:
212 await self.handlers[k](msg[k]) 212 return await self.handlers[k](msg[k])
213 else: 213 else:
214 with self.request_stats.start_sample() as self.request_sample, \ 214 with self.request_stats.start_sample() as self.request_sample, \
215 self.request_sample.measure(): 215 self.request_sample.measure():
216 await self.handlers[k](msg[k]) 216 return await self.handlers[k](msg[k])
217 return
218 217
219 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 218 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
220 219
@@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
224 fetch_all = request.get('all', False) 223 fetch_all = request.get('all', False)
225 224
226 with closing(self.db.cursor()) as cursor: 225 with closing(self.db.cursor()) as cursor:
227 d = await self.get_unihash(cursor, method, taskhash, fetch_all) 226 return await self.get_unihash(cursor, method, taskhash, fetch_all)
228
229 self.write_message(d)
230 227
231 async def get_unihash(self, cursor, method, taskhash, fetch_all=False): 228 async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
232 d = None 229 d = None
@@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
274 with_unihash = request.get("with_unihash", True) 271 with_unihash = request.get("with_unihash", True)
275 272
276 with closing(self.db.cursor()) as cursor: 273 with closing(self.db.cursor()) as cursor:
277 d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) 274 return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
278
279 self.write_message(d)
280 275
281 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): 276 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
282 d = None 277 d = None
@@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
334 ) 329 )
335 330
336 async def handle_get_stream(self, request): 331 async def handle_get_stream(self, request):
337 self.write_message('ok') 332 await self.socket.send_message("ok")
338 333
339 while True: 334 while True:
340 upstream = None 335 upstream = None
341 336
342 l = await self.reader.readline() 337 l = await self.socket.recv()
343 if not l: 338 if not l:
344 return 339 break
345 340
346 try: 341 try:
347 # This inner loop is very sensitive and must be as fast as 342 # This inner loop is very sensitive and must be as fast as
@@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
352 request_measure = self.request_sample.measure() 347 request_measure = self.request_sample.measure()
353 request_measure.start() 348 request_measure.start()
354 349
355 l = l.decode('utf-8').rstrip()
356 if l == 'END': 350 if l == 'END':
357 self.writer.write('ok\n'.encode('utf-8')) 351 break
358 return
359 352
360 (method, taskhash) = l.split() 353 (method, taskhash) = l.split()
361 #logger.debug('Looking up %s %s' % (method, taskhash)) 354 #logger.debug('Looking up %s %s' % (method, taskhash))
@@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
366 cursor.close() 359 cursor.close()
367 360
368 if row is not None: 361 if row is not None:
369 msg = ('%s\n' % row['unihash']).encode('utf-8') 362 msg = row['unihash']
370 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) 363 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
371 elif self.upstream_client is not None: 364 elif self.upstream_client is not None:
372 upstream = await self.upstream_client.get_unihash(method, taskhash) 365 upstream = await self.upstream_client.get_unihash(method, taskhash)
373 if upstream: 366 if upstream:
374 msg = ("%s\n" % upstream).encode("utf-8") 367 msg = upstream
375 else: 368 else:
376 msg = "\n".encode("utf-8") 369 msg = ""
377 else: 370 else:
378 msg = '\n'.encode('utf-8') 371 msg = ""
379 372
380 self.writer.write(msg) 373 await self.socket.send(msg)
381 finally: 374 finally:
382 request_measure.end() 375 request_measure.end()
383 self.request_sample.end() 376 self.request_sample.end()
384 377
385 await self.writer.drain()
386
387 # Post to the backfill queue after writing the result to minimize 378 # Post to the backfill queue after writing the result to minimize
388 # the turn around time on a request 379 # the turn around time on a request
389 if upstream is not None: 380 if upstream is not None:
390 await self.backfill_queue.put((method, taskhash)) 381 await self.backfill_queue.put((method, taskhash))
391 382
383 await self.socket.send("ok")
384 return self.NO_RESPONSE
385
392 async def handle_report(self, data): 386 async def handle_report(self, data):
393 with closing(self.db.cursor()) as cursor: 387 with closing(self.db.cursor()) as cursor:
394 outhash_data = { 388 outhash_data = {
@@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
468 'unihash': unihash, 462 'unihash': unihash,
469 } 463 }
470 464
471 self.write_message(d) 465 return d
472 466
473 async def handle_equivreport(self, data): 467 async def handle_equivreport(self, data):
474 with closing(self.db.cursor()) as cursor: 468 with closing(self.db.cursor()) as cursor:
@@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
491 485
492 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} 486 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
493 487
494 self.write_message(d) 488 return d
495 489
496 490
497 async def handle_get_stats(self, request): 491 async def handle_get_stats(self, request):
498 d = { 492 return {
499 'requests': self.request_stats.todict(), 493 'requests': self.request_stats.todict(),
500 } 494 }
501 495
502 self.write_message(d)
503
504 async def handle_reset_stats(self, request): 496 async def handle_reset_stats(self, request):
505 d = { 497 d = {
506 'requests': self.request_stats.todict(), 498 'requests': self.request_stats.todict(),
507 } 499 }
508 500
509 self.request_stats.reset() 501 self.request_stats.reset()
510 self.write_message(d) 502 return d
511 503
512 async def handle_backfill_wait(self, request): 504 async def handle_backfill_wait(self, request):
513 d = { 505 d = {
514 'tasks': self.backfill_queue.qsize(), 506 'tasks': self.backfill_queue.qsize(),
515 } 507 }
516 await self.backfill_queue.join() 508 await self.backfill_queue.join()
517 self.write_message(d) 509 return d
518 510
519 async def handle_remove(self, request): 511 async def handle_remove(self, request):
520 condition = request["where"] 512 condition = request["where"]
@@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
541 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) 533 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
542 self.db.commit() 534 self.db.commit()
543 535
544 self.write_message({"count": count}) 536 return {"count": count}
545 537
546 async def handle_clean_unused(self, request): 538 async def handle_clean_unused(self, request):
547 max_age = request["max_age_seconds"] 539 max_age = request["max_age_seconds"]
@@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
558 ) 550 )
559 count = cursor.rowcount 551 count = cursor.rowcount
560 552
561 self.write_message({"count": count}) 553 return {"count": count}
562 554
563 def query_equivalent(self, cursor, method, taskhash): 555 def query_equivalent(self, cursor, method, taskhash):
564 # This is part of the inner loop and must be as fast as possible 556 # This is part of the inner loop and must be as fast as possible
@@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer):
583 self.db = db 575 self.db = db
584 self.upstream = upstream 576 self.upstream = upstream
585 self.read_only = read_only 577 self.read_only = read_only
578 self.backfill_queue = None
586 579
587 def accept_client(self, reader, writer): 580 def accept_client(self, socket):
588 return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) 581 return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
589 582
590 @contextmanager 583 async def backfill_worker_task(self):
591 def _backfill_worker(self): 584 client = await create_async_client(self.upstream)
592 async def backfill_worker_task(): 585 try:
593 client = await create_async_client(self.upstream) 586 while True:
594 try: 587 item = await self.backfill_queue.get()
595 while True: 588 if item is None:
596 item = await self.backfill_queue.get()
597 if item is None:
598 self.backfill_queue.task_done()
599 break
600 method, taskhash = item
601 await copy_unihash_from_upstream(client, self.db, method, taskhash)
602 self.backfill_queue.task_done() 589 self.backfill_queue.task_done()
603 finally: 590 break
604 await client.close() 591 method, taskhash = item
592 await copy_unihash_from_upstream(client, self.db, method, taskhash)
593 self.backfill_queue.task_done()
594 finally:
595 await client.close()
605 596
606 async def join_worker(worker): 597 def start(self):
598 tasks = super().start()
599 if self.upstream:
600 self.backfill_queue = asyncio.Queue()
601 tasks += [self.backfill_worker_task()]
602 return tasks
603
604 async def stop(self):
605 if self.backfill_queue is not None:
607 await self.backfill_queue.put(None) 606 await self.backfill_queue.put(None)
608 await worker 607 await super().stop()
609
610 if self.upstream is not None:
611 worker = asyncio.ensure_future(backfill_worker_task())
612 try:
613 yield
614 finally:
615 self.loop.run_until_complete(join_worker(worker))
616 else:
617 yield
618
619 def run_loop_forever(self):
620 self.backfill_queue = asyncio.Queue()
621
622 with self._backfill_worker():
623 super().run_loop_forever()