diff options
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 32 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 78 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/connection.py | 95 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/exceptions.py | 17 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/serv.py | 304 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/__init__.py | 21 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 38 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/server.py | 116 | ||||
| -rw-r--r-- | bitbake/lib/prserv/client.py | 8 | ||||
| -rw-r--r-- | bitbake/lib/prserv/serv.py | 31 |
10 files changed, 387 insertions, 353 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index 9a85e9965b..9f677eac4c 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
| @@ -4,30 +4,12 @@ | |||
| 4 | # SPDX-License-Identifier: GPL-2.0-only | 4 | # SPDX-License-Identifier: GPL-2.0-only |
| 5 | # | 5 | # |
| 6 | 6 | ||
| 7 | import itertools | ||
| 8 | import json | ||
| 9 | |||
| 10 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
| 11 | # maximum chunk size. It would be better if the client and server reported to | ||
| 12 | # each other what the maximum chunk sizes were, but that will slow down the | ||
| 13 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
| 14 | # is necessary | ||
| 15 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
| 16 | |||
| 17 | |||
| 18 | def chunkify(msg, max_chunk): | ||
| 19 | if len(msg) < max_chunk - 1: | ||
| 20 | yield ''.join((msg, "\n")) | ||
| 21 | else: | ||
| 22 | yield ''.join((json.dumps({ | ||
| 23 | 'chunk-stream': None | ||
| 24 | }), "\n")) | ||
| 25 | |||
| 26 | args = [iter(msg)] * (max_chunk - 1) | ||
| 27 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | ||
| 28 | yield ''.join(itertools.chain(m, "\n")) | ||
| 29 | yield "\n" | ||
| 30 | |||
| 31 | 7 | ||
| 32 | from .client import AsyncClient, Client | 8 | from .client import AsyncClient, Client |
| 33 | from .serv import AsyncServer, AsyncServerConnection, ClientError, ServerError | 9 | from .serv import AsyncServer, AsyncServerConnection |
| 10 | from .connection import DEFAULT_MAX_CHUNK | ||
| 11 | from .exceptions import ( | ||
| 12 | ClientError, | ||
| 13 | ServerError, | ||
| 14 | ConnectionClosedError, | ||
| 15 | ) | ||
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87..7f33099b63 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
| @@ -10,13 +10,13 @@ import json | |||
| 10 | import os | 10 | import os |
| 11 | import socket | 11 | import socket |
| 12 | import sys | 12 | import sys |
| 13 | from . import chunkify, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK |
| 14 | from .exceptions import ConnectionClosedError | ||
| 14 | 15 | ||
| 15 | 16 | ||
| 16 | class AsyncClient(object): | 17 | class AsyncClient(object): |
| 17 | def __init__(self, proto_name, proto_version, logger, timeout=30): | 18 | def __init__(self, proto_name, proto_version, logger, timeout=30): |
| 18 | self.reader = None | 19 | self.socket = None |
| 19 | self.writer = None | ||
| 20 | self.max_chunk = DEFAULT_MAX_CHUNK | 20 | self.max_chunk = DEFAULT_MAX_CHUNK |
| 21 | self.proto_name = proto_name | 21 | self.proto_name = proto_name |
| 22 | self.proto_version = proto_version | 22 | self.proto_version = proto_version |
| @@ -25,7 +25,8 @@ class AsyncClient(object): | |||
| 25 | 25 | ||
| 26 | async def connect_tcp(self, address, port): | 26 | async def connect_tcp(self, address, port): |
| 27 | async def connect_sock(): | 27 | async def connect_sock(): |
| 28 | return await asyncio.open_connection(address, port) | 28 | reader, writer = await asyncio.open_connection(address, port) |
| 29 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
| 29 | 30 | ||
| 30 | self._connect_sock = connect_sock | 31 | self._connect_sock = connect_sock |
| 31 | 32 | ||
| @@ -40,27 +41,27 @@ class AsyncClient(object): | |||
| 40 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) | 41 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) |
| 41 | sock.connect(os.path.basename(path)) | 42 | sock.connect(os.path.basename(path)) |
| 42 | finally: | 43 | finally: |
| 43 | os.chdir(cwd) | 44 | os.chdir(cwd) |
| 44 | return await asyncio.open_unix_connection(sock=sock) | 45 | reader, writer = await asyncio.open_unix_connection(sock=sock) |
| 46 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
| 45 | 47 | ||
| 46 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
| 47 | 49 | ||
| 48 | async def setup_connection(self): | 50 | async def setup_connection(self): |
| 49 | s = '%s %s\n\n' % (self.proto_name, self.proto_version) | 51 | # Send headers |
| 50 | self.writer.write(s.encode("utf-8")) | 52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
| 51 | await self.writer.drain() | 53 | # End of headers |
| 54 | await self.socket.send("") | ||
| 52 | 55 | ||
| 53 | async def connect(self): | 56 | async def connect(self): |
| 54 | if self.reader is None or self.writer is None: | 57 | if self.socket is None: |
| 55 | (self.reader, self.writer) = await self._connect_sock() | 58 | self.socket = await self._connect_sock() |
| 56 | await self.setup_connection() | 59 | await self.setup_connection() |
| 57 | 60 | ||
| 58 | async def close(self): | 61 | async def close(self): |
| 59 | self.reader = None | 62 | if self.socket is not None: |
| 60 | 63 | await self.socket.close() | |
| 61 | if self.writer is not None: | 64 | self.socket = None |
| 62 | self.writer.close() | ||
| 63 | self.writer = None | ||
| 64 | 65 | ||
| 65 | async def _send_wrapper(self, proc): | 66 | async def _send_wrapper(self, proc): |
| 66 | count = 0 | 67 | count = 0 |
| @@ -71,6 +72,7 @@ class AsyncClient(object): | |||
| 71 | except ( | 72 | except ( |
| 72 | OSError, | 73 | OSError, |
| 73 | ConnectionError, | 74 | ConnectionError, |
| 75 | ConnectionClosedError, | ||
| 74 | json.JSONDecodeError, | 76 | json.JSONDecodeError, |
| 75 | UnicodeDecodeError, | 77 | UnicodeDecodeError, |
| 76 | ) as e: | 78 | ) as e: |
| @@ -82,49 +84,15 @@ class AsyncClient(object): | |||
| 82 | await self.close() | 84 | await self.close() |
| 83 | count += 1 | 85 | count += 1 |
| 84 | 86 | ||
| 85 | async def send_message(self, msg): | 87 | async def invoke(self, msg): |
| 86 | async def get_line(): | ||
| 87 | try: | ||
| 88 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
| 89 | except asyncio.TimeoutError: | ||
| 90 | raise ConnectionError("Timed out waiting for server") | ||
| 91 | |||
| 92 | if not line: | ||
| 93 | raise ConnectionError("Connection closed") | ||
| 94 | |||
| 95 | line = line.decode("utf-8") | ||
| 96 | |||
| 97 | if not line.endswith("\n"): | ||
| 98 | raise ConnectionError("Bad message %r" % (line)) | ||
| 99 | |||
| 100 | return line | ||
| 101 | |||
| 102 | async def proc(): | 88 | async def proc(): |
| 103 | for c in chunkify(json.dumps(msg), self.max_chunk): | 89 | await self.socket.send_message(msg) |
| 104 | self.writer.write(c.encode("utf-8")) | 90 | return await self.socket.recv_message() |
| 105 | await self.writer.drain() | ||
| 106 | |||
| 107 | l = await get_line() | ||
| 108 | |||
| 109 | m = json.loads(l) | ||
| 110 | if m and "chunk-stream" in m: | ||
| 111 | lines = [] | ||
| 112 | while True: | ||
| 113 | l = (await get_line()).rstrip("\n") | ||
| 114 | if not l: | ||
| 115 | break | ||
| 116 | lines.append(l) | ||
| 117 | |||
| 118 | m = json.loads("".join(lines)) | ||
| 119 | |||
| 120 | return m | ||
| 121 | 91 | ||
| 122 | return await self._send_wrapper(proc) | 92 | return await self._send_wrapper(proc) |
| 123 | 93 | ||
| 124 | async def ping(self): | 94 | async def ping(self): |
| 125 | return await self.send_message( | 95 | return await self.invoke({"ping": {}}) |
| 126 | {'ping': {}} | ||
| 127 | ) | ||
| 128 | 96 | ||
| 129 | 97 | ||
| 130 | class Client(object): | 98 | class Client(object): |
| @@ -142,7 +110,7 @@ class Client(object): | |||
| 142 | # required (but harmless) with it. | 110 | # required (but harmless) with it. |
| 143 | asyncio.set_event_loop(self.loop) | 111 | asyncio.set_event_loop(self.loop) |
| 144 | 112 | ||
| 145 | self._add_methods('connect_tcp', 'ping') | 113 | self._add_methods("connect_tcp", "ping") |
| 146 | 114 | ||
| 147 | @abc.abstractmethod | 115 | @abc.abstractmethod |
| 148 | def _get_async_client(self): | 116 | def _get_async_client(self): |
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py new file mode 100644 index 0000000000..c4fd24754c --- /dev/null +++ b/bitbake/lib/bb/asyncrpc/connection.py | |||
| @@ -0,0 +1,95 @@ | |||
| 1 | # | ||
| 2 | # Copyright BitBake Contributors | ||
| 3 | # | ||
| 4 | # SPDX-License-Identifier: GPL-2.0-only | ||
| 5 | # | ||
| 6 | |||
| 7 | import asyncio | ||
| 8 | import itertools | ||
| 9 | import json | ||
| 10 | from .exceptions import ClientError, ConnectionClosedError | ||
| 11 | |||
| 12 | |||
| 13 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
| 14 | # maximum chunk size. It would be better if the client and server reported to | ||
| 15 | # each other what the maximum chunk sizes were, but that will slow down the | ||
| 16 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
| 17 | # is necessary | ||
| 18 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
| 19 | |||
| 20 | |||
| 21 | def chunkify(msg, max_chunk): | ||
| 22 | if len(msg) < max_chunk - 1: | ||
| 23 | yield "".join((msg, "\n")) | ||
| 24 | else: | ||
| 25 | yield "".join((json.dumps({"chunk-stream": None}), "\n")) | ||
| 26 | |||
| 27 | args = [iter(msg)] * (max_chunk - 1) | ||
| 28 | for m in map("".join, itertools.zip_longest(*args, fillvalue="")): | ||
| 29 | yield "".join(itertools.chain(m, "\n")) | ||
| 30 | yield "\n" | ||
| 31 | |||
| 32 | |||
| 33 | class StreamConnection(object): | ||
| 34 | def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): | ||
| 35 | self.reader = reader | ||
| 36 | self.writer = writer | ||
| 37 | self.timeout = timeout | ||
| 38 | self.max_chunk = max_chunk | ||
| 39 | |||
| 40 | @property | ||
| 41 | def address(self): | ||
| 42 | return self.writer.get_extra_info("peername") | ||
| 43 | |||
| 44 | async def send_message(self, msg): | ||
| 45 | for c in chunkify(json.dumps(msg), self.max_chunk): | ||
| 46 | self.writer.write(c.encode("utf-8")) | ||
| 47 | await self.writer.drain() | ||
| 48 | |||
| 49 | async def recv_message(self): | ||
| 50 | l = await self.recv() | ||
| 51 | |||
| 52 | m = json.loads(l) | ||
| 53 | if not m: | ||
| 54 | return m | ||
| 55 | |||
| 56 | if "chunk-stream" in m: | ||
| 57 | lines = [] | ||
| 58 | while True: | ||
| 59 | l = await self.recv() | ||
| 60 | if not l: | ||
| 61 | break | ||
| 62 | lines.append(l) | ||
| 63 | |||
| 64 | m = json.loads("".join(lines)) | ||
| 65 | |||
| 66 | return m | ||
| 67 | |||
| 68 | async def send(self, msg): | ||
| 69 | self.writer.write(("%s\n" % msg).encode("utf-8")) | ||
| 70 | await self.writer.drain() | ||
| 71 | |||
| 72 | async def recv(self): | ||
| 73 | if self.timeout < 0: | ||
| 74 | line = await self.reader.readline() | ||
| 75 | else: | ||
| 76 | try: | ||
| 77 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
| 78 | except asyncio.TimeoutError: | ||
| 79 | raise ConnectionError("Timed out waiting for data") | ||
| 80 | |||
| 81 | if not line: | ||
| 82 | raise ConnectionClosedError("Connection closed") | ||
| 83 | |||
| 84 | line = line.decode("utf-8") | ||
| 85 | |||
| 86 | if not line.endswith("\n"): | ||
| 87 | raise ConnectionError("Bad message %r" % (line)) | ||
| 88 | |||
| 89 | return line.rstrip() | ||
| 90 | |||
| 91 | async def close(self): | ||
| 92 | self.reader = None | ||
| 93 | if self.writer is not None: | ||
| 94 | self.writer.close() | ||
| 95 | self.writer = None | ||
diff --git a/bitbake/lib/bb/asyncrpc/exceptions.py b/bitbake/lib/bb/asyncrpc/exceptions.py new file mode 100644 index 0000000000..a8942b4f0c --- /dev/null +++ b/bitbake/lib/bb/asyncrpc/exceptions.py | |||
| @@ -0,0 +1,17 @@ | |||
| 1 | # | ||
| 2 | # Copyright BitBake Contributors | ||
| 3 | # | ||
| 4 | # SPDX-License-Identifier: GPL-2.0-only | ||
| 5 | # | ||
| 6 | |||
| 7 | |||
| 8 | class ClientError(Exception): | ||
| 9 | pass | ||
| 10 | |||
| 11 | |||
| 12 | class ServerError(Exception): | ||
| 13 | pass | ||
| 14 | |||
| 15 | |||
| 16 | class ConnectionClosedError(Exception): | ||
| 17 | pass | ||
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index d2de4891b8..3e0d0632cb 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py | |||
| @@ -12,241 +12,248 @@ import signal | |||
| 12 | import socket | 12 | import socket |
| 13 | import sys | 13 | import sys |
| 14 | import multiprocessing | 14 | import multiprocessing |
| 15 | from . import chunkify, DEFAULT_MAX_CHUNK | 15 | from .connection import StreamConnection |
| 16 | 16 | from .exceptions import ClientError, ServerError, ConnectionClosedError | |
| 17 | |||
| 18 | class ClientError(Exception): | ||
| 19 | pass | ||
| 20 | |||
| 21 | |||
| 22 | class ServerError(Exception): | ||
| 23 | pass | ||
| 24 | 17 | ||
| 25 | 18 | ||
| 26 | class AsyncServerConnection(object): | 19 | class AsyncServerConnection(object): |
| 27 | def __init__(self, reader, writer, proto_name, logger): | 20 | # If a handler returns this object (e.g. `return self.NO_RESPONSE`), no |
| 28 | self.reader = reader | 21 | # return message will be automatically be sent back to the client |
| 29 | self.writer = writer | 22 | NO_RESPONSE = object() |
| 23 | |||
| 24 | def __init__(self, socket, proto_name, logger): | ||
| 25 | self.socket = socket | ||
| 30 | self.proto_name = proto_name | 26 | self.proto_name = proto_name |
| 31 | self.max_chunk = DEFAULT_MAX_CHUNK | ||
| 32 | self.handlers = { | 27 | self.handlers = { |
| 33 | 'chunk-stream': self.handle_chunk, | 28 | "ping": self.handle_ping, |
| 34 | 'ping': self.handle_ping, | ||
| 35 | } | 29 | } |
| 36 | self.logger = logger | 30 | self.logger = logger |
| 37 | 31 | ||
| 32 | async def close(self): | ||
| 33 | await self.socket.close() | ||
| 34 | |||
| 38 | async def process_requests(self): | 35 | async def process_requests(self): |
| 39 | try: | 36 | try: |
| 40 | self.addr = self.writer.get_extra_info('peername') | 37 | self.logger.info("Client %r connected" % (self.socket.address,)) |
| 41 | self.logger.debug('Client %r connected' % (self.addr,)) | ||
| 42 | 38 | ||
| 43 | # Read protocol and version | 39 | # Read protocol and version |
| 44 | client_protocol = await self.reader.readline() | 40 | client_protocol = await self.socket.recv() |
| 45 | if not client_protocol: | 41 | if not client_protocol: |
| 46 | return | 42 | return |
| 47 | 43 | ||
| 48 | (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split() | 44 | (client_proto_name, client_proto_version) = client_protocol.split() |
| 49 | if client_proto_name != self.proto_name: | 45 | if client_proto_name != self.proto_name: |
| 50 | self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name)) | 46 | self.logger.debug("Rejecting invalid protocol %s" % (self.proto_name)) |
| 51 | return | 47 | return |
| 52 | 48 | ||
| 53 | self.proto_version = tuple(int(v) for v in client_proto_version.split('.')) | 49 | self.proto_version = tuple(int(v) for v in client_proto_version.split(".")) |
| 54 | if not self.validate_proto_version(): | 50 | if not self.validate_proto_version(): |
| 55 | self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version)) | 51 | self.logger.debug( |
| 52 | "Rejecting invalid protocol version %s" % (client_proto_version) | ||
| 53 | ) | ||
| 56 | return | 54 | return |
| 57 | 55 | ||
| 58 | # Read headers. Currently, no headers are implemented, so look for | 56 | # Read headers. Currently, no headers are implemented, so look for |
| 59 | # an empty line to signal the end of the headers | 57 | # an empty line to signal the end of the headers |
| 60 | while True: | 58 | while True: |
| 61 | line = await self.reader.readline() | 59 | header = await self.socket.recv() |
| 62 | if not line: | 60 | if not header: |
| 63 | return | ||
| 64 | |||
| 65 | line = line.decode('utf-8').rstrip() | ||
| 66 | if not line: | ||
| 67 | break | 61 | break |
| 68 | 62 | ||
| 69 | # Handle messages | 63 | # Handle messages |
| 70 | while True: | 64 | while True: |
| 71 | d = await self.read_message() | 65 | d = await self.socket.recv_message() |
| 72 | if d is None: | 66 | if d is None: |
| 73 | break | 67 | break |
| 74 | await self.dispatch_message(d) | 68 | response = await self.dispatch_message(d) |
| 75 | await self.writer.drain() | 69 | if response is not self.NO_RESPONSE: |
| 76 | except ClientError as e: | 70 | await self.socket.send_message(response) |
| 71 | |||
| 72 | except ConnectionClosedError as e: | ||
| 73 | self.logger.info(str(e)) | ||
| 74 | except (ClientError, ConnectionError) as e: | ||
| 77 | self.logger.error(str(e)) | 75 | self.logger.error(str(e)) |
| 78 | finally: | 76 | finally: |
| 79 | self.writer.close() | 77 | await self.close() |
| 80 | 78 | ||
| 81 | async def dispatch_message(self, msg): | 79 | async def dispatch_message(self, msg): |
| 82 | for k in self.handlers.keys(): | 80 | for k in self.handlers.keys(): |
| 83 | if k in msg: | 81 | if k in msg: |
| 84 | self.logger.debug('Handling %s' % k) | 82 | self.logger.debug("Handling %s" % k) |
| 85 | await self.handlers[k](msg[k]) | 83 | return await self.handlers[k](msg[k]) |
| 86 | return | ||
| 87 | 84 | ||
| 88 | raise ClientError("Unrecognized command %r" % msg) | 85 | raise ClientError("Unrecognized command %r" % msg) |
| 89 | 86 | ||
| 90 | def write_message(self, msg): | 87 | async def handle_ping(self, request): |
| 91 | for c in chunkify(json.dumps(msg), self.max_chunk): | 88 | return {"alive": True} |
| 92 | self.writer.write(c.encode('utf-8')) | ||
| 93 | 89 | ||
| 94 | async def read_message(self): | ||
| 95 | l = await self.reader.readline() | ||
| 96 | if not l: | ||
| 97 | return None | ||
| 98 | 90 | ||
| 99 | try: | 91 | class StreamServer(object): |
| 100 | message = l.decode('utf-8') | 92 | def __init__(self, handler, logger): |
| 93 | self.handler = handler | ||
| 94 | self.logger = logger | ||
| 95 | self.closed = False | ||
| 101 | 96 | ||
| 102 | if not message.endswith('\n'): | 97 | async def handle_stream_client(self, reader, writer): |
| 103 | return None | 98 | # writer.transport.set_write_buffer_limits(0) |
| 99 | socket = StreamConnection(reader, writer, -1) | ||
| 100 | if self.closed: | ||
| 101 | await socket.close() | ||
| 102 | return | ||
| 103 | |||
| 104 | await self.handler(socket) | ||
| 105 | |||
| 106 | async def stop(self): | ||
| 107 | self.closed = True | ||
| 108 | |||
| 109 | |||
| 110 | class TCPStreamServer(StreamServer): | ||
| 111 | def __init__(self, host, port, handler, logger): | ||
| 112 | super().__init__(handler, logger) | ||
| 113 | self.host = host | ||
| 114 | self.port = port | ||
| 115 | |||
| 116 | def start(self, loop): | ||
| 117 | self.server = loop.run_until_complete( | ||
| 118 | asyncio.start_server(self.handle_stream_client, self.host, self.port) | ||
| 119 | ) | ||
| 120 | |||
| 121 | for s in self.server.sockets: | ||
| 122 | self.logger.debug("Listening on %r" % (s.getsockname(),)) | ||
| 123 | # Newer python does this automatically. Do it manually here for | ||
| 124 | # maximum compatibility | ||
| 125 | s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) | ||
| 126 | s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) | ||
| 127 | |||
| 128 | # Enable keep alives. This prevents broken client connections | ||
| 129 | # from persisting on the server for long periods of time. | ||
| 130 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
| 131 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
| 132 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
| 133 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
| 134 | |||
| 135 | name = self.server.sockets[0].getsockname() | ||
| 136 | if self.server.sockets[0].family == socket.AF_INET6: | ||
| 137 | self.address = "[%s]:%d" % (name[0], name[1]) | ||
| 138 | else: | ||
| 139 | self.address = "%s:%d" % (name[0], name[1]) | ||
| 140 | |||
| 141 | return [self.server.wait_closed()] | ||
| 142 | |||
| 143 | async def stop(self): | ||
| 144 | await super().stop() | ||
| 145 | self.server.close() | ||
| 146 | |||
| 147 | def cleanup(self): | ||
| 148 | pass | ||
| 104 | 149 | ||
| 105 | return json.loads(message) | ||
| 106 | except (json.JSONDecodeError, UnicodeDecodeError) as e: | ||
| 107 | self.logger.error('Bad message from client: %r' % message) | ||
| 108 | raise e | ||
| 109 | 150 | ||
| 110 | async def handle_chunk(self, request): | 151 | class UnixStreamServer(StreamServer): |
| 111 | lines = [] | 152 | def __init__(self, path, handler, logger): |
| 112 | try: | 153 | super().__init__(handler, logger) |
| 113 | while True: | 154 | self.path = path |
| 114 | l = await self.reader.readline() | ||
| 115 | l = l.rstrip(b"\n").decode("utf-8") | ||
| 116 | if not l: | ||
| 117 | break | ||
| 118 | lines.append(l) | ||
| 119 | 155 | ||
| 120 | msg = json.loads(''.join(lines)) | 156 | def start(self, loop): |
| 121 | except (json.JSONDecodeError, UnicodeDecodeError) as e: | 157 | cwd = os.getcwd() |
| 122 | self.logger.error('Bad message from client: %r' % lines) | 158 | try: |
| 123 | raise e | 159 | # Work around path length limits in AF_UNIX |
| 160 | os.chdir(os.path.dirname(self.path)) | ||
| 161 | self.server = loop.run_until_complete( | ||
| 162 | asyncio.start_unix_server( | ||
| 163 | self.handle_stream_client, os.path.basename(self.path) | ||
| 164 | ) | ||
| 165 | ) | ||
| 166 | finally: | ||
| 167 | os.chdir(cwd) | ||
| 124 | 168 | ||
| 125 | if 'chunk-stream' in msg: | 169 | self.logger.debug("Listening on %r" % self.path) |
| 126 | raise ClientError("Nested chunks are not allowed") | 170 | self.address = "unix://%s" % os.path.abspath(self.path) |
| 171 | return [self.server.wait_closed()] | ||
| 127 | 172 | ||
| 128 | await self.dispatch_message(msg) | 173 | async def stop(self): |
| 174 | await super().stop() | ||
| 175 | self.server.close() | ||
| 129 | 176 | ||
| 130 | async def handle_ping(self, request): | 177 | def cleanup(self): |
| 131 | response = {'alive': True} | 178 | os.unlink(self.path) |
| 132 | self.write_message(response) | ||
| 133 | 179 | ||
| 134 | 180 | ||
| 135 | class AsyncServer(object): | 181 | class AsyncServer(object): |
| 136 | def __init__(self, logger): | 182 | def __init__(self, logger): |
| 137 | self._cleanup_socket = None | ||
| 138 | self.logger = logger | 183 | self.logger = logger |
| 139 | self.start = None | ||
| 140 | self.address = None | ||
| 141 | self.loop = None | 184 | self.loop = None |
| 185 | self.run_tasks = [] | ||
| 142 | 186 | ||
| 143 | def start_tcp_server(self, host, port): | 187 | def start_tcp_server(self, host, port): |
| 144 | def start_tcp(): | 188 | self.server = TCPStreamServer(host, port, self._client_handler, self.logger) |
| 145 | self.server = self.loop.run_until_complete( | ||
| 146 | asyncio.start_server(self.handle_client, host, port) | ||
| 147 | ) | ||
| 148 | |||
| 149 | for s in self.server.sockets: | ||
| 150 | self.logger.debug('Listening on %r' % (s.getsockname(),)) | ||
| 151 | # Newer python does this automatically. Do it manually here for | ||
| 152 | # maximum compatibility | ||
| 153 | s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) | ||
| 154 | s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) | ||
| 155 | |||
| 156 | # Enable keep alives. This prevents broken client connections | ||
| 157 | # from persisting on the server for long periods of time. | ||
| 158 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
| 159 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
| 160 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
| 161 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
| 162 | |||
| 163 | name = self.server.sockets[0].getsockname() | ||
| 164 | if self.server.sockets[0].family == socket.AF_INET6: | ||
| 165 | self.address = "[%s]:%d" % (name[0], name[1]) | ||
| 166 | else: | ||
| 167 | self.address = "%s:%d" % (name[0], name[1]) | ||
| 168 | |||
| 169 | self.start = start_tcp | ||
| 170 | 189 | ||
| 171 | def start_unix_server(self, path): | 190 | def start_unix_server(self, path): |
| 172 | def cleanup(): | 191 | self.server = UnixStreamServer(path, self._client_handler, self.logger) |
| 173 | os.unlink(path) | ||
| 174 | |||
| 175 | def start_unix(): | ||
| 176 | cwd = os.getcwd() | ||
| 177 | try: | ||
| 178 | # Work around path length limits in AF_UNIX | ||
| 179 | os.chdir(os.path.dirname(path)) | ||
| 180 | self.server = self.loop.run_until_complete( | ||
| 181 | asyncio.start_unix_server(self.handle_client, os.path.basename(path)) | ||
| 182 | ) | ||
| 183 | finally: | ||
| 184 | os.chdir(cwd) | ||
| 185 | |||
| 186 | self.logger.debug('Listening on %r' % path) | ||
| 187 | 192 | ||
| 188 | self._cleanup_socket = cleanup | 193 | async def _client_handler(self, socket): |
| 189 | self.address = "unix://%s" % os.path.abspath(path) | ||
| 190 | |||
| 191 | self.start = start_unix | ||
| 192 | |||
| 193 | @abc.abstractmethod | ||
| 194 | def accept_client(self, reader, writer): | ||
| 195 | pass | ||
| 196 | |||
| 197 | async def handle_client(self, reader, writer): | ||
| 198 | # writer.transport.set_write_buffer_limits(0) | ||
| 199 | try: | 194 | try: |
| 200 | client = self.accept_client(reader, writer) | 195 | client = self.accept_client(socket) |
| 201 | await client.process_requests() | 196 | await client.process_requests() |
| 202 | except Exception as e: | 197 | except Exception as e: |
| 203 | import traceback | 198 | import traceback |
| 204 | self.logger.error('Error from client: %s' % str(e), exc_info=True) | 199 | |
| 200 | self.logger.error("Error from client: %s" % str(e), exc_info=True) | ||
| 205 | traceback.print_exc() | 201 | traceback.print_exc() |
| 206 | writer.close() | 202 | await socket.close() |
| 207 | self.logger.debug('Client disconnected') | 203 | self.logger.debug("Client disconnected") |
| 208 | 204 | ||
| 209 | def run_loop_forever(self): | 205 | @abc.abstractmethod |
| 210 | try: | 206 | def accept_client(self, socket): |
| 211 | self.loop.run_forever() | 207 | pass |
| 212 | except KeyboardInterrupt: | 208 | |
| 213 | pass | 209 | async def stop(self): |
| 210 | self.logger.debug("Stopping server") | ||
| 211 | await self.server.stop() | ||
| 212 | |||
| 213 | def start(self): | ||
| 214 | tasks = self.server.start(self.loop) | ||
| 215 | self.address = self.server.address | ||
| 216 | return tasks | ||
| 214 | 217 | ||
| 215 | def signal_handler(self): | 218 | def signal_handler(self): |
| 216 | self.logger.debug("Got exit signal") | 219 | self.logger.debug("Got exit signal") |
| 217 | self.loop.stop() | 220 | self.loop.create_task(self.stop()) |
| 218 | 221 | ||
| 219 | def _serve_forever(self): | 222 | def _serve_forever(self, tasks): |
| 220 | try: | 223 | try: |
| 221 | self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) | 224 | self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) |
| 225 | self.loop.add_signal_handler(signal.SIGINT, self.signal_handler) | ||
| 226 | self.loop.add_signal_handler(signal.SIGQUIT, self.signal_handler) | ||
| 222 | signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) | 227 | signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) |
| 223 | 228 | ||
| 224 | self.run_loop_forever() | 229 | self.loop.run_until_complete(asyncio.gather(*tasks)) |
| 225 | self.server.close() | ||
| 226 | 230 | ||
| 227 | self.loop.run_until_complete(self.server.wait_closed()) | 231 | self.logger.debug("Server shutting down") |
| 228 | self.logger.debug('Server shutting down') | ||
| 229 | finally: | 232 | finally: |
| 230 | if self._cleanup_socket is not None: | 233 | self.server.cleanup() |
| 231 | self._cleanup_socket() | ||
| 232 | 234 | ||
| 233 | def serve_forever(self): | 235 | def serve_forever(self): |
| 234 | """ | 236 | """ |
| 235 | Serve requests in the current process | 237 | Serve requests in the current process |
| 236 | """ | 238 | """ |
| 239 | self._create_loop() | ||
| 240 | tasks = self.start() | ||
| 241 | self._serve_forever(tasks) | ||
| 242 | self.loop.close() | ||
| 243 | |||
| 244 | def _create_loop(self): | ||
| 237 | # Create loop and override any loop that may have existed in | 245 | # Create loop and override any loop that may have existed in |
| 238 | # a parent process. It is possible that the usecases of | 246 | # a parent process. It is possible that the usecases of |
| 239 | # serve_forever might be constrained enough to allow using | 247 | # serve_forever might be constrained enough to allow using |
| 240 | # get_event_loop here, but better safe than sorry for now. | 248 | # get_event_loop here, but better safe than sorry for now. |
| 241 | self.loop = asyncio.new_event_loop() | 249 | self.loop = asyncio.new_event_loop() |
| 242 | asyncio.set_event_loop(self.loop) | 250 | asyncio.set_event_loop(self.loop) |
| 243 | self.start() | ||
| 244 | self._serve_forever() | ||
| 245 | 251 | ||
| 246 | def serve_as_process(self, *, prefunc=None, args=()): | 252 | def serve_as_process(self, *, prefunc=None, args=()): |
| 247 | """ | 253 | """ |
| 248 | Serve requests in a child process | 254 | Serve requests in a child process |
| 249 | """ | 255 | """ |
| 256 | |||
| 250 | def run(queue): | 257 | def run(queue): |
| 251 | # Create loop and override any loop that may have existed | 258 | # Create loop and override any loop that may have existed |
| 252 | # in a parent process. Without doing this and instead | 259 | # in a parent process. Without doing this and instead |
| @@ -259,18 +266,19 @@ class AsyncServer(object): | |||
| 259 | # more general, though, as any potential use of asyncio in | 266 | # more general, though, as any potential use of asyncio in |
| 260 | # Cooker could create a loop that needs to replaced in this | 267 | # Cooker could create a loop that needs to replaced in this |
| 261 | # new process. | 268 | # new process. |
| 262 | self.loop = asyncio.new_event_loop() | 269 | self._create_loop() |
| 263 | asyncio.set_event_loop(self.loop) | ||
| 264 | try: | 270 | try: |
| 265 | self.start() | 271 | self.address = None |
| 272 | tasks = self.start() | ||
| 266 | finally: | 273 | finally: |
| 274 | # Always put the server address to wake up the parent task | ||
| 267 | queue.put(self.address) | 275 | queue.put(self.address) |
| 268 | queue.close() | 276 | queue.close() |
| 269 | 277 | ||
| 270 | if prefunc is not None: | 278 | if prefunc is not None: |
| 271 | prefunc(self, *args) | 279 | prefunc(self, *args) |
| 272 | 280 | ||
| 273 | self._serve_forever() | 281 | self._serve_forever(tasks) |
| 274 | 282 | ||
| 275 | if sys.version_info >= (3, 6): | 283 | if sys.version_info >= (3, 6): |
| 276 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | 284 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 9cb3fd57a5..3a4018353f 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
| @@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://" | |||
| 15 | ADDR_TYPE_UNIX = 0 | 15 | ADDR_TYPE_UNIX = 0 |
| 16 | ADDR_TYPE_TCP = 1 | 16 | ADDR_TYPE_TCP = 1 |
| 17 | 17 | ||
| 18 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
| 19 | # maximum chunk size. It would be better if the client and server reported to | ||
| 20 | # each other what the maximum chunk sizes were, but that will slow down the | ||
| 21 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
| 22 | # is necessary | ||
| 23 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
| 24 | |||
| 25 | UNIHASH_TABLE_DEFINITION = ( | 18 | UNIHASH_TABLE_DEFINITION = ( |
| 26 | ("method", "TEXT NOT NULL", "UNIQUE"), | 19 | ("method", "TEXT NOT NULL", "UNIQUE"), |
| 27 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | 20 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
| @@ -102,20 +95,6 @@ def parse_address(addr): | |||
| 102 | return (ADDR_TYPE_TCP, (host, int(port))) | 95 | return (ADDR_TYPE_TCP, (host, int(port))) |
| 103 | 96 | ||
| 104 | 97 | ||
| 105 | def chunkify(msg, max_chunk): | ||
| 106 | if len(msg) < max_chunk - 1: | ||
| 107 | yield ''.join((msg, "\n")) | ||
| 108 | else: | ||
| 109 | yield ''.join((json.dumps({ | ||
| 110 | 'chunk-stream': None | ||
| 111 | }), "\n")) | ||
| 112 | |||
| 113 | args = [iter(msg)] * (max_chunk - 1) | ||
| 114 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | ||
| 115 | yield ''.join(itertools.chain(m, "\n")) | ||
| 116 | yield "\n" | ||
| 117 | |||
| 118 | |||
| 119 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 98 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): |
| 120 | from . import server | 99 | from . import server |
| 121 | db = setup_database(dbname, sync=sync) | 100 | db = setup_database(dbname, sync=sync) |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index f676d267fa..5f7d22ab13 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 28 | 28 | ||
| 29 | async def send_stream(self, msg): | 29 | async def send_stream(self, msg): |
| 30 | async def proc(): | 30 | async def proc(): |
| 31 | self.writer.write(("%s\n" % msg).encode("utf-8")) | 31 | await self.socket.send(msg) |
| 32 | await self.writer.drain() | 32 | return await self.socket.recv() |
| 33 | l = await self.reader.readline() | ||
| 34 | if not l: | ||
| 35 | raise ConnectionError("Connection closed") | ||
| 36 | return l.decode("utf-8").rstrip() | ||
| 37 | 33 | ||
| 38 | return await self._send_wrapper(proc) | 34 | return await self._send_wrapper(proc) |
| 39 | 35 | ||
| 40 | async def _set_mode(self, new_mode): | 36 | async def _set_mode(self, new_mode): |
| 37 | async def stream_to_normal(): | ||
| 38 | await self.socket.send("END") | ||
| 39 | return await self.socket.recv() | ||
| 40 | |||
| 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: |
| 42 | r = await self.send_stream("END") | 42 | r = await self._send_wrapper(stream_to_normal) |
| 43 | if r != "ok": | 43 | if r != "ok": |
| 44 | raise ConnectionError("Bad response from server %r" % r) | 44 | raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) |
| 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: |
| 46 | r = await self.send_message({"get-stream": None}) | 46 | r = await self.invoke({"get-stream": None}) |
| 47 | if r != "ok": | 47 | if r != "ok": |
| 48 | raise ConnectionError("Bad response from server %r" % r) | 48 | raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) |
| 49 | elif new_mode != self.mode: | 49 | elif new_mode != self.mode: |
| 50 | raise Exception( | 50 | raise Exception( |
| 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) |
| @@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 67 | m["method"] = method | 67 | m["method"] = method |
| 68 | m["outhash"] = outhash | 68 | m["outhash"] = outhash |
| 69 | m["unihash"] = unihash | 69 | m["unihash"] = unihash |
| 70 | return await self.send_message({"report": m}) | 70 | return await self.invoke({"report": m}) |
| 71 | 71 | ||
| 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
| 73 | await self._set_mode(self.MODE_NORMAL) | 73 | await self._set_mode(self.MODE_NORMAL) |
| @@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
| 75 | m["taskhash"] = taskhash | 75 | m["taskhash"] = taskhash |
| 76 | m["method"] = method | 76 | m["method"] = method |
| 77 | m["unihash"] = unihash | 77 | m["unihash"] = unihash |
| 78 | return await self.send_message({"report-equiv": m}) | 78 | return await self.invoke({"report-equiv": m}) |
| 79 | 79 | ||
| 80 | async def get_taskhash(self, method, taskhash, all_properties=False): | 80 | async def get_taskhash(self, method, taskhash, all_properties=False): |
| 81 | await self._set_mode(self.MODE_NORMAL) | 81 | await self._set_mode(self.MODE_NORMAL) |
| 82 | return await self.send_message( | 82 | return await self.invoke( |
| 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
| 84 | ) | 84 | ) |
| 85 | 85 | ||
| 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
| 87 | await self._set_mode(self.MODE_NORMAL) | 87 | await self._set_mode(self.MODE_NORMAL) |
| 88 | return await self.send_message( | 88 | return await self.invoke( |
| 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} | 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} |
| 90 | ) | 90 | ) |
| 91 | 91 | ||
| 92 | async def get_stats(self): | 92 | async def get_stats(self): |
| 93 | await self._set_mode(self.MODE_NORMAL) | 93 | await self._set_mode(self.MODE_NORMAL) |
| 94 | return await self.send_message({"get-stats": None}) | 94 | return await self.invoke({"get-stats": None}) |
| 95 | 95 | ||
| 96 | async def reset_stats(self): | 96 | async def reset_stats(self): |
| 97 | await self._set_mode(self.MODE_NORMAL) | 97 | await self._set_mode(self.MODE_NORMAL) |
| 98 | return await self.send_message({"reset-stats": None}) | 98 | return await self.invoke({"reset-stats": None}) |
| 99 | 99 | ||
| 100 | async def backfill_wait(self): | 100 | async def backfill_wait(self): |
| 101 | await self._set_mode(self.MODE_NORMAL) | 101 | await self._set_mode(self.MODE_NORMAL) |
| 102 | return (await self.send_message({"backfill-wait": None}))["tasks"] | 102 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
| 103 | 103 | ||
| 104 | async def remove(self, where): | 104 | async def remove(self, where): |
| 105 | await self._set_mode(self.MODE_NORMAL) | 105 | await self._set_mode(self.MODE_NORMAL) |
| 106 | return await self.send_message({"remove": {"where": where}}) | 106 | return await self.invoke({"remove": {"where": where}}) |
| 107 | 107 | ||
| 108 | async def clean_unused(self, max_age): | 108 | async def clean_unused(self, max_age): |
| 109 | await self._set_mode(self.MODE_NORMAL) | 109 | await self._set_mode(self.MODE_NORMAL) |
| 110 | return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) | 110 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) |
| 111 | 111 | ||
| 112 | 112 | ||
| 113 | class Client(bb.asyncrpc.Client): | 113 | class Client(bb.asyncrpc.Client): |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 45bf476bfe..13b754805b 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
| @@ -165,8 +165,8 @@ class ServerCursor(object): | |||
| 165 | 165 | ||
| 166 | 166 | ||
| 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
| 168 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): | 168 | def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): |
| 169 | super().__init__(reader, writer, 'OEHASHEQUIV', logger) | 169 | super().__init__(socket, 'OEHASHEQUIV', logger) |
| 170 | self.db = db | 170 | self.db = db |
| 171 | self.request_stats = request_stats | 171 | self.request_stats = request_stats |
| 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
| @@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 209 | if k in msg: | 209 | if k in msg: |
| 210 | logger.debug('Handling %s' % k) | 210 | logger.debug('Handling %s' % k) |
| 211 | if 'stream' in k: | 211 | if 'stream' in k: |
| 212 | await self.handlers[k](msg[k]) | 212 | return await self.handlers[k](msg[k]) |
| 213 | else: | 213 | else: |
| 214 | with self.request_stats.start_sample() as self.request_sample, \ | 214 | with self.request_stats.start_sample() as self.request_sample, \ |
| 215 | self.request_sample.measure(): | 215 | self.request_sample.measure(): |
| 216 | await self.handlers[k](msg[k]) | 216 | return await self.handlers[k](msg[k]) |
| 217 | return | ||
| 218 | 217 | ||
| 219 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 218 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
| 220 | 219 | ||
| @@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 224 | fetch_all = request.get('all', False) | 223 | fetch_all = request.get('all', False) |
| 225 | 224 | ||
| 226 | with closing(self.db.cursor()) as cursor: | 225 | with closing(self.db.cursor()) as cursor: |
| 227 | d = await self.get_unihash(cursor, method, taskhash, fetch_all) | 226 | return await self.get_unihash(cursor, method, taskhash, fetch_all) |
| 228 | |||
| 229 | self.write_message(d) | ||
| 230 | 227 | ||
| 231 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): | 228 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): |
| 232 | d = None | 229 | d = None |
| @@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 274 | with_unihash = request.get("with_unihash", True) | 271 | with_unihash = request.get("with_unihash", True) |
| 275 | 272 | ||
| 276 | with closing(self.db.cursor()) as cursor: | 273 | with closing(self.db.cursor()) as cursor: |
| 277 | d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) | 274 | return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) |
| 278 | |||
| 279 | self.write_message(d) | ||
| 280 | 275 | ||
| 281 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): | 276 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): |
| 282 | d = None | 277 | d = None |
| @@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 334 | ) | 329 | ) |
| 335 | 330 | ||
| 336 | async def handle_get_stream(self, request): | 331 | async def handle_get_stream(self, request): |
| 337 | self.write_message('ok') | 332 | await self.socket.send_message("ok") |
| 338 | 333 | ||
| 339 | while True: | 334 | while True: |
| 340 | upstream = None | 335 | upstream = None |
| 341 | 336 | ||
| 342 | l = await self.reader.readline() | 337 | l = await self.socket.recv() |
| 343 | if not l: | 338 | if not l: |
| 344 | return | 339 | break |
| 345 | 340 | ||
| 346 | try: | 341 | try: |
| 347 | # This inner loop is very sensitive and must be as fast as | 342 | # This inner loop is very sensitive and must be as fast as |
| @@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 352 | request_measure = self.request_sample.measure() | 347 | request_measure = self.request_sample.measure() |
| 353 | request_measure.start() | 348 | request_measure.start() |
| 354 | 349 | ||
| 355 | l = l.decode('utf-8').rstrip() | ||
| 356 | if l == 'END': | 350 | if l == 'END': |
| 357 | self.writer.write('ok\n'.encode('utf-8')) | 351 | break |
| 358 | return | ||
| 359 | 352 | ||
| 360 | (method, taskhash) = l.split() | 353 | (method, taskhash) = l.split() |
| 361 | #logger.debug('Looking up %s %s' % (method, taskhash)) | 354 | #logger.debug('Looking up %s %s' % (method, taskhash)) |
| @@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 366 | cursor.close() | 359 | cursor.close() |
| 367 | 360 | ||
| 368 | if row is not None: | 361 | if row is not None: |
| 369 | msg = ('%s\n' % row['unihash']).encode('utf-8') | 362 | msg = row['unihash'] |
| 370 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 363 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
| 371 | elif self.upstream_client is not None: | 364 | elif self.upstream_client is not None: |
| 372 | upstream = await self.upstream_client.get_unihash(method, taskhash) | 365 | upstream = await self.upstream_client.get_unihash(method, taskhash) |
| 373 | if upstream: | 366 | if upstream: |
| 374 | msg = ("%s\n" % upstream).encode("utf-8") | 367 | msg = upstream |
| 375 | else: | 368 | else: |
| 376 | msg = "\n".encode("utf-8") | 369 | msg = "" |
| 377 | else: | 370 | else: |
| 378 | msg = '\n'.encode('utf-8') | 371 | msg = "" |
| 379 | 372 | ||
| 380 | self.writer.write(msg) | 373 | await self.socket.send(msg) |
| 381 | finally: | 374 | finally: |
| 382 | request_measure.end() | 375 | request_measure.end() |
| 383 | self.request_sample.end() | 376 | self.request_sample.end() |
| 384 | 377 | ||
| 385 | await self.writer.drain() | ||
| 386 | |||
| 387 | # Post to the backfill queue after writing the result to minimize | 378 | # Post to the backfill queue after writing the result to minimize |
| 388 | # the turn around time on a request | 379 | # the turn around time on a request |
| 389 | if upstream is not None: | 380 | if upstream is not None: |
| 390 | await self.backfill_queue.put((method, taskhash)) | 381 | await self.backfill_queue.put((method, taskhash)) |
| 391 | 382 | ||
| 383 | await self.socket.send("ok") | ||
| 384 | return self.NO_RESPONSE | ||
| 385 | |||
| 392 | async def handle_report(self, data): | 386 | async def handle_report(self, data): |
| 393 | with closing(self.db.cursor()) as cursor: | 387 | with closing(self.db.cursor()) as cursor: |
| 394 | outhash_data = { | 388 | outhash_data = { |
| @@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 468 | 'unihash': unihash, | 462 | 'unihash': unihash, |
| 469 | } | 463 | } |
| 470 | 464 | ||
| 471 | self.write_message(d) | 465 | return d |
| 472 | 466 | ||
| 473 | async def handle_equivreport(self, data): | 467 | async def handle_equivreport(self, data): |
| 474 | with closing(self.db.cursor()) as cursor: | 468 | with closing(self.db.cursor()) as cursor: |
| @@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 491 | 485 | ||
| 492 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | 486 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} |
| 493 | 487 | ||
| 494 | self.write_message(d) | 488 | return d |
| 495 | 489 | ||
| 496 | 490 | ||
| 497 | async def handle_get_stats(self, request): | 491 | async def handle_get_stats(self, request): |
| 498 | d = { | 492 | return { |
| 499 | 'requests': self.request_stats.todict(), | 493 | 'requests': self.request_stats.todict(), |
| 500 | } | 494 | } |
| 501 | 495 | ||
| 502 | self.write_message(d) | ||
| 503 | |||
| 504 | async def handle_reset_stats(self, request): | 496 | async def handle_reset_stats(self, request): |
| 505 | d = { | 497 | d = { |
| 506 | 'requests': self.request_stats.todict(), | 498 | 'requests': self.request_stats.todict(), |
| 507 | } | 499 | } |
| 508 | 500 | ||
| 509 | self.request_stats.reset() | 501 | self.request_stats.reset() |
| 510 | self.write_message(d) | 502 | return d |
| 511 | 503 | ||
| 512 | async def handle_backfill_wait(self, request): | 504 | async def handle_backfill_wait(self, request): |
| 513 | d = { | 505 | d = { |
| 514 | 'tasks': self.backfill_queue.qsize(), | 506 | 'tasks': self.backfill_queue.qsize(), |
| 515 | } | 507 | } |
| 516 | await self.backfill_queue.join() | 508 | await self.backfill_queue.join() |
| 517 | self.write_message(d) | 509 | return d |
| 518 | 510 | ||
| 519 | async def handle_remove(self, request): | 511 | async def handle_remove(self, request): |
| 520 | condition = request["where"] | 512 | condition = request["where"] |
| @@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 541 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | 533 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) |
| 542 | self.db.commit() | 534 | self.db.commit() |
| 543 | 535 | ||
| 544 | self.write_message({"count": count}) | 536 | return {"count": count} |
| 545 | 537 | ||
| 546 | async def handle_clean_unused(self, request): | 538 | async def handle_clean_unused(self, request): |
| 547 | max_age = request["max_age_seconds"] | 539 | max_age = request["max_age_seconds"] |
| @@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 558 | ) | 550 | ) |
| 559 | count = cursor.rowcount | 551 | count = cursor.rowcount |
| 560 | 552 | ||
| 561 | self.write_message({"count": count}) | 553 | return {"count": count} |
| 562 | 554 | ||
| 563 | def query_equivalent(self, cursor, method, taskhash): | 555 | def query_equivalent(self, cursor, method, taskhash): |
| 564 | # This is part of the inner loop and must be as fast as possible | 556 | # This is part of the inner loop and must be as fast as possible |
| @@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer): | |||
| 583 | self.db = db | 575 | self.db = db |
| 584 | self.upstream = upstream | 576 | self.upstream = upstream |
| 585 | self.read_only = read_only | 577 | self.read_only = read_only |
| 578 | self.backfill_queue = None | ||
| 586 | 579 | ||
| 587 | def accept_client(self, reader, writer): | 580 | def accept_client(self, socket): |
| 588 | return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) | 581 | return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) |
| 589 | 582 | ||
| 590 | @contextmanager | 583 | async def backfill_worker_task(self): |
| 591 | def _backfill_worker(self): | 584 | client = await create_async_client(self.upstream) |
| 592 | async def backfill_worker_task(): | 585 | try: |
| 593 | client = await create_async_client(self.upstream) | 586 | while True: |
| 594 | try: | 587 | item = await self.backfill_queue.get() |
| 595 | while True: | 588 | if item is None: |
| 596 | item = await self.backfill_queue.get() | ||
| 597 | if item is None: | ||
| 598 | self.backfill_queue.task_done() | ||
| 599 | break | ||
| 600 | method, taskhash = item | ||
| 601 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
| 602 | self.backfill_queue.task_done() | 589 | self.backfill_queue.task_done() |
| 603 | finally: | 590 | break |
| 604 | await client.close() | 591 | method, taskhash = item |
| 592 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
| 593 | self.backfill_queue.task_done() | ||
| 594 | finally: | ||
| 595 | await client.close() | ||
| 605 | 596 | ||
| 606 | async def join_worker(worker): | 597 | def start(self): |
| 598 | tasks = super().start() | ||
| 599 | if self.upstream: | ||
| 600 | self.backfill_queue = asyncio.Queue() | ||
| 601 | tasks += [self.backfill_worker_task()] | ||
| 602 | return tasks | ||
| 603 | |||
| 604 | async def stop(self): | ||
| 605 | if self.backfill_queue is not None: | ||
| 607 | await self.backfill_queue.put(None) | 606 | await self.backfill_queue.put(None) |
| 608 | await worker | 607 | await super().stop() |
| 609 | |||
| 610 | if self.upstream is not None: | ||
| 611 | worker = asyncio.ensure_future(backfill_worker_task()) | ||
| 612 | try: | ||
| 613 | yield | ||
| 614 | finally: | ||
| 615 | self.loop.run_until_complete(join_worker(worker)) | ||
| 616 | else: | ||
| 617 | yield | ||
| 618 | |||
| 619 | def run_loop_forever(self): | ||
| 620 | self.backfill_queue = asyncio.Queue() | ||
| 621 | |||
| 622 | with self._backfill_worker(): | ||
| 623 | super().run_loop_forever() | ||
diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py index 69ab7a4ac9..6b81356fac 100644 --- a/bitbake/lib/prserv/client.py +++ b/bitbake/lib/prserv/client.py | |||
| @@ -14,28 +14,28 @@ class PRAsyncClient(bb.asyncrpc.AsyncClient): | |||
| 14 | super().__init__('PRSERVICE', '1.0', logger) | 14 | super().__init__('PRSERVICE', '1.0', logger) |
| 15 | 15 | ||
| 16 | async def getPR(self, version, pkgarch, checksum): | 16 | async def getPR(self, version, pkgarch, checksum): |
| 17 | response = await self.send_message( | 17 | response = await self.invoke( |
| 18 | {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} | 18 | {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} |
| 19 | ) | 19 | ) |
| 20 | if response: | 20 | if response: |
| 21 | return response['value'] | 21 | return response['value'] |
| 22 | 22 | ||
| 23 | async def importone(self, version, pkgarch, checksum, value): | 23 | async def importone(self, version, pkgarch, checksum, value): |
| 24 | response = await self.send_message( | 24 | response = await self.invoke( |
| 25 | {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} | 25 | {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} |
| 26 | ) | 26 | ) |
| 27 | if response: | 27 | if response: |
| 28 | return response['value'] | 28 | return response['value'] |
| 29 | 29 | ||
| 30 | async def export(self, version, pkgarch, checksum, colinfo): | 30 | async def export(self, version, pkgarch, checksum, colinfo): |
| 31 | response = await self.send_message( | 31 | response = await self.invoke( |
| 32 | {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} | 32 | {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} |
| 33 | ) | 33 | ) |
| 34 | if response: | 34 | if response: |
| 35 | return (response['metainfo'], response['datainfo']) | 35 | return (response['metainfo'], response['datainfo']) |
| 36 | 36 | ||
| 37 | async def is_readonly(self): | 37 | async def is_readonly(self): |
| 38 | response = await self.send_message( | 38 | response = await self.invoke( |
| 39 | {'is-readonly': {}} | 39 | {'is-readonly': {}} |
| 40 | ) | 40 | ) |
| 41 | if response: | 41 | if response: |
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index c686b2065c..ea7933164b 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py | |||
| @@ -20,8 +20,8 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid" | |||
| 20 | singleton = None | 20 | singleton = None |
| 21 | 21 | ||
| 22 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): | 22 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): |
| 23 | def __init__(self, reader, writer, table, read_only): | 23 | def __init__(self, socket, table, read_only): |
| 24 | super().__init__(reader, writer, 'PRSERVICE', logger) | 24 | super().__init__(socket, 'PRSERVICE', logger) |
| 25 | self.handlers.update({ | 25 | self.handlers.update({ |
| 26 | 'get-pr': self.handle_get_pr, | 26 | 'get-pr': self.handle_get_pr, |
| 27 | 'import-one': self.handle_import_one, | 27 | 'import-one': self.handle_import_one, |
| @@ -36,12 +36,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 36 | 36 | ||
| 37 | async def dispatch_message(self, msg): | 37 | async def dispatch_message(self, msg): |
| 38 | try: | 38 | try: |
| 39 | await super().dispatch_message(msg) | 39 | return await super().dispatch_message(msg) |
| 40 | except: | 40 | except: |
| 41 | self.table.sync() | 41 | self.table.sync() |
| 42 | raise | 42 | raise |
| 43 | 43 | else: | |
| 44 | self.table.sync_if_dirty() | 44 | self.table.sync_if_dirty() |
| 45 | 45 | ||
| 46 | async def handle_get_pr(self, request): | 46 | async def handle_get_pr(self, request): |
| 47 | version = request['version'] | 47 | version = request['version'] |
| @@ -57,7 +57,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 57 | except sqlite3.Error as exc: | 57 | except sqlite3.Error as exc: |
| 58 | logger.error(str(exc)) | 58 | logger.error(str(exc)) |
| 59 | 59 | ||
| 60 | self.write_message(response) | 60 | return response |
| 61 | 61 | ||
| 62 | async def handle_import_one(self, request): | 62 | async def handle_import_one(self, request): |
| 63 | response = None | 63 | response = None |
| @@ -71,7 +71,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 71 | if value is not None: | 71 | if value is not None: |
| 72 | response = {'value': value} | 72 | response = {'value': value} |
| 73 | 73 | ||
| 74 | self.write_message(response) | 74 | return response |
| 75 | 75 | ||
| 76 | async def handle_export(self, request): | 76 | async def handle_export(self, request): |
| 77 | version = request['version'] | 77 | version = request['version'] |
| @@ -85,12 +85,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
| 85 | logger.error(str(exc)) | 85 | logger.error(str(exc)) |
| 86 | metainfo = datainfo = None | 86 | metainfo = datainfo = None |
| 87 | 87 | ||
| 88 | response = {'metainfo': metainfo, 'datainfo': datainfo} | 88 | return {'metainfo': metainfo, 'datainfo': datainfo} |
| 89 | self.write_message(response) | ||
| 90 | 89 | ||
| 91 | async def handle_is_readonly(self, request): | 90 | async def handle_is_readonly(self, request): |
| 92 | response = {'readonly': self.read_only} | 91 | return {'readonly': self.read_only} |
| 93 | self.write_message(response) | ||
| 94 | 92 | ||
| 95 | class PRServer(bb.asyncrpc.AsyncServer): | 93 | class PRServer(bb.asyncrpc.AsyncServer): |
| 96 | def __init__(self, dbfile, read_only=False): | 94 | def __init__(self, dbfile, read_only=False): |
| @@ -99,20 +97,23 @@ class PRServer(bb.asyncrpc.AsyncServer): | |||
| 99 | self.table = None | 97 | self.table = None |
| 100 | self.read_only = read_only | 98 | self.read_only = read_only |
| 101 | 99 | ||
| 102 | def accept_client(self, reader, writer): | 100 | def accept_client(self, socket): |
| 103 | return PRServerClient(reader, writer, self.table, self.read_only) | 101 | return PRServerClient(socket, self.table, self.read_only) |
| 104 | 102 | ||
| 105 | def _serve_forever(self): | 103 | def start(self): |
| 104 | tasks = super().start() | ||
| 106 | self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) | 105 | self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) |
| 107 | self.table = self.db["PRMAIN"] | 106 | self.table = self.db["PRMAIN"] |
| 108 | 107 | ||
| 109 | logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % | 108 | logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % |
| 110 | (self.dbfile, self.address, str(os.getpid()))) | 109 | (self.dbfile, self.address, str(os.getpid()))) |
| 111 | 110 | ||
| 112 | super()._serve_forever() | 111 | return tasks |
| 113 | 112 | ||
| 113 | async def stop(self): | ||
| 114 | self.table.sync_if_dirty() | 114 | self.table.sync_if_dirty() |
| 115 | self.db.disconnect() | 115 | self.db.disconnect() |
| 116 | await super().stop() | ||
| 116 | 117 | ||
| 117 | def signal_handler(self): | 118 | def signal_handler(self): |
| 118 | super().signal_handler() | 119 | super().signal_handler() |
