diff options
| author | Joshua Watt <JPEWhacker@gmail.com> | 2023-11-03 08:26:19 -0600 |
|---|---|---|
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2023-11-09 17:33:02 +0000 |
| commit | 8f8501ed403dec27acbe780b936bc087fc5006d0 (patch) | |
| tree | 60e6415075c7c71eacec23ca7dda53e4a324b12e /bitbake/lib/hashserv | |
| parent | f97b686884166dd77d1818e70615027c6ba8c348 (diff) | |
| download | poky-8f8501ed403dec27acbe780b936bc087fc5006d0.tar.gz | |
bitbake: asyncrpc: Abstract sockets
Rewrites the asyncrpc client and server code to make it possible to have
other transport backends that are not stream based (e.g. websockets
which are message based). The connection handling classes are now shared
between both the client and server to make it easier to implement new
transport mechanisms
(Bitbake rev: 2aaeae53696e4c2f13a169830c3b7089cbad6eca)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv')
| -rw-r--r-- | bitbake/lib/hashserv/__init__.py | 21 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 38 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 116 |
3 files changed, 69 insertions, 106 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 9cb3fd57a5..3a4018353f 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
| @@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://" | |||
| 15 | ADDR_TYPE_UNIX = 0 | 15 | ADDR_TYPE_UNIX = 0 |
| 16 | ADDR_TYPE_TCP = 1 | 16 | ADDR_TYPE_TCP = 1 |
| 17 | 17 | ||
| 18 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
| 19 | # maximum chunk size. It would be better if the client and server reported to | ||
| 20 | # each other what the maximum chunk sizes were, but that will slow down the | ||
| 21 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
| 22 | # is necessary | ||
| 23 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
| 24 | |||
| 25 | UNIHASH_TABLE_DEFINITION = ( | 18 | UNIHASH_TABLE_DEFINITION = ( |
| 26 | ("method", "TEXT NOT NULL", "UNIQUE"), | 19 | ("method", "TEXT NOT NULL", "UNIQUE"), |
| 27 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | 20 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
| @@ -102,20 +95,6 @@ def parse_address(addr): | |||
| 102 | return (ADDR_TYPE_TCP, (host, int(port))) | 95 | return (ADDR_TYPE_TCP, (host, int(port))) |
| 103 | 96 | ||
| 104 | 97 | ||
| 105 | def chunkify(msg, max_chunk): | ||
| 106 | if len(msg) < max_chunk - 1: | ||
| 107 | yield ''.join((msg, "\n")) | ||
| 108 | else: | ||
| 109 | yield ''.join((json.dumps({ | ||
| 110 | 'chunk-stream': None | ||
| 111 | }), "\n")) | ||
| 112 | |||
| 113 | args = [iter(msg)] * (max_chunk - 1) | ||
| 114 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | ||
| 115 | yield ''.join(itertools.chain(m, "\n")) | ||
| 116 | yield "\n" | ||
| 117 | |||
| 118 | |||
| 119 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 98 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): |
| 120 | from . import server | 99 | from . import server |
| 121 | db = setup_database(dbname, sync=sync) | 100 | db = setup_database(dbname, sync=sync) |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index f676d267fa..5f7d22ab13 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 28 | 28 | ||
| 29 | async def send_stream(self, msg): | 29 | async def send_stream(self, msg): |
| 30 | async def proc(): | 30 | async def proc(): |
| 31 | self.writer.write(("%s\n" % msg).encode("utf-8")) | 31 | await self.socket.send(msg) |
| 32 | await self.writer.drain() | 32 | return await self.socket.recv() |
| 33 | l = await self.reader.readline() | ||
| 34 | if not l: | ||
| 35 | raise ConnectionError("Connection closed") | ||
| 36 | return l.decode("utf-8").rstrip() | ||
| 37 | 33 | ||
| 38 | return await self._send_wrapper(proc) | 34 | return await self._send_wrapper(proc) |
| 39 | 35 | ||
| 40 | async def _set_mode(self, new_mode): | 36 | async def _set_mode(self, new_mode): |
| 37 | async def stream_to_normal(): | ||
| 38 | await self.socket.send("END") | ||
| 39 | return await self.socket.recv() | ||
| 40 | |||
| 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: |
| 42 | r = await self.send_stream("END") | 42 | r = await self._send_wrapper(stream_to_normal) |
| 43 | if r != "ok": | 43 | if r != "ok": |
| 44 | raise ConnectionError("Bad response from server %r" % r) | 44 | raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) |
| 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: |
| 46 | r = await self.send_message({"get-stream": None}) | 46 | r = await self.invoke({"get-stream": None}) |
| 47 | if r != "ok": | 47 | if r != "ok": |
| 48 | raise ConnectionError("Bad response from server %r" % r) | 48 | raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) |
| 49 | elif new_mode != self.mode: | 49 | elif new_mode != self.mode: |
| 50 | raise Exception( | 50 | raise Exception( |
| 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) |
| @@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 67 | m["method"] = method | 67 | m["method"] = method |
| 68 | m["outhash"] = outhash | 68 | m["outhash"] = outhash |
| 69 | m["unihash"] = unihash | 69 | m["unihash"] = unihash |
| 70 | return await self.send_message({"report": m}) | 70 | return await self.invoke({"report": m}) |
| 71 | 71 | ||
| 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
| 73 | await self._set_mode(self.MODE_NORMAL) | 73 | await self._set_mode(self.MODE_NORMAL) |
| @@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 75 | m["taskhash"] = taskhash | 75 | m["taskhash"] = taskhash |
| 76 | m["method"] = method | 76 | m["method"] = method |
| 77 | m["unihash"] = unihash | 77 | m["unihash"] = unihash |
| 78 | return await self.send_message({"report-equiv": m}) | 78 | return await self.invoke({"report-equiv": m}) |
| 79 | 79 | ||
| 80 | async def get_taskhash(self, method, taskhash, all_properties=False): | 80 | async def get_taskhash(self, method, taskhash, all_properties=False): |
| 81 | await self._set_mode(self.MODE_NORMAL) | 81 | await self._set_mode(self.MODE_NORMAL) |
| 82 | return await self.send_message( | 82 | return await self.invoke( |
| 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
| 84 | ) | 84 | ) |
| 85 | 85 | ||
| 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
| 87 | await self._set_mode(self.MODE_NORMAL) | 87 | await self._set_mode(self.MODE_NORMAL) |
| 88 | return await self.send_message( | 88 | return await self.invoke( |
| 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} | 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} |
| 90 | ) | 90 | ) |
| 91 | 91 | ||
| 92 | async def get_stats(self): | 92 | async def get_stats(self): |
| 93 | await self._set_mode(self.MODE_NORMAL) | 93 | await self._set_mode(self.MODE_NORMAL) |
| 94 | return await self.send_message({"get-stats": None}) | 94 | return await self.invoke({"get-stats": None}) |
| 95 | 95 | ||
| 96 | async def reset_stats(self): | 96 | async def reset_stats(self): |
| 97 | await self._set_mode(self.MODE_NORMAL) | 97 | await self._set_mode(self.MODE_NORMAL) |
| 98 | return await self.send_message({"reset-stats": None}) | 98 | return await self.invoke({"reset-stats": None}) |
| 99 | 99 | ||
| 100 | async def backfill_wait(self): | 100 | async def backfill_wait(self): |
| 101 | await self._set_mode(self.MODE_NORMAL) | 101 | await self._set_mode(self.MODE_NORMAL) |
| 102 | return (await self.send_message({"backfill-wait": None}))["tasks"] | 102 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
| 103 | 103 | ||
| 104 | async def remove(self, where): | 104 | async def remove(self, where): |
| 105 | await self._set_mode(self.MODE_NORMAL) | 105 | await self._set_mode(self.MODE_NORMAL) |
| 106 | return await self.send_message({"remove": {"where": where}}) | 106 | return await self.invoke({"remove": {"where": where}}) |
| 107 | 107 | ||
| 108 | async def clean_unused(self, max_age): | 108 | async def clean_unused(self, max_age): |
| 109 | await self._set_mode(self.MODE_NORMAL) | 109 | await self._set_mode(self.MODE_NORMAL) |
| 110 | return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) | 110 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) |
| 111 | 111 | ||
| 112 | 112 | ||
| 113 | class Client(bb.asyncrpc.Client): | 113 | class Client(bb.asyncrpc.Client): |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 45bf476bfe..13b754805b 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
| @@ -165,8 +165,8 @@ class ServerCursor(object): | |||
| 165 | 165 | ||
| 166 | 166 | ||
| 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
| 168 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): | 168 | def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): |
| 169 | super().__init__(reader, writer, 'OEHASHEQUIV', logger) | 169 | super().__init__(socket, 'OEHASHEQUIV', logger) |
| 170 | self.db = db | 170 | self.db = db |
| 171 | self.request_stats = request_stats | 171 | self.request_stats = request_stats |
| 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
| @@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 209 | if k in msg: | 209 | if k in msg: |
| 210 | logger.debug('Handling %s' % k) | 210 | logger.debug('Handling %s' % k) |
| 211 | if 'stream' in k: | 211 | if 'stream' in k: |
| 212 | await self.handlers[k](msg[k]) | 212 | return await self.handlers[k](msg[k]) |
| 213 | else: | 213 | else: |
| 214 | with self.request_stats.start_sample() as self.request_sample, \ | 214 | with self.request_stats.start_sample() as self.request_sample, \ |
| 215 | self.request_sample.measure(): | 215 | self.request_sample.measure(): |
| 216 | await self.handlers[k](msg[k]) | 216 | return await self.handlers[k](msg[k]) |
| 217 | return | ||
| 218 | 217 | ||
| 219 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 218 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
| 220 | 219 | ||
| @@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 224 | fetch_all = request.get('all', False) | 223 | fetch_all = request.get('all', False) |
| 225 | 224 | ||
| 226 | with closing(self.db.cursor()) as cursor: | 225 | with closing(self.db.cursor()) as cursor: |
| 227 | d = await self.get_unihash(cursor, method, taskhash, fetch_all) | 226 | return await self.get_unihash(cursor, method, taskhash, fetch_all) |
| 228 | |||
| 229 | self.write_message(d) | ||
| 230 | 227 | ||
| 231 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): | 228 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): |
| 232 | d = None | 229 | d = None |
| @@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 274 | with_unihash = request.get("with_unihash", True) | 271 | with_unihash = request.get("with_unihash", True) |
| 275 | 272 | ||
| 276 | with closing(self.db.cursor()) as cursor: | 273 | with closing(self.db.cursor()) as cursor: |
| 277 | d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) | 274 | return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) |
| 278 | |||
| 279 | self.write_message(d) | ||
| 280 | 275 | ||
| 281 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): | 276 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): |
| 282 | d = None | 277 | d = None |
| @@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 334 | ) | 329 | ) |
| 335 | 330 | ||
| 336 | async def handle_get_stream(self, request): | 331 | async def handle_get_stream(self, request): |
| 337 | self.write_message('ok') | 332 | await self.socket.send_message("ok") |
| 338 | 333 | ||
| 339 | while True: | 334 | while True: |
| 340 | upstream = None | 335 | upstream = None |
| 341 | 336 | ||
| 342 | l = await self.reader.readline() | 337 | l = await self.socket.recv() |
| 343 | if not l: | 338 | if not l: |
| 344 | return | 339 | break |
| 345 | 340 | ||
| 346 | try: | 341 | try: |
| 347 | # This inner loop is very sensitive and must be as fast as | 342 | # This inner loop is very sensitive and must be as fast as |
| @@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 352 | request_measure = self.request_sample.measure() | 347 | request_measure = self.request_sample.measure() |
| 353 | request_measure.start() | 348 | request_measure.start() |
| 354 | 349 | ||
| 355 | l = l.decode('utf-8').rstrip() | ||
| 356 | if l == 'END': | 350 | if l == 'END': |
| 357 | self.writer.write('ok\n'.encode('utf-8')) | 351 | break |
| 358 | return | ||
| 359 | 352 | ||
| 360 | (method, taskhash) = l.split() | 353 | (method, taskhash) = l.split() |
| 361 | #logger.debug('Looking up %s %s' % (method, taskhash)) | 354 | #logger.debug('Looking up %s %s' % (method, taskhash)) |
| @@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 366 | cursor.close() | 359 | cursor.close() |
| 367 | 360 | ||
| 368 | if row is not None: | 361 | if row is not None: |
| 369 | msg = ('%s\n' % row['unihash']).encode('utf-8') | 362 | msg = row['unihash'] |
| 370 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 363 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
| 371 | elif self.upstream_client is not None: | 364 | elif self.upstream_client is not None: |
| 372 | upstream = await self.upstream_client.get_unihash(method, taskhash) | 365 | upstream = await self.upstream_client.get_unihash(method, taskhash) |
| 373 | if upstream: | 366 | if upstream: |
| 374 | msg = ("%s\n" % upstream).encode("utf-8") | 367 | msg = upstream |
| 375 | else: | 368 | else: |
| 376 | msg = "\n".encode("utf-8") | 369 | msg = "" |
| 377 | else: | 370 | else: |
| 378 | msg = '\n'.encode('utf-8') | 371 | msg = "" |
| 379 | 372 | ||
| 380 | self.writer.write(msg) | 373 | await self.socket.send(msg) |
| 381 | finally: | 374 | finally: |
| 382 | request_measure.end() | 375 | request_measure.end() |
| 383 | self.request_sample.end() | 376 | self.request_sample.end() |
| 384 | 377 | ||
| 385 | await self.writer.drain() | ||
| 386 | |||
| 387 | # Post to the backfill queue after writing the result to minimize | 378 | # Post to the backfill queue after writing the result to minimize |
| 388 | # the turn around time on a request | 379 | # the turn around time on a request |
| 389 | if upstream is not None: | 380 | if upstream is not None: |
| 390 | await self.backfill_queue.put((method, taskhash)) | 381 | await self.backfill_queue.put((method, taskhash)) |
| 391 | 382 | ||
| 383 | await self.socket.send("ok") | ||
| 384 | return self.NO_RESPONSE | ||
| 385 | |||
| 392 | async def handle_report(self, data): | 386 | async def handle_report(self, data): |
| 393 | with closing(self.db.cursor()) as cursor: | 387 | with closing(self.db.cursor()) as cursor: |
| 394 | outhash_data = { | 388 | outhash_data = { |
| @@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 468 | 'unihash': unihash, | 462 | 'unihash': unihash, |
| 469 | } | 463 | } |
| 470 | 464 | ||
| 471 | self.write_message(d) | 465 | return d |
| 472 | 466 | ||
| 473 | async def handle_equivreport(self, data): | 467 | async def handle_equivreport(self, data): |
| 474 | with closing(self.db.cursor()) as cursor: | 468 | with closing(self.db.cursor()) as cursor: |
| @@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 491 | 485 | ||
| 492 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | 486 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} |
| 493 | 487 | ||
| 494 | self.write_message(d) | 488 | return d |
| 495 | 489 | ||
| 496 | 490 | ||
| 497 | async def handle_get_stats(self, request): | 491 | async def handle_get_stats(self, request): |
| 498 | d = { | 492 | return { |
| 499 | 'requests': self.request_stats.todict(), | 493 | 'requests': self.request_stats.todict(), |
| 500 | } | 494 | } |
| 501 | 495 | ||
| 502 | self.write_message(d) | ||
| 503 | |||
| 504 | async def handle_reset_stats(self, request): | 496 | async def handle_reset_stats(self, request): |
| 505 | d = { | 497 | d = { |
| 506 | 'requests': self.request_stats.todict(), | 498 | 'requests': self.request_stats.todict(), |
| 507 | } | 499 | } |
| 508 | 500 | ||
| 509 | self.request_stats.reset() | 501 | self.request_stats.reset() |
| 510 | self.write_message(d) | 502 | return d |
| 511 | 503 | ||
| 512 | async def handle_backfill_wait(self, request): | 504 | async def handle_backfill_wait(self, request): |
| 513 | d = { | 505 | d = { |
| 514 | 'tasks': self.backfill_queue.qsize(), | 506 | 'tasks': self.backfill_queue.qsize(), |
| 515 | } | 507 | } |
| 516 | await self.backfill_queue.join() | 508 | await self.backfill_queue.join() |
| 517 | self.write_message(d) | 509 | return d |
| 518 | 510 | ||
| 519 | async def handle_remove(self, request): | 511 | async def handle_remove(self, request): |
| 520 | condition = request["where"] | 512 | condition = request["where"] |
| @@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 541 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | 533 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) |
| 542 | self.db.commit() | 534 | self.db.commit() |
| 543 | 535 | ||
| 544 | self.write_message({"count": count}) | 536 | return {"count": count} |
| 545 | 537 | ||
| 546 | async def handle_clean_unused(self, request): | 538 | async def handle_clean_unused(self, request): |
| 547 | max_age = request["max_age_seconds"] | 539 | max_age = request["max_age_seconds"] |
| @@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 558 | ) | 550 | ) |
| 559 | count = cursor.rowcount | 551 | count = cursor.rowcount |
| 560 | 552 | ||
| 561 | self.write_message({"count": count}) | 553 | return {"count": count} |
| 562 | 554 | ||
| 563 | def query_equivalent(self, cursor, method, taskhash): | 555 | def query_equivalent(self, cursor, method, taskhash): |
| 564 | # This is part of the inner loop and must be as fast as possible | 556 | # This is part of the inner loop and must be as fast as possible |
| @@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer): | |||
| 583 | self.db = db | 575 | self.db = db |
| 584 | self.upstream = upstream | 576 | self.upstream = upstream |
| 585 | self.read_only = read_only | 577 | self.read_only = read_only |
| 578 | self.backfill_queue = None | ||
| 586 | 579 | ||
| 587 | def accept_client(self, reader, writer): | 580 | def accept_client(self, socket): |
| 588 | return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) | 581 | return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) |
| 589 | 582 | ||
| 590 | @contextmanager | 583 | async def backfill_worker_task(self): |
| 591 | def _backfill_worker(self): | 584 | client = await create_async_client(self.upstream) |
| 592 | async def backfill_worker_task(): | 585 | try: |
| 593 | client = await create_async_client(self.upstream) | 586 | while True: |
| 594 | try: | 587 | item = await self.backfill_queue.get() |
| 595 | while True: | 588 | if item is None: |
| 596 | item = await self.backfill_queue.get() | ||
| 597 | if item is None: | ||
| 598 | self.backfill_queue.task_done() | ||
| 599 | break | ||
| 600 | method, taskhash = item | ||
| 601 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
| 602 | self.backfill_queue.task_done() | 589 | self.backfill_queue.task_done() |
| 603 | finally: | 590 | break |
| 604 | await client.close() | 591 | method, taskhash = item |
| 592 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
| 593 | self.backfill_queue.task_done() | ||
| 594 | finally: | ||
| 595 | await client.close() | ||
| 605 | 596 | ||
| 606 | async def join_worker(worker): | 597 | def start(self): |
| 598 | tasks = super().start() | ||
| 599 | if self.upstream: | ||
| 600 | self.backfill_queue = asyncio.Queue() | ||
| 601 | tasks += [self.backfill_worker_task()] | ||
| 602 | return tasks | ||
| 603 | |||
| 604 | async def stop(self): | ||
| 605 | if self.backfill_queue is not None: | ||
| 607 | await self.backfill_queue.put(None) | 606 | await self.backfill_queue.put(None) |
| 608 | await worker | 607 | await super().stop() |
| 609 | |||
| 610 | if self.upstream is not None: | ||
| 611 | worker = asyncio.ensure_future(backfill_worker_task()) | ||
| 612 | try: | ||
| 613 | yield | ||
| 614 | finally: | ||
| 615 | self.loop.run_until_complete(join_worker(worker)) | ||
| 616 | else: | ||
| 617 | yield | ||
| 618 | |||
| 619 | def run_loop_forever(self): | ||
| 620 | self.backfill_queue = asyncio.Queue() | ||
| 621 | |||
| 622 | with self._backfill_worker(): | ||
| 623 | super().run_loop_forever() | ||
