summary refs log tree commit diff stats
path: root/bitbake/lib/hashserv
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/hashserv')
-rw-r--r-- bitbake/lib/hashserv/__init__.py 21
-rw-r--r-- bitbake/lib/hashserv/client.py 38
-rw-r--r-- bitbake/lib/hashserv/server.py 116
3 files changed, 69 insertions, 106 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 9cb3fd57a5..3a4018353f 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://"
15ADDR_TYPE_UNIX = 0 15ADDR_TYPE_UNIX = 0
16ADDR_TYPE_TCP = 1 16ADDR_TYPE_TCP = 1
17 17
18# The Python async server defaults to a 64K receive buffer, so we hardcode our
19# maximum chunk size. It would be better if the client and server reported to
20# each other what the maximum chunk sizes were, but that will slow down the
21# connection setup with a round trip delay so I'd rather not do that unless it
22# is necessary
23DEFAULT_MAX_CHUNK = 32 * 1024
24
25UNIHASH_TABLE_DEFINITION = ( 18UNIHASH_TABLE_DEFINITION = (
26 ("method", "TEXT NOT NULL", "UNIQUE"), 19 ("method", "TEXT NOT NULL", "UNIQUE"),
27 ("taskhash", "TEXT NOT NULL", "UNIQUE"), 20 ("taskhash", "TEXT NOT NULL", "UNIQUE"),
@@ -102,20 +95,6 @@ def parse_address(addr):
102 return (ADDR_TYPE_TCP, (host, int(port))) 95 return (ADDR_TYPE_TCP, (host, int(port)))
103 96
104 97
105def chunkify(msg, max_chunk):
106 if len(msg) < max_chunk - 1:
107 yield ''.join((msg, "\n"))
108 else:
109 yield ''.join((json.dumps({
110 'chunk-stream': None
111 }), "\n"))
112
113 args = [iter(msg)] * (max_chunk - 1)
114 for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
115 yield ''.join(itertools.chain(m, "\n"))
116 yield "\n"
117
118
119def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): 98def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
120 from . import server 99 from . import server
121 db = setup_database(dbname, sync=sync) 100 db = setup_database(dbname, sync=sync)
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index f676d267fa..5f7d22ab13 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
28 28
29 async def send_stream(self, msg): 29 async def send_stream(self, msg):
30 async def proc(): 30 async def proc():
31 self.writer.write(("%s\n" % msg).encode("utf-8")) 31 await self.socket.send(msg)
32 await self.writer.drain() 32 return await self.socket.recv()
33 l = await self.reader.readline()
34 if not l:
35 raise ConnectionError("Connection closed")
36 return l.decode("utf-8").rstrip()
37 33
38 return await self._send_wrapper(proc) 34 return await self._send_wrapper(proc)
39 35
40 async def _set_mode(self, new_mode): 36 async def _set_mode(self, new_mode):
37 async def stream_to_normal():
38 await self.socket.send("END")
39 return await self.socket.recv()
40
41 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: 41 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
42 r = await self.send_stream("END") 42 r = await self._send_wrapper(stream_to_normal)
43 if r != "ok": 43 if r != "ok":
44 raise ConnectionError("Bad response from server %r" % r) 44 raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r)
45 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: 45 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
46 r = await self.send_message({"get-stream": None}) 46 r = await self.invoke({"get-stream": None})
47 if r != "ok": 47 if r != "ok":
48 raise ConnectionError("Bad response from server %r" % r) 48 raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r)
49 elif new_mode != self.mode: 49 elif new_mode != self.mode:
50 raise Exception( 50 raise Exception(
51 "Undefined mode transition %r -> %r" % (self.mode, new_mode) 51 "Undefined mode transition %r -> %r" % (self.mode, new_mode)
@@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
67 m["method"] = method 67 m["method"] = method
68 m["outhash"] = outhash 68 m["outhash"] = outhash
69 m["unihash"] = unihash 69 m["unihash"] = unihash
70 return await self.send_message({"report": m}) 70 return await self.invoke({"report": m})
71 71
72 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 72 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
73 await self._set_mode(self.MODE_NORMAL) 73 await self._set_mode(self.MODE_NORMAL)
@@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
75 m["taskhash"] = taskhash 75 m["taskhash"] = taskhash
76 m["method"] = method 76 m["method"] = method
77 m["unihash"] = unihash 77 m["unihash"] = unihash
78 return await self.send_message({"report-equiv": m}) 78 return await self.invoke({"report-equiv": m})
79 79
80 async def get_taskhash(self, method, taskhash, all_properties=False): 80 async def get_taskhash(self, method, taskhash, all_properties=False):
81 await self._set_mode(self.MODE_NORMAL) 81 await self._set_mode(self.MODE_NORMAL)
82 return await self.send_message( 82 return await self.invoke(
83 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 83 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
84 ) 84 )
85 85
86 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 86 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
87 await self._set_mode(self.MODE_NORMAL) 87 await self._set_mode(self.MODE_NORMAL)
88 return await self.send_message( 88 return await self.invoke(
89 {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} 89 {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}}
90 ) 90 )
91 91
92 async def get_stats(self): 92 async def get_stats(self):
93 await self._set_mode(self.MODE_NORMAL) 93 await self._set_mode(self.MODE_NORMAL)
94 return await self.send_message({"get-stats": None}) 94 return await self.invoke({"get-stats": None})
95 95
96 async def reset_stats(self): 96 async def reset_stats(self):
97 await self._set_mode(self.MODE_NORMAL) 97 await self._set_mode(self.MODE_NORMAL)
98 return await self.send_message({"reset-stats": None}) 98 return await self.invoke({"reset-stats": None})
99 99
100 async def backfill_wait(self): 100 async def backfill_wait(self):
101 await self._set_mode(self.MODE_NORMAL) 101 await self._set_mode(self.MODE_NORMAL)
102 return (await self.send_message({"backfill-wait": None}))["tasks"] 102 return (await self.invoke({"backfill-wait": None}))["tasks"]
103 103
104 async def remove(self, where): 104 async def remove(self, where):
105 await self._set_mode(self.MODE_NORMAL) 105 await self._set_mode(self.MODE_NORMAL)
106 return await self.send_message({"remove": {"where": where}}) 106 return await self.invoke({"remove": {"where": where}})
107 107
108 async def clean_unused(self, max_age): 108 async def clean_unused(self, max_age):
109 await self._set_mode(self.MODE_NORMAL) 109 await self._set_mode(self.MODE_NORMAL)
110 return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) 110 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
111 111
112 112
113class Client(bb.asyncrpc.Client): 113class Client(bb.asyncrpc.Client):
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 45bf476bfe..13b754805b 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -165,8 +165,8 @@ class ServerCursor(object):
165 165
166 166
167class ServerClient(bb.asyncrpc.AsyncServerConnection): 167class ServerClient(bb.asyncrpc.AsyncServerConnection):
168 def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): 168 def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
169 super().__init__(reader, writer, 'OEHASHEQUIV', logger) 169 super().__init__(socket, 'OEHASHEQUIV', logger)
170 self.db = db 170 self.db = db
171 self.request_stats = request_stats 171 self.request_stats = request_stats
172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK 172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
@@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
209 if k in msg: 209 if k in msg:
210 logger.debug('Handling %s' % k) 210 logger.debug('Handling %s' % k)
211 if 'stream' in k: 211 if 'stream' in k:
212 await self.handlers[k](msg[k]) 212 return await self.handlers[k](msg[k])
213 else: 213 else:
214 with self.request_stats.start_sample() as self.request_sample, \ 214 with self.request_stats.start_sample() as self.request_sample, \
215 self.request_sample.measure(): 215 self.request_sample.measure():
216 await self.handlers[k](msg[k]) 216 return await self.handlers[k](msg[k])
217 return
218 217
219 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 218 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
220 219
@@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
224 fetch_all = request.get('all', False) 223 fetch_all = request.get('all', False)
225 224
226 with closing(self.db.cursor()) as cursor: 225 with closing(self.db.cursor()) as cursor:
227 d = await self.get_unihash(cursor, method, taskhash, fetch_all) 226 return await self.get_unihash(cursor, method, taskhash, fetch_all)
228
229 self.write_message(d)
230 227
231 async def get_unihash(self, cursor, method, taskhash, fetch_all=False): 228 async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
232 d = None 229 d = None
@@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
274 with_unihash = request.get("with_unihash", True) 271 with_unihash = request.get("with_unihash", True)
275 272
276 with closing(self.db.cursor()) as cursor: 273 with closing(self.db.cursor()) as cursor:
277 d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) 274 return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
278
279 self.write_message(d)
280 275
281 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): 276 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
282 d = None 277 d = None
@@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
334 ) 329 )
335 330
336 async def handle_get_stream(self, request): 331 async def handle_get_stream(self, request):
337 self.write_message('ok') 332 await self.socket.send_message("ok")
338 333
339 while True: 334 while True:
340 upstream = None 335 upstream = None
341 336
342 l = await self.reader.readline() 337 l = await self.socket.recv()
343 if not l: 338 if not l:
344 return 339 break
345 340
346 try: 341 try:
347 # This inner loop is very sensitive and must be as fast as 342 # This inner loop is very sensitive and must be as fast as
@@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
352 request_measure = self.request_sample.measure() 347 request_measure = self.request_sample.measure()
353 request_measure.start() 348 request_measure.start()
354 349
355 l = l.decode('utf-8').rstrip()
356 if l == 'END': 350 if l == 'END':
357 self.writer.write('ok\n'.encode('utf-8')) 351 break
358 return
359 352
360 (method, taskhash) = l.split() 353 (method, taskhash) = l.split()
361 #logger.debug('Looking up %s %s' % (method, taskhash)) 354 #logger.debug('Looking up %s %s' % (method, taskhash))
@@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
366 cursor.close() 359 cursor.close()
367 360
368 if row is not None: 361 if row is not None:
369 msg = ('%s\n' % row['unihash']).encode('utf-8') 362 msg = row['unihash']
370 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) 363 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
371 elif self.upstream_client is not None: 364 elif self.upstream_client is not None:
372 upstream = await self.upstream_client.get_unihash(method, taskhash) 365 upstream = await self.upstream_client.get_unihash(method, taskhash)
373 if upstream: 366 if upstream:
374 msg = ("%s\n" % upstream).encode("utf-8") 367 msg = upstream
375 else: 368 else:
376 msg = "\n".encode("utf-8") 369 msg = ""
377 else: 370 else:
378 msg = '\n'.encode('utf-8') 371 msg = ""
379 372
380 self.writer.write(msg) 373 await self.socket.send(msg)
381 finally: 374 finally:
382 request_measure.end() 375 request_measure.end()
383 self.request_sample.end() 376 self.request_sample.end()
384 377
385 await self.writer.drain()
386
387 # Post to the backfill queue after writing the result to minimize 378 # Post to the backfill queue after writing the result to minimize
388 # the turn around time on a request 379 # the turn around time on a request
389 if upstream is not None: 380 if upstream is not None:
390 await self.backfill_queue.put((method, taskhash)) 381 await self.backfill_queue.put((method, taskhash))
391 382
383 await self.socket.send("ok")
384 return self.NO_RESPONSE
385
392 async def handle_report(self, data): 386 async def handle_report(self, data):
393 with closing(self.db.cursor()) as cursor: 387 with closing(self.db.cursor()) as cursor:
394 outhash_data = { 388 outhash_data = {
@@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
468 'unihash': unihash, 462 'unihash': unihash,
469 } 463 }
470 464
471 self.write_message(d) 465 return d
472 466
473 async def handle_equivreport(self, data): 467 async def handle_equivreport(self, data):
474 with closing(self.db.cursor()) as cursor: 468 with closing(self.db.cursor()) as cursor:
@@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
491 485
492 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} 486 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
493 487
494 self.write_message(d) 488 return d
495 489
496 490
497 async def handle_get_stats(self, request): 491 async def handle_get_stats(self, request):
498 d = { 492 return {
499 'requests': self.request_stats.todict(), 493 'requests': self.request_stats.todict(),
500 } 494 }
501 495
502 self.write_message(d)
503
504 async def handle_reset_stats(self, request): 496 async def handle_reset_stats(self, request):
505 d = { 497 d = {
506 'requests': self.request_stats.todict(), 498 'requests': self.request_stats.todict(),
507 } 499 }
508 500
509 self.request_stats.reset() 501 self.request_stats.reset()
510 self.write_message(d) 502 return d
511 503
512 async def handle_backfill_wait(self, request): 504 async def handle_backfill_wait(self, request):
513 d = { 505 d = {
514 'tasks': self.backfill_queue.qsize(), 506 'tasks': self.backfill_queue.qsize(),
515 } 507 }
516 await self.backfill_queue.join() 508 await self.backfill_queue.join()
517 self.write_message(d) 509 return d
518 510
519 async def handle_remove(self, request): 511 async def handle_remove(self, request):
520 condition = request["where"] 512 condition = request["where"]
@@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
541 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) 533 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
542 self.db.commit() 534 self.db.commit()
543 535
544 self.write_message({"count": count}) 536 return {"count": count}
545 537
546 async def handle_clean_unused(self, request): 538 async def handle_clean_unused(self, request):
547 max_age = request["max_age_seconds"] 539 max_age = request["max_age_seconds"]
@@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
558 ) 550 )
559 count = cursor.rowcount 551 count = cursor.rowcount
560 552
561 self.write_message({"count": count}) 553 return {"count": count}
562 554
563 def query_equivalent(self, cursor, method, taskhash): 555 def query_equivalent(self, cursor, method, taskhash):
564 # This is part of the inner loop and must be as fast as possible 556 # This is part of the inner loop and must be as fast as possible
@@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer):
583 self.db = db 575 self.db = db
584 self.upstream = upstream 576 self.upstream = upstream
585 self.read_only = read_only 577 self.read_only = read_only
578 self.backfill_queue = None
586 579
587 def accept_client(self, reader, writer): 580 def accept_client(self, socket):
588 return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) 581 return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
589 582
590 @contextmanager 583 async def backfill_worker_task(self):
591 def _backfill_worker(self): 584 client = await create_async_client(self.upstream)
592 async def backfill_worker_task(): 585 try:
593 client = await create_async_client(self.upstream) 586 while True:
594 try: 587 item = await self.backfill_queue.get()
595 while True: 588 if item is None:
596 item = await self.backfill_queue.get()
597 if item is None:
598 self.backfill_queue.task_done()
599 break
600 method, taskhash = item
601 await copy_unihash_from_upstream(client, self.db, method, taskhash)
602 self.backfill_queue.task_done() 589 self.backfill_queue.task_done()
603 finally: 590 break
604 await client.close() 591 method, taskhash = item
592 await copy_unihash_from_upstream(client, self.db, method, taskhash)
593 self.backfill_queue.task_done()
594 finally:
595 await client.close()
605 596
606 async def join_worker(worker): 597 def start(self):
598 tasks = super().start()
599 if self.upstream:
600 self.backfill_queue = asyncio.Queue()
601 tasks += [self.backfill_worker_task()]
602 return tasks
603
604 async def stop(self):
605 if self.backfill_queue is not None:
607 await self.backfill_queue.put(None) 606 await self.backfill_queue.put(None)
608 await worker 607 await super().stop()
609
610 if self.upstream is not None:
611 worker = asyncio.ensure_future(backfill_worker_task())
612 try:
613 yield
614 finally:
615 self.loop.run_until_complete(join_worker(worker))
616 else:
617 yield
618
619 def run_loop_forever(self):
620 self.backfill_queue = asyncio.Queue()
621
622 with self._backfill_worker():
623 super().run_loop_forever()