Diffstat (limited to 'bitbake/lib/hashserv/server.py')
-rw-r--r-- | bitbake/lib/hashserv/server.py | 116
1 file changed, 50 insertions(+), 66 deletions(-)
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 45bf476bfe..13b754805b 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -165,8 +165,8 @@ class ServerCursor(object):
 
 
 class ServerClient(bb.asyncrpc.AsyncServerConnection):
-    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
-        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
+    def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
+        super().__init__(socket, 'OEHASHEQUIV', logger)
         self.db = db
         self.request_stats = request_stats
         self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
@@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
             if k in msg:
                 logger.debug('Handling %s' % k)
                 if 'stream' in k:
-                    await self.handlers[k](msg[k])
+                    return await self.handlers[k](msg[k])
                 else:
                     with self.request_stats.start_sample() as self.request_sample, \
                             self.request_sample.measure():
-                        await self.handlers[k](msg[k])
-                return
+                        return await self.handlers[k](msg[k])
 
         raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
 
@@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         fetch_all = request.get('all', False)
 
         with closing(self.db.cursor()) as cursor:
-            d = await self.get_unihash(cursor, method, taskhash, fetch_all)
-
-        self.write_message(d)
+            return await self.get_unihash(cursor, method, taskhash, fetch_all)
 
     async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
         d = None
@@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         with_unihash = request.get("with_unihash", True)
 
         with closing(self.db.cursor()) as cursor:
-            d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
-
-        self.write_message(d)
+            return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
 
     async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
         d = None
@@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
         )
 
     async def handle_get_stream(self, request):
-        self.write_message('ok')
+        await self.socket.send_message("ok")
 
         while True:
             upstream = None
 
-            l = await self.reader.readline()
+            l = await self.socket.recv()
             if not l:
-                return
+                break
 
             try:
                 # This inner loop is very sensitive and must be as fast as
@@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                 request_measure = self.request_sample.measure()
                 request_measure.start()
 
-                l = l.decode('utf-8').rstrip()
                 if l == 'END':
-                    self.writer.write('ok\n'.encode('utf-8'))
-                    return
+                    break
 
                 (method, taskhash) = l.split()
                 #logger.debug('Looking up %s %s' % (method, taskhash))
@@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
                     cursor.close()
 
                 if row is not None:
-                    msg = ('%s\n' % row['unihash']).encode('utf-8')
+                    msg = row['unihash']
                     #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                 elif self.upstream_client is not None:
                     upstream = await self.upstream_client.get_unihash(method, taskhash)
                     if upstream:
-                        msg = ("%s\n" % upstream).encode("utf-8")
+                        msg = upstream
                     else:
-                        msg = "\n".encode("utf-8")
+                        msg = ""
                 else:
-                    msg = '\n'.encode('utf-8')
+                    msg = ""
 
-                self.writer.write(msg)
+                await self.socket.send(msg)
             finally:
                 request_measure.end()
                 self.request_sample.end()
 
-            await self.writer.drain()
-
             # Post to the backfill queue after writing the result to minimize
             # the turn around time on a request
             if upstream is not None:
                 await self.backfill_queue.put((method, taskhash))
 
+        await self.socket.send("ok")
+        return self.NO_RESPONSE
+
     async def handle_report(self, data):
         with closing(self.db.cursor()) as cursor:
             outhash_data = {
@@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
             'unihash': unihash,
         }
 
-        self.write_message(d)
+        return d
 
     async def handle_equivreport(self, data):
         with closing(self.db.cursor()) as cursor:
@@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
 
             d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
 
-        self.write_message(d)
+        return d
 
 
     async def handle_get_stats(self, request):
-        d = {
+        return {
             'requests': self.request_stats.todict(),
         }
 
-        self.write_message(d)
-
     async def handle_reset_stats(self, request):
         d = {
             'requests': self.request_stats.todict(),
         }
 
         self.request_stats.reset()
-        self.write_message(d)
+        return d
 
     async def handle_backfill_wait(self, request):
         d = {
             'tasks': self.backfill_queue.qsize(),
         }
         await self.backfill_queue.join()
-        self.write_message(d)
+        return d
 
     async def handle_remove(self, request):
         condition = request["where"]
@@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
             count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
             self.db.commit()
 
-        self.write_message({"count": count})
+        return {"count": count}
 
     async def handle_clean_unused(self, request):
         max_age = request["max_age_seconds"]
@@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
             )
             count = cursor.rowcount
 
-        self.write_message({"count": count})
+        return {"count": count}
 
     def query_equivalent(self, cursor, method, taskhash):
         # This is part of the inner loop and must be as fast as possible
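
Every ServerClient hunk above applies the same refactor: handlers stop calling self.write_message() on their reply and instead return the reply dict (or a no-response sentinel on the streaming path), leaving the actual send to the shared dispatch code. A minimal, self-contained sketch of that pattern follows; the Connection class, send_message() stub, and handler names are illustrative stand-ins, not the real bb.asyncrpc API.

import asyncio

NO_RESPONSE = object()  # sentinel: the handler already wrote its own reply

class Connection:
    """Illustrative stand-in for the server-side connection object."""

    def __init__(self):
        self.handlers = {
            "get-stats": self.handle_get_stats,
            "get-stream": self.handle_get_stream,
        }
        self.sent = []

    async def send_message(self, msg):
        # Stand-in for serializing a reply onto the client socket.
        self.sent.append(msg)

    async def dispatch_message(self, msg):
        for k, handler in self.handlers.items():
            if k in msg:
                # Handlers return their reply instead of sending it themselves.
                response = await handler(msg[k])
                if response is not NO_RESPONSE:
                    await self.send_message(response)
                return
        raise ValueError("Unrecognized command %r" % msg)

    async def handle_get_stats(self, request):
        # Plain request/response handler: just return the reply dict.
        return {"requests": {"total": 42}}

    async def handle_get_stream(self, request):
        # Streaming handler: drives the socket directly, then opts out of the
        # automatic reply by returning the sentinel.
        await self.send_message("ok")
        return NO_RESPONSE

async def main():
    conn = Connection()
    await conn.dispatch_message({"get-stats": {}})
    print(conn.sent)  # -> [{'requests': {'total': 42}}]

asyncio.run(main())

Only handle_get_stream keeps writing to the socket itself, which is why it is the one handler that returns the sentinel after sending its final "ok".
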
@@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer):
         self.db = db
         self.upstream = upstream
         self.read_only = read_only
+        self.backfill_queue = None
 
-    def accept_client(self, reader, writer):
-        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
+    def accept_client(self, socket):
+        return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
 
-    @contextmanager
-    def _backfill_worker(self):
-        async def backfill_worker_task():
-            client = await create_async_client(self.upstream)
-            try:
-                while True:
-                    item = await self.backfill_queue.get()
-                    if item is None:
-                        self.backfill_queue.task_done()
-                        break
-                    method, taskhash = item
-                    await copy_unihash_from_upstream(client, self.db, method, taskhash)
-                    self.backfill_queue.task_done()
-            finally:
-                await client.close()
+    async def backfill_worker_task(self):
+        client = await create_async_client(self.upstream)
+        try:
+            while True:
+                item = await self.backfill_queue.get()
+                if item is None:
+                    self.backfill_queue.task_done()
+                    break
+                method, taskhash = item
+                await copy_unihash_from_upstream(client, self.db, method, taskhash)
+                self.backfill_queue.task_done()
+        finally:
+            await client.close()
 
-        async def join_worker(worker):
+    def start(self):
+        tasks = super().start()
+        if self.upstream:
+            self.backfill_queue = asyncio.Queue()
+            tasks += [self.backfill_worker_task()]
+        return tasks
+
+    async def stop(self):
+        if self.backfill_queue is not None:
             await self.backfill_queue.put(None)
-            await worker
-
-        if self.upstream is not None:
-            worker = asyncio.ensure_future(backfill_worker_task())
-            try:
-                yield
-            finally:
-                self.loop.run_until_complete(join_worker(worker))
-        else:
-            yield
-
-    def run_loop_forever(self):
-        self.backfill_queue = asyncio.Queue()
-
-        with self._backfill_worker():
-            super().run_loop_forever()
+        await super().stop()
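
The Server hunk drops the @contextmanager-based _backfill_worker()/run_loop_forever() arrangement in favour of start()/stop() hooks: start() returns the list of coroutines to schedule (adding the backfill worker only when an upstream is configured) and stop() posts a None sentinel so the worker drains and exits before the base class shuts down. A rough sketch of that lifecycle under the same queue-sentinel convention; BaseServer, serve_forever(), and the upstream address are assumptions for illustration, not the actual bb.asyncrpc.AsyncServer interface.

import asyncio

class BaseServer:
    """Illustrative stand-in for the asyncrpc base server, not the real API."""

    def start(self):
        # Return the coroutines the caller should schedule.
        return [self.serve_forever()]

    async def serve_forever(self):
        await asyncio.sleep(0.01)  # stand-in for accepting client connections

    async def stop(self):
        pass

class Server(BaseServer):
    def __init__(self, upstream=None):
        self.upstream = upstream
        self.backfill_queue = None

    def start(self):
        tasks = super().start()
        if self.upstream:
            # Only allocate the queue and worker when there is an upstream.
            self.backfill_queue = asyncio.Queue()
            tasks += [self.backfill_worker_task()]
        return tasks

    async def backfill_worker_task(self):
        while True:
            item = await self.backfill_queue.get()
            self.backfill_queue.task_done()
            if item is None:  # sentinel posted by stop()
                break
            method, taskhash = item
            print("would backfill", method, taskhash)

    async def stop(self):
        if self.backfill_queue is not None:
            await self.backfill_queue.put(None)
        await super().stop()

async def main():
    server = Server(upstream="hashserv.example.com:8686")
    tasks = [asyncio.ensure_future(t) for t in server.start()]
    await server.backfill_queue.put(("method", "taskhash"))
    await server.stop()
    await asyncio.gather(*tasks)

asyncio.run(main())

Creating the queue inside start() means an upstream-less server never allocates it at all, which is why stop() guards on backfill_queue still being None.
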