diff options
Diffstat (limited to 'bitbake/lib/hashserv')
| -rw-r--r-- | bitbake/lib/hashserv/__init__.py | 21 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 38 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 116 |
3 files changed, 69 insertions, 106 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 9cb3fd57a5..3a4018353f 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
| @@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://" | |||
| 15 | ADDR_TYPE_UNIX = 0 | 15 | ADDR_TYPE_UNIX = 0 |
| 16 | ADDR_TYPE_TCP = 1 | 16 | ADDR_TYPE_TCP = 1 |
| 17 | 17 | ||
| 18 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
| 19 | # maximum chunk size. It would be better if the client and server reported to | ||
| 20 | # each other what the maximum chunk sizes were, but that will slow down the | ||
| 21 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
| 22 | # is necessary | ||
| 23 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
| 24 | |||
| 25 | UNIHASH_TABLE_DEFINITION = ( | 18 | UNIHASH_TABLE_DEFINITION = ( |
| 26 | ("method", "TEXT NOT NULL", "UNIQUE"), | 19 | ("method", "TEXT NOT NULL", "UNIQUE"), |
| 27 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | 20 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
| @@ -102,20 +95,6 @@ def parse_address(addr): | |||
| 102 | return (ADDR_TYPE_TCP, (host, int(port))) | 95 | return (ADDR_TYPE_TCP, (host, int(port))) |
| 103 | 96 | ||
| 104 | 97 | ||
| 105 | def chunkify(msg, max_chunk): | ||
| 106 | if len(msg) < max_chunk - 1: | ||
| 107 | yield ''.join((msg, "\n")) | ||
| 108 | else: | ||
| 109 | yield ''.join((json.dumps({ | ||
| 110 | 'chunk-stream': None | ||
| 111 | }), "\n")) | ||
| 112 | |||
| 113 | args = [iter(msg)] * (max_chunk - 1) | ||
| 114 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | ||
| 115 | yield ''.join(itertools.chain(m, "\n")) | ||
| 116 | yield "\n" | ||
| 117 | |||
| 118 | |||
| 119 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 98 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): |
| 120 | from . import server | 99 | from . import server |
| 121 | db = setup_database(dbname, sync=sync) | 100 | db = setup_database(dbname, sync=sync) |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index f676d267fa..5f7d22ab13 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 28 | 28 | ||
| 29 | async def send_stream(self, msg): | 29 | async def send_stream(self, msg): |
| 30 | async def proc(): | 30 | async def proc(): |
| 31 | self.writer.write(("%s\n" % msg).encode("utf-8")) | 31 | await self.socket.send(msg) |
| 32 | await self.writer.drain() | 32 | return await self.socket.recv() |
| 33 | l = await self.reader.readline() | ||
| 34 | if not l: | ||
| 35 | raise ConnectionError("Connection closed") | ||
| 36 | return l.decode("utf-8").rstrip() | ||
| 37 | 33 | ||
| 38 | return await self._send_wrapper(proc) | 34 | return await self._send_wrapper(proc) |
| 39 | 35 | ||
| 40 | async def _set_mode(self, new_mode): | 36 | async def _set_mode(self, new_mode): |
| 37 | async def stream_to_normal(): | ||
| 38 | await self.socket.send("END") | ||
| 39 | return await self.socket.recv() | ||
| 40 | |||
| 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: |
| 42 | r = await self.send_stream("END") | 42 | r = await self._send_wrapper(stream_to_normal) |
| 43 | if r != "ok": | 43 | if r != "ok": |
| 44 | raise ConnectionError("Bad response from server %r" % r) | 44 | raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) |
| 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: |
| 46 | r = await self.send_message({"get-stream": None}) | 46 | r = await self.invoke({"get-stream": None}) |
| 47 | if r != "ok": | 47 | if r != "ok": |
| 48 | raise ConnectionError("Bad response from server %r" % r) | 48 | raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) |
| 49 | elif new_mode != self.mode: | 49 | elif new_mode != self.mode: |
| 50 | raise Exception( | 50 | raise Exception( |
| 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) |
| @@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 67 | m["method"] = method | 67 | m["method"] = method |
| 68 | m["outhash"] = outhash | 68 | m["outhash"] = outhash |
| 69 | m["unihash"] = unihash | 69 | m["unihash"] = unihash |
| 70 | return await self.send_message({"report": m}) | 70 | return await self.invoke({"report": m}) |
| 71 | 71 | ||
| 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
| 73 | await self._set_mode(self.MODE_NORMAL) | 73 | await self._set_mode(self.MODE_NORMAL) |
| @@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 75 | m["taskhash"] = taskhash | 75 | m["taskhash"] = taskhash |
| 76 | m["method"] = method | 76 | m["method"] = method |
| 77 | m["unihash"] = unihash | 77 | m["unihash"] = unihash |
| 78 | return await self.send_message({"report-equiv": m}) | 78 | return await self.invoke({"report-equiv": m}) |
| 79 | 79 | ||
| 80 | async def get_taskhash(self, method, taskhash, all_properties=False): | 80 | async def get_taskhash(self, method, taskhash, all_properties=False): |
| 81 | await self._set_mode(self.MODE_NORMAL) | 81 | await self._set_mode(self.MODE_NORMAL) |
| 82 | return await self.send_message( | 82 | return await self.invoke( |
| 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
| 84 | ) | 84 | ) |
| 85 | 85 | ||
| 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
| 87 | await self._set_mode(self.MODE_NORMAL) | 87 | await self._set_mode(self.MODE_NORMAL) |
| 88 | return await self.send_message( | 88 | return await self.invoke( |
| 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} | 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} |
| 90 | ) | 90 | ) |
| 91 | 91 | ||
| 92 | async def get_stats(self): | 92 | async def get_stats(self): |
| 93 | await self._set_mode(self.MODE_NORMAL) | 93 | await self._set_mode(self.MODE_NORMAL) |
| 94 | return await self.send_message({"get-stats": None}) | 94 | return await self.invoke({"get-stats": None}) |
| 95 | 95 | ||
| 96 | async def reset_stats(self): | 96 | async def reset_stats(self): |
| 97 | await self._set_mode(self.MODE_NORMAL) | 97 | await self._set_mode(self.MODE_NORMAL) |
| 98 | return await self.send_message({"reset-stats": None}) | 98 | return await self.invoke({"reset-stats": None}) |
| 99 | 99 | ||
| 100 | async def backfill_wait(self): | 100 | async def backfill_wait(self): |
| 101 | await self._set_mode(self.MODE_NORMAL) | 101 | await self._set_mode(self.MODE_NORMAL) |
| 102 | return (await self.send_message({"backfill-wait": None}))["tasks"] | 102 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
| 103 | 103 | ||
| 104 | async def remove(self, where): | 104 | async def remove(self, where): |
| 105 | await self._set_mode(self.MODE_NORMAL) | 105 | await self._set_mode(self.MODE_NORMAL) |
| 106 | return await self.send_message({"remove": {"where": where}}) | 106 | return await self.invoke({"remove": {"where": where}}) |
| 107 | 107 | ||
| 108 | async def clean_unused(self, max_age): | 108 | async def clean_unused(self, max_age): |
| 109 | await self._set_mode(self.MODE_NORMAL) | 109 | await self._set_mode(self.MODE_NORMAL) |
| 110 | return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) | 110 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) |
| 111 | 111 | ||
| 112 | 112 | ||
| 113 | class Client(bb.asyncrpc.Client): | 113 | class Client(bb.asyncrpc.Client): |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 45bf476bfe..13b754805b 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
| @@ -165,8 +165,8 @@ class ServerCursor(object): | |||
| 165 | 165 | ||
| 166 | 166 | ||
| 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
| 168 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): | 168 | def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): |
| 169 | super().__init__(reader, writer, 'OEHASHEQUIV', logger) | 169 | super().__init__(socket, 'OEHASHEQUIV', logger) |
| 170 | self.db = db | 170 | self.db = db |
| 171 | self.request_stats = request_stats | 171 | self.request_stats = request_stats |
| 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
| @@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 209 | if k in msg: | 209 | if k in msg: |
| 210 | logger.debug('Handling %s' % k) | 210 | logger.debug('Handling %s' % k) |
| 211 | if 'stream' in k: | 211 | if 'stream' in k: |
| 212 | await self.handlers[k](msg[k]) | 212 | return await self.handlers[k](msg[k]) |
| 213 | else: | 213 | else: |
| 214 | with self.request_stats.start_sample() as self.request_sample, \ | 214 | with self.request_stats.start_sample() as self.request_sample, \ |
| 215 | self.request_sample.measure(): | 215 | self.request_sample.measure(): |
| 216 | await self.handlers[k](msg[k]) | 216 | return await self.handlers[k](msg[k]) |
| 217 | return | ||
| 218 | 217 | ||
| 219 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 218 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
| 220 | 219 | ||
| @@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 224 | fetch_all = request.get('all', False) | 223 | fetch_all = request.get('all', False) |
| 225 | 224 | ||
| 226 | with closing(self.db.cursor()) as cursor: | 225 | with closing(self.db.cursor()) as cursor: |
| 227 | d = await self.get_unihash(cursor, method, taskhash, fetch_all) | 226 | return await self.get_unihash(cursor, method, taskhash, fetch_all) |
| 228 | |||
| 229 | self.write_message(d) | ||
| 230 | 227 | ||
| 231 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): | 228 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): |
| 232 | d = None | 229 | d = None |
| @@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 274 | with_unihash = request.get("with_unihash", True) | 271 | with_unihash = request.get("with_unihash", True) |
| 275 | 272 | ||
| 276 | with closing(self.db.cursor()) as cursor: | 273 | with closing(self.db.cursor()) as cursor: |
| 277 | d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) | 274 | return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) |
| 278 | |||
| 279 | self.write_message(d) | ||
| 280 | 275 | ||
| 281 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): | 276 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): |
| 282 | d = None | 277 | d = None |
| @@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 334 | ) | 329 | ) |
| 335 | 330 | ||
| 336 | async def handle_get_stream(self, request): | 331 | async def handle_get_stream(self, request): |
| 337 | self.write_message('ok') | 332 | await self.socket.send_message("ok") |
| 338 | 333 | ||
| 339 | while True: | 334 | while True: |
| 340 | upstream = None | 335 | upstream = None |
| 341 | 336 | ||
| 342 | l = await self.reader.readline() | 337 | l = await self.socket.recv() |
| 343 | if not l: | 338 | if not l: |
| 344 | return | 339 | break |
| 345 | 340 | ||
| 346 | try: | 341 | try: |
| 347 | # This inner loop is very sensitive and must be as fast as | 342 | # This inner loop is very sensitive and must be as fast as |
| @@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 352 | request_measure = self.request_sample.measure() | 347 | request_measure = self.request_sample.measure() |
| 353 | request_measure.start() | 348 | request_measure.start() |
| 354 | 349 | ||
| 355 | l = l.decode('utf-8').rstrip() | ||
| 356 | if l == 'END': | 350 | if l == 'END': |
| 357 | self.writer.write('ok\n'.encode('utf-8')) | 351 | break |
| 358 | return | ||
| 359 | 352 | ||
| 360 | (method, taskhash) = l.split() | 353 | (method, taskhash) = l.split() |
| 361 | #logger.debug('Looking up %s %s' % (method, taskhash)) | 354 | #logger.debug('Looking up %s %s' % (method, taskhash)) |
| @@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 366 | cursor.close() | 359 | cursor.close() |
| 367 | 360 | ||
| 368 | if row is not None: | 361 | if row is not None: |
| 369 | msg = ('%s\n' % row['unihash']).encode('utf-8') | 362 | msg = row['unihash'] |
| 370 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 363 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
| 371 | elif self.upstream_client is not None: | 364 | elif self.upstream_client is not None: |
| 372 | upstream = await self.upstream_client.get_unihash(method, taskhash) | 365 | upstream = await self.upstream_client.get_unihash(method, taskhash) |
| 373 | if upstream: | 366 | if upstream: |
| 374 | msg = ("%s\n" % upstream).encode("utf-8") | 367 | msg = upstream |
| 375 | else: | 368 | else: |
| 376 | msg = "\n".encode("utf-8") | 369 | msg = "" |
| 377 | else: | 370 | else: |
| 378 | msg = '\n'.encode('utf-8') | 371 | msg = "" |
| 379 | 372 | ||
| 380 | self.writer.write(msg) | 373 | await self.socket.send(msg) |
| 381 | finally: | 374 | finally: |
| 382 | request_measure.end() | 375 | request_measure.end() |
| 383 | self.request_sample.end() | 376 | self.request_sample.end() |
| 384 | 377 | ||
| 385 | await self.writer.drain() | ||
| 386 | |||
| 387 | # Post to the backfill queue after writing the result to minimize | 378 | # Post to the backfill queue after writing the result to minimize |
| 388 | # the turn around time on a request | 379 | # the turn around time on a request |
| 389 | if upstream is not None: | 380 | if upstream is not None: |
| 390 | await self.backfill_queue.put((method, taskhash)) | 381 | await self.backfill_queue.put((method, taskhash)) |
| 391 | 382 | ||
| 383 | await self.socket.send("ok") | ||
| 384 | return self.NO_RESPONSE | ||
| 385 | |||
| 392 | async def handle_report(self, data): | 386 | async def handle_report(self, data): |
| 393 | with closing(self.db.cursor()) as cursor: | 387 | with closing(self.db.cursor()) as cursor: |
| 394 | outhash_data = { | 388 | outhash_data = { |
| @@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 468 | 'unihash': unihash, | 462 | 'unihash': unihash, |
| 469 | } | 463 | } |
| 470 | 464 | ||
| 471 | self.write_message(d) | 465 | return d |
| 472 | 466 | ||
| 473 | async def handle_equivreport(self, data): | 467 | async def handle_equivreport(self, data): |
| 474 | with closing(self.db.cursor()) as cursor: | 468 | with closing(self.db.cursor()) as cursor: |
| @@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 491 | 485 | ||
| 492 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | 486 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} |
| 493 | 487 | ||
| 494 | self.write_message(d) | 488 | return d |
| 495 | 489 | ||
| 496 | 490 | ||
| 497 | async def handle_get_stats(self, request): | 491 | async def handle_get_stats(self, request): |
| 498 | d = { | 492 | return { |
| 499 | 'requests': self.request_stats.todict(), | 493 | 'requests': self.request_stats.todict(), |
| 500 | } | 494 | } |
| 501 | 495 | ||
| 502 | self.write_message(d) | ||
| 503 | |||
| 504 | async def handle_reset_stats(self, request): | 496 | async def handle_reset_stats(self, request): |
| 505 | d = { | 497 | d = { |
| 506 | 'requests': self.request_stats.todict(), | 498 | 'requests': self.request_stats.todict(), |
| 507 | } | 499 | } |
| 508 | 500 | ||
| 509 | self.request_stats.reset() | 501 | self.request_stats.reset() |
| 510 | self.write_message(d) | 502 | return d |
| 511 | 503 | ||
| 512 | async def handle_backfill_wait(self, request): | 504 | async def handle_backfill_wait(self, request): |
| 513 | d = { | 505 | d = { |
| 514 | 'tasks': self.backfill_queue.qsize(), | 506 | 'tasks': self.backfill_queue.qsize(), |
| 515 | } | 507 | } |
| 516 | await self.backfill_queue.join() | 508 | await self.backfill_queue.join() |
| 517 | self.write_message(d) | 509 | return d |
| 518 | 510 | ||
| 519 | async def handle_remove(self, request): | 511 | async def handle_remove(self, request): |
| 520 | condition = request["where"] | 512 | condition = request["where"] |
| @@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 541 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | 533 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) |
| 542 | self.db.commit() | 534 | self.db.commit() |
| 543 | 535 | ||
| 544 | self.write_message({"count": count}) | 536 | return {"count": count} |
| 545 | 537 | ||
| 546 | async def handle_clean_unused(self, request): | 538 | async def handle_clean_unused(self, request): |
| 547 | max_age = request["max_age_seconds"] | 539 | max_age = request["max_age_seconds"] |
| @@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 558 | ) | 550 | ) |
| 559 | count = cursor.rowcount | 551 | count = cursor.rowcount |
| 560 | 552 | ||
| 561 | self.write_message({"count": count}) | 553 | return {"count": count} |
| 562 | 554 | ||
| 563 | def query_equivalent(self, cursor, method, taskhash): | 555 | def query_equivalent(self, cursor, method, taskhash): |
| 564 | # This is part of the inner loop and must be as fast as possible | 556 | # This is part of the inner loop and must be as fast as possible |
| @@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer): | |||
| 583 | self.db = db | 575 | self.db = db |
| 584 | self.upstream = upstream | 576 | self.upstream = upstream |
| 585 | self.read_only = read_only | 577 | self.read_only = read_only |
| 578 | self.backfill_queue = None | ||
| 586 | 579 | ||
| 587 | def accept_client(self, reader, writer): | 580 | def accept_client(self, socket): |
| 588 | return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) | 581 | return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) |
| 589 | 582 | ||
| 590 | @contextmanager | 583 | async def backfill_worker_task(self): |
| 591 | def _backfill_worker(self): | 584 | client = await create_async_client(self.upstream) |
| 592 | async def backfill_worker_task(): | 585 | try: |
| 593 | client = await create_async_client(self.upstream) | 586 | while True: |
| 594 | try: | 587 | item = await self.backfill_queue.get() |
| 595 | while True: | 588 | if item is None: |
| 596 | item = await self.backfill_queue.get() | ||
| 597 | if item is None: | ||
| 598 | self.backfill_queue.task_done() | ||
| 599 | break | ||
| 600 | method, taskhash = item | ||
| 601 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
| 602 | self.backfill_queue.task_done() | 589 | self.backfill_queue.task_done() |
| 603 | finally: | 590 | break |
| 604 | await client.close() | 591 | method, taskhash = item |
| 592 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
| 593 | self.backfill_queue.task_done() | ||
| 594 | finally: | ||
| 595 | await client.close() | ||
| 605 | 596 | ||
| 606 | async def join_worker(worker): | 597 | def start(self): |
| 598 | tasks = super().start() | ||
| 599 | if self.upstream: | ||
| 600 | self.backfill_queue = asyncio.Queue() | ||
| 601 | tasks += [self.backfill_worker_task()] | ||
| 602 | return tasks | ||
| 603 | |||
| 604 | async def stop(self): | ||
| 605 | if self.backfill_queue is not None: | ||
| 607 | await self.backfill_queue.put(None) | 606 | await self.backfill_queue.put(None) |
| 608 | await worker | 607 | await super().stop() |
| 609 | |||
| 610 | if self.upstream is not None: | ||
| 611 | worker = asyncio.ensure_future(backfill_worker_task()) | ||
| 612 | try: | ||
| 613 | yield | ||
| 614 | finally: | ||
| 615 | self.loop.run_until_complete(join_worker(worker)) | ||
| 616 | else: | ||
| 617 | yield | ||
| 618 | |||
| 619 | def run_loop_forever(self): | ||
| 620 | self.backfill_queue = asyncio.Queue() | ||
| 621 | |||
| 622 | with self._backfill_worker(): | ||
| 623 | super().run_loop_forever() | ||
