summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/server.py
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:19 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:33:02 +0000
commit8f8501ed403dec27acbe780b936bc087fc5006d0 (patch)
tree60e6415075c7c71eacec23ca7dda53e4a324b12e /bitbake/lib/hashserv/server.py
parentf97b686884166dd77d1818e70615027c6ba8c348 (diff)
downloadpoky-8f8501ed403dec27acbe780b936bc087fc5006d0.tar.gz
bitbake: asyncrpc: Abstract sockets
Rewrites the asyncrpc client and server code to make it possible to have other transport backends that are not stream based (e.g. websockets which are message based). The connection handling classes are now shared between both the client and server to make it easier to implement new transport mechanisms (Bitbake rev: 2aaeae53696e4c2f13a169830c3b7089cbad6eca) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r--bitbake/lib/hashserv/server.py116
1 files changed, 50 insertions, 66 deletions
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 45bf476bfe..13b754805b 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -165,8 +165,8 @@ class ServerCursor(object):
165 165
166 166
167class ServerClient(bb.asyncrpc.AsyncServerConnection): 167class ServerClient(bb.asyncrpc.AsyncServerConnection):
168 def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): 168 def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
169 super().__init__(reader, writer, 'OEHASHEQUIV', logger) 169 super().__init__(socket, 'OEHASHEQUIV', logger)
170 self.db = db 170 self.db = db
171 self.request_stats = request_stats 171 self.request_stats = request_stats
172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK 172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
@@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
209 if k in msg: 209 if k in msg:
210 logger.debug('Handling %s' % k) 210 logger.debug('Handling %s' % k)
211 if 'stream' in k: 211 if 'stream' in k:
212 await self.handlers[k](msg[k]) 212 return await self.handlers[k](msg[k])
213 else: 213 else:
214 with self.request_stats.start_sample() as self.request_sample, \ 214 with self.request_stats.start_sample() as self.request_sample, \
215 self.request_sample.measure(): 215 self.request_sample.measure():
216 await self.handlers[k](msg[k]) 216 return await self.handlers[k](msg[k])
217 return
218 217
219 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 218 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
220 219
@@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
224 fetch_all = request.get('all', False) 223 fetch_all = request.get('all', False)
225 224
226 with closing(self.db.cursor()) as cursor: 225 with closing(self.db.cursor()) as cursor:
227 d = await self.get_unihash(cursor, method, taskhash, fetch_all) 226 return await self.get_unihash(cursor, method, taskhash, fetch_all)
228
229 self.write_message(d)
230 227
231 async def get_unihash(self, cursor, method, taskhash, fetch_all=False): 228 async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
232 d = None 229 d = None
@@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
274 with_unihash = request.get("with_unihash", True) 271 with_unihash = request.get("with_unihash", True)
275 272
276 with closing(self.db.cursor()) as cursor: 273 with closing(self.db.cursor()) as cursor:
277 d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) 274 return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
278
279 self.write_message(d)
280 275
281 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): 276 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
282 d = None 277 d = None
@@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
334 ) 329 )
335 330
336 async def handle_get_stream(self, request): 331 async def handle_get_stream(self, request):
337 self.write_message('ok') 332 await self.socket.send_message("ok")
338 333
339 while True: 334 while True:
340 upstream = None 335 upstream = None
341 336
342 l = await self.reader.readline() 337 l = await self.socket.recv()
343 if not l: 338 if not l:
344 return 339 break
345 340
346 try: 341 try:
347 # This inner loop is very sensitive and must be as fast as 342 # This inner loop is very sensitive and must be as fast as
@@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
352 request_measure = self.request_sample.measure() 347 request_measure = self.request_sample.measure()
353 request_measure.start() 348 request_measure.start()
354 349
355 l = l.decode('utf-8').rstrip()
356 if l == 'END': 350 if l == 'END':
357 self.writer.write('ok\n'.encode('utf-8')) 351 break
358 return
359 352
360 (method, taskhash) = l.split() 353 (method, taskhash) = l.split()
361 #logger.debug('Looking up %s %s' % (method, taskhash)) 354 #logger.debug('Looking up %s %s' % (method, taskhash))
@@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
366 cursor.close() 359 cursor.close()
367 360
368 if row is not None: 361 if row is not None:
369 msg = ('%s\n' % row['unihash']).encode('utf-8') 362 msg = row['unihash']
370 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) 363 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
371 elif self.upstream_client is not None: 364 elif self.upstream_client is not None:
372 upstream = await self.upstream_client.get_unihash(method, taskhash) 365 upstream = await self.upstream_client.get_unihash(method, taskhash)
373 if upstream: 366 if upstream:
374 msg = ("%s\n" % upstream).encode("utf-8") 367 msg = upstream
375 else: 368 else:
376 msg = "\n".encode("utf-8") 369 msg = ""
377 else: 370 else:
378 msg = '\n'.encode('utf-8') 371 msg = ""
379 372
380 self.writer.write(msg) 373 await self.socket.send(msg)
381 finally: 374 finally:
382 request_measure.end() 375 request_measure.end()
383 self.request_sample.end() 376 self.request_sample.end()
384 377
385 await self.writer.drain()
386
387 # Post to the backfill queue after writing the result to minimize 378 # Post to the backfill queue after writing the result to minimize
388 # the turn around time on a request 379 # the turn around time on a request
389 if upstream is not None: 380 if upstream is not None:
390 await self.backfill_queue.put((method, taskhash)) 381 await self.backfill_queue.put((method, taskhash))
391 382
383 await self.socket.send("ok")
384 return self.NO_RESPONSE
385
392 async def handle_report(self, data): 386 async def handle_report(self, data):
393 with closing(self.db.cursor()) as cursor: 387 with closing(self.db.cursor()) as cursor:
394 outhash_data = { 388 outhash_data = {
@@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
468 'unihash': unihash, 462 'unihash': unihash,
469 } 463 }
470 464
471 self.write_message(d) 465 return d
472 466
473 async def handle_equivreport(self, data): 467 async def handle_equivreport(self, data):
474 with closing(self.db.cursor()) as cursor: 468 with closing(self.db.cursor()) as cursor:
@@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
491 485
492 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} 486 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
493 487
494 self.write_message(d) 488 return d
495 489
496 490
497 async def handle_get_stats(self, request): 491 async def handle_get_stats(self, request):
498 d = { 492 return {
499 'requests': self.request_stats.todict(), 493 'requests': self.request_stats.todict(),
500 } 494 }
501 495
502 self.write_message(d)
503
504 async def handle_reset_stats(self, request): 496 async def handle_reset_stats(self, request):
505 d = { 497 d = {
506 'requests': self.request_stats.todict(), 498 'requests': self.request_stats.todict(),
507 } 499 }
508 500
509 self.request_stats.reset() 501 self.request_stats.reset()
510 self.write_message(d) 502 return d
511 503
512 async def handle_backfill_wait(self, request): 504 async def handle_backfill_wait(self, request):
513 d = { 505 d = {
514 'tasks': self.backfill_queue.qsize(), 506 'tasks': self.backfill_queue.qsize(),
515 } 507 }
516 await self.backfill_queue.join() 508 await self.backfill_queue.join()
517 self.write_message(d) 509 return d
518 510
519 async def handle_remove(self, request): 511 async def handle_remove(self, request):
520 condition = request["where"] 512 condition = request["where"]
@@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
541 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) 533 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
542 self.db.commit() 534 self.db.commit()
543 535
544 self.write_message({"count": count}) 536 return {"count": count}
545 537
546 async def handle_clean_unused(self, request): 538 async def handle_clean_unused(self, request):
547 max_age = request["max_age_seconds"] 539 max_age = request["max_age_seconds"]
@@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
558 ) 550 )
559 count = cursor.rowcount 551 count = cursor.rowcount
560 552
561 self.write_message({"count": count}) 553 return {"count": count}
562 554
563 def query_equivalent(self, cursor, method, taskhash): 555 def query_equivalent(self, cursor, method, taskhash):
564 # This is part of the inner loop and must be as fast as possible 556 # This is part of the inner loop and must be as fast as possible
@@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer):
583 self.db = db 575 self.db = db
584 self.upstream = upstream 576 self.upstream = upstream
585 self.read_only = read_only 577 self.read_only = read_only
578 self.backfill_queue = None
586 579
587 def accept_client(self, reader, writer): 580 def accept_client(self, socket):
588 return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) 581 return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
589 582
590 @contextmanager 583 async def backfill_worker_task(self):
591 def _backfill_worker(self): 584 client = await create_async_client(self.upstream)
592 async def backfill_worker_task(): 585 try:
593 client = await create_async_client(self.upstream) 586 while True:
594 try: 587 item = await self.backfill_queue.get()
595 while True: 588 if item is None:
596 item = await self.backfill_queue.get()
597 if item is None:
598 self.backfill_queue.task_done()
599 break
600 method, taskhash = item
601 await copy_unihash_from_upstream(client, self.db, method, taskhash)
602 self.backfill_queue.task_done() 589 self.backfill_queue.task_done()
603 finally: 590 break
604 await client.close() 591 method, taskhash = item
592 await copy_unihash_from_upstream(client, self.db, method, taskhash)
593 self.backfill_queue.task_done()
594 finally:
595 await client.close()
605 596
606 async def join_worker(worker): 597 def start(self):
598 tasks = super().start()
599 if self.upstream:
600 self.backfill_queue = asyncio.Queue()
601 tasks += [self.backfill_worker_task()]
602 return tasks
603
604 async def stop(self):
605 if self.backfill_queue is not None:
607 await self.backfill_queue.put(None) 606 await self.backfill_queue.put(None)
608 await worker 607 await super().stop()
609
610 if self.upstream is not None:
611 worker = asyncio.ensure_future(backfill_worker_task())
612 try:
613 yield
614 finally:
615 self.loop.run_until_complete(join_worker(worker))
616 else:
617 yield
618
619 def run_loop_forever(self):
620 self.backfill_queue = asyncio.Queue()
621
622 with self._backfill_worker():
623 super().run_loop_forever()