diff options
-rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 32 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 78 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/connection.py | 95 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/exceptions.py | 17 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/serv.py | 304 | ||||
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 21 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 38 | ||||
-rw-r--r-- | bitbake/lib/hashserv/server.py | 116 | ||||
-rw-r--r-- | bitbake/lib/prserv/client.py | 8 | ||||
-rw-r--r-- | bitbake/lib/prserv/serv.py | 31 |
10 files changed, 387 insertions, 353 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index 9a85e9965b..9f677eac4c 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
@@ -4,30 +4,12 @@ | |||
4 | # SPDX-License-Identifier: GPL-2.0-only | 4 | # SPDX-License-Identifier: GPL-2.0-only |
5 | # | 5 | # |
6 | 6 | ||
7 | import itertools | ||
8 | import json | ||
9 | |||
10 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
11 | # maximum chunk size. It would be better if the client and server reported to | ||
12 | # each other what the maximum chunk sizes were, but that will slow down the | ||
13 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
14 | # is necessary | ||
15 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
16 | |||
17 | |||
18 | def chunkify(msg, max_chunk): | ||
19 | if len(msg) < max_chunk - 1: | ||
20 | yield ''.join((msg, "\n")) | ||
21 | else: | ||
22 | yield ''.join((json.dumps({ | ||
23 | 'chunk-stream': None | ||
24 | }), "\n")) | ||
25 | |||
26 | args = [iter(msg)] * (max_chunk - 1) | ||
27 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | ||
28 | yield ''.join(itertools.chain(m, "\n")) | ||
29 | yield "\n" | ||
30 | |||
31 | 7 | ||
32 | from .client import AsyncClient, Client | 8 | from .client import AsyncClient, Client |
33 | from .serv import AsyncServer, AsyncServerConnection, ClientError, ServerError | 9 | from .serv import AsyncServer, AsyncServerConnection |
10 | from .connection import DEFAULT_MAX_CHUNK | ||
11 | from .exceptions import ( | ||
12 | ClientError, | ||
13 | ServerError, | ||
14 | ConnectionClosedError, | ||
15 | ) | ||
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87..7f33099b63 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -10,13 +10,13 @@ import json | |||
10 | import os | 10 | import os |
11 | import socket | 11 | import socket |
12 | import sys | 12 | import sys |
13 | from . import chunkify, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK |
14 | from .exceptions import ConnectionClosedError | ||
14 | 15 | ||
15 | 16 | ||
16 | class AsyncClient(object): | 17 | class AsyncClient(object): |
17 | def __init__(self, proto_name, proto_version, logger, timeout=30): | 18 | def __init__(self, proto_name, proto_version, logger, timeout=30): |
18 | self.reader = None | 19 | self.socket = None |
19 | self.writer = None | ||
20 | self.max_chunk = DEFAULT_MAX_CHUNK | 20 | self.max_chunk = DEFAULT_MAX_CHUNK |
21 | self.proto_name = proto_name | 21 | self.proto_name = proto_name |
22 | self.proto_version = proto_version | 22 | self.proto_version = proto_version |
@@ -25,7 +25,8 @@ class AsyncClient(object): | |||
25 | 25 | ||
26 | async def connect_tcp(self, address, port): | 26 | async def connect_tcp(self, address, port): |
27 | async def connect_sock(): | 27 | async def connect_sock(): |
28 | return await asyncio.open_connection(address, port) | 28 | reader, writer = await asyncio.open_connection(address, port) |
29 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
29 | 30 | ||
30 | self._connect_sock = connect_sock | 31 | self._connect_sock = connect_sock |
31 | 32 | ||
@@ -40,27 +41,27 @@ class AsyncClient(object): | |||
40 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) | 41 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) |
41 | sock.connect(os.path.basename(path)) | 42 | sock.connect(os.path.basename(path)) |
42 | finally: | 43 | finally: |
43 | os.chdir(cwd) | 44 | os.chdir(cwd) |
44 | return await asyncio.open_unix_connection(sock=sock) | 45 | reader, writer = await asyncio.open_unix_connection(sock=sock) |
46 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
45 | 47 | ||
46 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
47 | 49 | ||
48 | async def setup_connection(self): | 50 | async def setup_connection(self): |
49 | s = '%s %s\n\n' % (self.proto_name, self.proto_version) | 51 | # Send headers |
50 | self.writer.write(s.encode("utf-8")) | 52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
51 | await self.writer.drain() | 53 | # End of headers |
54 | await self.socket.send("") | ||
52 | 55 | ||
53 | async def connect(self): | 56 | async def connect(self): |
54 | if self.reader is None or self.writer is None: | 57 | if self.socket is None: |
55 | (self.reader, self.writer) = await self._connect_sock() | 58 | self.socket = await self._connect_sock() |
56 | await self.setup_connection() | 59 | await self.setup_connection() |
57 | 60 | ||
58 | async def close(self): | 61 | async def close(self): |
59 | self.reader = None | 62 | if self.socket is not None: |
60 | 63 | await self.socket.close() | |
61 | if self.writer is not None: | 64 | self.socket = None |
62 | self.writer.close() | ||
63 | self.writer = None | ||
64 | 65 | ||
65 | async def _send_wrapper(self, proc): | 66 | async def _send_wrapper(self, proc): |
66 | count = 0 | 67 | count = 0 |
@@ -71,6 +72,7 @@ class AsyncClient(object): | |||
71 | except ( | 72 | except ( |
72 | OSError, | 73 | OSError, |
73 | ConnectionError, | 74 | ConnectionError, |
75 | ConnectionClosedError, | ||
74 | json.JSONDecodeError, | 76 | json.JSONDecodeError, |
75 | UnicodeDecodeError, | 77 | UnicodeDecodeError, |
76 | ) as e: | 78 | ) as e: |
@@ -82,49 +84,15 @@ class AsyncClient(object): | |||
82 | await self.close() | 84 | await self.close() |
83 | count += 1 | 85 | count += 1 |
84 | 86 | ||
85 | async def send_message(self, msg): | 87 | async def invoke(self, msg): |
86 | async def get_line(): | ||
87 | try: | ||
88 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
89 | except asyncio.TimeoutError: | ||
90 | raise ConnectionError("Timed out waiting for server") | ||
91 | |||
92 | if not line: | ||
93 | raise ConnectionError("Connection closed") | ||
94 | |||
95 | line = line.decode("utf-8") | ||
96 | |||
97 | if not line.endswith("\n"): | ||
98 | raise ConnectionError("Bad message %r" % (line)) | ||
99 | |||
100 | return line | ||
101 | |||
102 | async def proc(): | 88 | async def proc(): |
103 | for c in chunkify(json.dumps(msg), self.max_chunk): | 89 | await self.socket.send_message(msg) |
104 | self.writer.write(c.encode("utf-8")) | 90 | return await self.socket.recv_message() |
105 | await self.writer.drain() | ||
106 | |||
107 | l = await get_line() | ||
108 | |||
109 | m = json.loads(l) | ||
110 | if m and "chunk-stream" in m: | ||
111 | lines = [] | ||
112 | while True: | ||
113 | l = (await get_line()).rstrip("\n") | ||
114 | if not l: | ||
115 | break | ||
116 | lines.append(l) | ||
117 | |||
118 | m = json.loads("".join(lines)) | ||
119 | |||
120 | return m | ||
121 | 91 | ||
122 | return await self._send_wrapper(proc) | 92 | return await self._send_wrapper(proc) |
123 | 93 | ||
124 | async def ping(self): | 94 | async def ping(self): |
125 | return await self.send_message( | 95 | return await self.invoke({"ping": {}}) |
126 | {'ping': {}} | ||
127 | ) | ||
128 | 96 | ||
129 | 97 | ||
130 | class Client(object): | 98 | class Client(object): |
@@ -142,7 +110,7 @@ class Client(object): | |||
142 | # required (but harmless) with it. | 110 | # required (but harmless) with it. |
143 | asyncio.set_event_loop(self.loop) | 111 | asyncio.set_event_loop(self.loop) |
144 | 112 | ||
145 | self._add_methods('connect_tcp', 'ping') | 113 | self._add_methods("connect_tcp", "ping") |
146 | 114 | ||
147 | @abc.abstractmethod | 115 | @abc.abstractmethod |
148 | def _get_async_client(self): | 116 | def _get_async_client(self): |
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py new file mode 100644 index 0000000000..c4fd24754c --- /dev/null +++ b/bitbake/lib/bb/asyncrpc/connection.py | |||
@@ -0,0 +1,95 @@ | |||
1 | # | ||
2 | # Copyright BitBake Contributors | ||
3 | # | ||
4 | # SPDX-License-Identifier: GPL-2.0-only | ||
5 | # | ||
6 | |||
7 | import asyncio | ||
8 | import itertools | ||
9 | import json | ||
10 | from .exceptions import ClientError, ConnectionClosedError | ||
11 | |||
12 | |||
13 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
14 | # maximum chunk size. It would be better if the client and server reported to | ||
15 | # each other what the maximum chunk sizes were, but that will slow down the | ||
16 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
17 | # is necessary | ||
18 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
19 | |||
20 | |||
21 | def chunkify(msg, max_chunk): | ||
22 | if len(msg) < max_chunk - 1: | ||
23 | yield "".join((msg, "\n")) | ||
24 | else: | ||
25 | yield "".join((json.dumps({"chunk-stream": None}), "\n")) | ||
26 | |||
27 | args = [iter(msg)] * (max_chunk - 1) | ||
28 | for m in map("".join, itertools.zip_longest(*args, fillvalue="")): | ||
29 | yield "".join(itertools.chain(m, "\n")) | ||
30 | yield "\n" | ||
31 | |||
32 | |||
33 | class StreamConnection(object): | ||
34 | def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): | ||
35 | self.reader = reader | ||
36 | self.writer = writer | ||
37 | self.timeout = timeout | ||
38 | self.max_chunk = max_chunk | ||
39 | |||
40 | @property | ||
41 | def address(self): | ||
42 | return self.writer.get_extra_info("peername") | ||
43 | |||
44 | async def send_message(self, msg): | ||
45 | for c in chunkify(json.dumps(msg), self.max_chunk): | ||
46 | self.writer.write(c.encode("utf-8")) | ||
47 | await self.writer.drain() | ||
48 | |||
49 | async def recv_message(self): | ||
50 | l = await self.recv() | ||
51 | |||
52 | m = json.loads(l) | ||
53 | if not m: | ||
54 | return m | ||
55 | |||
56 | if "chunk-stream" in m: | ||
57 | lines = [] | ||
58 | while True: | ||
59 | l = await self.recv() | ||
60 | if not l: | ||
61 | break | ||
62 | lines.append(l) | ||
63 | |||
64 | m = json.loads("".join(lines)) | ||
65 | |||
66 | return m | ||
67 | |||
68 | async def send(self, msg): | ||
69 | self.writer.write(("%s\n" % msg).encode("utf-8")) | ||
70 | await self.writer.drain() | ||
71 | |||
72 | async def recv(self): | ||
73 | if self.timeout < 0: | ||
74 | line = await self.reader.readline() | ||
75 | else: | ||
76 | try: | ||
77 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
78 | except asyncio.TimeoutError: | ||
79 | raise ConnectionError("Timed out waiting for data") | ||
80 | |||
81 | if not line: | ||
82 | raise ConnectionClosedError("Connection closed") | ||
83 | |||
84 | line = line.decode("utf-8") | ||
85 | |||
86 | if not line.endswith("\n"): | ||
87 | raise ConnectionError("Bad message %r" % (line)) | ||
88 | |||
89 | return line.rstrip() | ||
90 | |||
91 | async def close(self): | ||
92 | self.reader = None | ||
93 | if self.writer is not None: | ||
94 | self.writer.close() | ||
95 | self.writer = None | ||
diff --git a/bitbake/lib/bb/asyncrpc/exceptions.py b/bitbake/lib/bb/asyncrpc/exceptions.py new file mode 100644 index 0000000000..a8942b4f0c --- /dev/null +++ b/bitbake/lib/bb/asyncrpc/exceptions.py | |||
@@ -0,0 +1,17 @@ | |||
1 | # | ||
2 | # Copyright BitBake Contributors | ||
3 | # | ||
4 | # SPDX-License-Identifier: GPL-2.0-only | ||
5 | # | ||
6 | |||
7 | |||
8 | class ClientError(Exception): | ||
9 | pass | ||
10 | |||
11 | |||
12 | class ServerError(Exception): | ||
13 | pass | ||
14 | |||
15 | |||
16 | class ConnectionClosedError(Exception): | ||
17 | pass | ||
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index d2de4891b8..3e0d0632cb 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py | |||
@@ -12,241 +12,248 @@ import signal | |||
12 | import socket | 12 | import socket |
13 | import sys | 13 | import sys |
14 | import multiprocessing | 14 | import multiprocessing |
15 | from . import chunkify, DEFAULT_MAX_CHUNK | 15 | from .connection import StreamConnection |
16 | 16 | from .exceptions import ClientError, ServerError, ConnectionClosedError | |
17 | |||
18 | class ClientError(Exception): | ||
19 | pass | ||
20 | |||
21 | |||
22 | class ServerError(Exception): | ||
23 | pass | ||
24 | 17 | ||
25 | 18 | ||
26 | class AsyncServerConnection(object): | 19 | class AsyncServerConnection(object): |
27 | def __init__(self, reader, writer, proto_name, logger): | 20 | # If a handler returns this object (e.g. `return self.NO_RESPONSE`), no |
28 | self.reader = reader | 21 | # return message will be automatically be sent back to the client |
29 | self.writer = writer | 22 | NO_RESPONSE = object() |
23 | |||
24 | def __init__(self, socket, proto_name, logger): | ||
25 | self.socket = socket | ||
30 | self.proto_name = proto_name | 26 | self.proto_name = proto_name |
31 | self.max_chunk = DEFAULT_MAX_CHUNK | ||
32 | self.handlers = { | 27 | self.handlers = { |
33 | 'chunk-stream': self.handle_chunk, | 28 | "ping": self.handle_ping, |
34 | 'ping': self.handle_ping, | ||
35 | } | 29 | } |
36 | self.logger = logger | 30 | self.logger = logger |
37 | 31 | ||
32 | async def close(self): | ||
33 | await self.socket.close() | ||
34 | |||
38 | async def process_requests(self): | 35 | async def process_requests(self): |
39 | try: | 36 | try: |
40 | self.addr = self.writer.get_extra_info('peername') | 37 | self.logger.info("Client %r connected" % (self.socket.address,)) |
41 | self.logger.debug('Client %r connected' % (self.addr,)) | ||
42 | 38 | ||
43 | # Read protocol and version | 39 | # Read protocol and version |
44 | client_protocol = await self.reader.readline() | 40 | client_protocol = await self.socket.recv() |
45 | if not client_protocol: | 41 | if not client_protocol: |
46 | return | 42 | return |
47 | 43 | ||
48 | (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split() | 44 | (client_proto_name, client_proto_version) = client_protocol.split() |
49 | if client_proto_name != self.proto_name: | 45 | if client_proto_name != self.proto_name: |
50 | self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name)) | 46 | self.logger.debug("Rejecting invalid protocol %s" % (self.proto_name)) |
51 | return | 47 | return |
52 | 48 | ||
53 | self.proto_version = tuple(int(v) for v in client_proto_version.split('.')) | 49 | self.proto_version = tuple(int(v) for v in client_proto_version.split(".")) |
54 | if not self.validate_proto_version(): | 50 | if not self.validate_proto_version(): |
55 | self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version)) | 51 | self.logger.debug( |
52 | "Rejecting invalid protocol version %s" % (client_proto_version) | ||
53 | ) | ||
56 | return | 54 | return |
57 | 55 | ||
58 | # Read headers. Currently, no headers are implemented, so look for | 56 | # Read headers. Currently, no headers are implemented, so look for |
59 | # an empty line to signal the end of the headers | 57 | # an empty line to signal the end of the headers |
60 | while True: | 58 | while True: |
61 | line = await self.reader.readline() | 59 | header = await self.socket.recv() |
62 | if not line: | 60 | if not header: |
63 | return | ||
64 | |||
65 | line = line.decode('utf-8').rstrip() | ||
66 | if not line: | ||
67 | break | 61 | break |
68 | 62 | ||
69 | # Handle messages | 63 | # Handle messages |
70 | while True: | 64 | while True: |
71 | d = await self.read_message() | 65 | d = await self.socket.recv_message() |
72 | if d is None: | 66 | if d is None: |
73 | break | 67 | break |
74 | await self.dispatch_message(d) | 68 | response = await self.dispatch_message(d) |
75 | await self.writer.drain() | 69 | if response is not self.NO_RESPONSE: |
76 | except ClientError as e: | 70 | await self.socket.send_message(response) |
71 | |||
72 | except ConnectionClosedError as e: | ||
73 | self.logger.info(str(e)) | ||
74 | except (ClientError, ConnectionError) as e: | ||
77 | self.logger.error(str(e)) | 75 | self.logger.error(str(e)) |
78 | finally: | 76 | finally: |
79 | self.writer.close() | 77 | await self.close() |
80 | 78 | ||
81 | async def dispatch_message(self, msg): | 79 | async def dispatch_message(self, msg): |
82 | for k in self.handlers.keys(): | 80 | for k in self.handlers.keys(): |
83 | if k in msg: | 81 | if k in msg: |
84 | self.logger.debug('Handling %s' % k) | 82 | self.logger.debug("Handling %s" % k) |
85 | await self.handlers[k](msg[k]) | 83 | return await self.handlers[k](msg[k]) |
86 | return | ||
87 | 84 | ||
88 | raise ClientError("Unrecognized command %r" % msg) | 85 | raise ClientError("Unrecognized command %r" % msg) |
89 | 86 | ||
90 | def write_message(self, msg): | 87 | async def handle_ping(self, request): |
91 | for c in chunkify(json.dumps(msg), self.max_chunk): | 88 | return {"alive": True} |
92 | self.writer.write(c.encode('utf-8')) | ||
93 | 89 | ||
94 | async def read_message(self): | ||
95 | l = await self.reader.readline() | ||
96 | if not l: | ||
97 | return None | ||
98 | 90 | ||
99 | try: | 91 | class StreamServer(object): |
100 | message = l.decode('utf-8') | 92 | def __init__(self, handler, logger): |
93 | self.handler = handler | ||
94 | self.logger = logger | ||
95 | self.closed = False | ||
101 | 96 | ||
102 | if not message.endswith('\n'): | 97 | async def handle_stream_client(self, reader, writer): |
103 | return None | 98 | # writer.transport.set_write_buffer_limits(0) |
99 | socket = StreamConnection(reader, writer, -1) | ||
100 | if self.closed: | ||
101 | await socket.close() | ||
102 | return | ||
103 | |||
104 | await self.handler(socket) | ||
105 | |||
106 | async def stop(self): | ||
107 | self.closed = True | ||
108 | |||
109 | |||
110 | class TCPStreamServer(StreamServer): | ||
111 | def __init__(self, host, port, handler, logger): | ||
112 | super().__init__(handler, logger) | ||
113 | self.host = host | ||
114 | self.port = port | ||
115 | |||
116 | def start(self, loop): | ||
117 | self.server = loop.run_until_complete( | ||
118 | asyncio.start_server(self.handle_stream_client, self.host, self.port) | ||
119 | ) | ||
120 | |||
121 | for s in self.server.sockets: | ||
122 | self.logger.debug("Listening on %r" % (s.getsockname(),)) | ||
123 | # Newer python does this automatically. Do it manually here for | ||
124 | # maximum compatibility | ||
125 | s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) | ||
126 | s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) | ||
127 | |||
128 | # Enable keep alives. This prevents broken client connections | ||
129 | # from persisting on the server for long periods of time. | ||
130 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
131 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
132 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
133 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
134 | |||
135 | name = self.server.sockets[0].getsockname() | ||
136 | if self.server.sockets[0].family == socket.AF_INET6: | ||
137 | self.address = "[%s]:%d" % (name[0], name[1]) | ||
138 | else: | ||
139 | self.address = "%s:%d" % (name[0], name[1]) | ||
140 | |||
141 | return [self.server.wait_closed()] | ||
142 | |||
143 | async def stop(self): | ||
144 | await super().stop() | ||
145 | self.server.close() | ||
146 | |||
147 | def cleanup(self): | ||
148 | pass | ||
104 | 149 | ||
105 | return json.loads(message) | ||
106 | except (json.JSONDecodeError, UnicodeDecodeError) as e: | ||
107 | self.logger.error('Bad message from client: %r' % message) | ||
108 | raise e | ||
109 | 150 | ||
110 | async def handle_chunk(self, request): | 151 | class UnixStreamServer(StreamServer): |
111 | lines = [] | 152 | def __init__(self, path, handler, logger): |
112 | try: | 153 | super().__init__(handler, logger) |
113 | while True: | 154 | self.path = path |
114 | l = await self.reader.readline() | ||
115 | l = l.rstrip(b"\n").decode("utf-8") | ||
116 | if not l: | ||
117 | break | ||
118 | lines.append(l) | ||
119 | 155 | ||
120 | msg = json.loads(''.join(lines)) | 156 | def start(self, loop): |
121 | except (json.JSONDecodeError, UnicodeDecodeError) as e: | 157 | cwd = os.getcwd() |
122 | self.logger.error('Bad message from client: %r' % lines) | 158 | try: |
123 | raise e | 159 | # Work around path length limits in AF_UNIX |
160 | os.chdir(os.path.dirname(self.path)) | ||
161 | self.server = loop.run_until_complete( | ||
162 | asyncio.start_unix_server( | ||
163 | self.handle_stream_client, os.path.basename(self.path) | ||
164 | ) | ||
165 | ) | ||
166 | finally: | ||
167 | os.chdir(cwd) | ||
124 | 168 | ||
125 | if 'chunk-stream' in msg: | 169 | self.logger.debug("Listening on %r" % self.path) |
126 | raise ClientError("Nested chunks are not allowed") | 170 | self.address = "unix://%s" % os.path.abspath(self.path) |
171 | return [self.server.wait_closed()] | ||
127 | 172 | ||
128 | await self.dispatch_message(msg) | 173 | async def stop(self): |
174 | await super().stop() | ||
175 | self.server.close() | ||
129 | 176 | ||
130 | async def handle_ping(self, request): | 177 | def cleanup(self): |
131 | response = {'alive': True} | 178 | os.unlink(self.path) |
132 | self.write_message(response) | ||
133 | 179 | ||
134 | 180 | ||
135 | class AsyncServer(object): | 181 | class AsyncServer(object): |
136 | def __init__(self, logger): | 182 | def __init__(self, logger): |
137 | self._cleanup_socket = None | ||
138 | self.logger = logger | 183 | self.logger = logger |
139 | self.start = None | ||
140 | self.address = None | ||
141 | self.loop = None | 184 | self.loop = None |
185 | self.run_tasks = [] | ||
142 | 186 | ||
143 | def start_tcp_server(self, host, port): | 187 | def start_tcp_server(self, host, port): |
144 | def start_tcp(): | 188 | self.server = TCPStreamServer(host, port, self._client_handler, self.logger) |
145 | self.server = self.loop.run_until_complete( | ||
146 | asyncio.start_server(self.handle_client, host, port) | ||
147 | ) | ||
148 | |||
149 | for s in self.server.sockets: | ||
150 | self.logger.debug('Listening on %r' % (s.getsockname(),)) | ||
151 | # Newer python does this automatically. Do it manually here for | ||
152 | # maximum compatibility | ||
153 | s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) | ||
154 | s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) | ||
155 | |||
156 | # Enable keep alives. This prevents broken client connections | ||
157 | # from persisting on the server for long periods of time. | ||
158 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
159 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
160 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
161 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
162 | |||
163 | name = self.server.sockets[0].getsockname() | ||
164 | if self.server.sockets[0].family == socket.AF_INET6: | ||
165 | self.address = "[%s]:%d" % (name[0], name[1]) | ||
166 | else: | ||
167 | self.address = "%s:%d" % (name[0], name[1]) | ||
168 | |||
169 | self.start = start_tcp | ||
170 | 189 | ||
171 | def start_unix_server(self, path): | 190 | def start_unix_server(self, path): |
172 | def cleanup(): | 191 | self.server = UnixStreamServer(path, self._client_handler, self.logger) |
173 | os.unlink(path) | ||
174 | |||
175 | def start_unix(): | ||
176 | cwd = os.getcwd() | ||
177 | try: | ||
178 | # Work around path length limits in AF_UNIX | ||
179 | os.chdir(os.path.dirname(path)) | ||
180 | self.server = self.loop.run_until_complete( | ||
181 | asyncio.start_unix_server(self.handle_client, os.path.basename(path)) | ||
182 | ) | ||
183 | finally: | ||
184 | os.chdir(cwd) | ||
185 | |||
186 | self.logger.debug('Listening on %r' % path) | ||
187 | 192 | ||
188 | self._cleanup_socket = cleanup | 193 | async def _client_handler(self, socket): |
189 | self.address = "unix://%s" % os.path.abspath(path) | ||
190 | |||
191 | self.start = start_unix | ||
192 | |||
193 | @abc.abstractmethod | ||
194 | def accept_client(self, reader, writer): | ||
195 | pass | ||
196 | |||
197 | async def handle_client(self, reader, writer): | ||
198 | # writer.transport.set_write_buffer_limits(0) | ||
199 | try: | 194 | try: |
200 | client = self.accept_client(reader, writer) | 195 | client = self.accept_client(socket) |
201 | await client.process_requests() | 196 | await client.process_requests() |
202 | except Exception as e: | 197 | except Exception as e: |
203 | import traceback | 198 | import traceback |
204 | self.logger.error('Error from client: %s' % str(e), exc_info=True) | 199 | |
200 | self.logger.error("Error from client: %s" % str(e), exc_info=True) | ||
205 | traceback.print_exc() | 201 | traceback.print_exc() |
206 | writer.close() | 202 | await socket.close() |
207 | self.logger.debug('Client disconnected') | 203 | self.logger.debug("Client disconnected") |
208 | 204 | ||
209 | def run_loop_forever(self): | 205 | @abc.abstractmethod |
210 | try: | 206 | def accept_client(self, socket): |
211 | self.loop.run_forever() | 207 | pass |
212 | except KeyboardInterrupt: | 208 | |
213 | pass | 209 | async def stop(self): |
210 | self.logger.debug("Stopping server") | ||
211 | await self.server.stop() | ||
212 | |||
213 | def start(self): | ||
214 | tasks = self.server.start(self.loop) | ||
215 | self.address = self.server.address | ||
216 | return tasks | ||
214 | 217 | ||
215 | def signal_handler(self): | 218 | def signal_handler(self): |
216 | self.logger.debug("Got exit signal") | 219 | self.logger.debug("Got exit signal") |
217 | self.loop.stop() | 220 | self.loop.create_task(self.stop()) |
218 | 221 | ||
219 | def _serve_forever(self): | 222 | def _serve_forever(self, tasks): |
220 | try: | 223 | try: |
221 | self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) | 224 | self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) |
225 | self.loop.add_signal_handler(signal.SIGINT, self.signal_handler) | ||
226 | self.loop.add_signal_handler(signal.SIGQUIT, self.signal_handler) | ||
222 | signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) | 227 | signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) |
223 | 228 | ||
224 | self.run_loop_forever() | 229 | self.loop.run_until_complete(asyncio.gather(*tasks)) |
225 | self.server.close() | ||
226 | 230 | ||
227 | self.loop.run_until_complete(self.server.wait_closed()) | 231 | self.logger.debug("Server shutting down") |
228 | self.logger.debug('Server shutting down') | ||
229 | finally: | 232 | finally: |
230 | if self._cleanup_socket is not None: | 233 | self.server.cleanup() |
231 | self._cleanup_socket() | ||
232 | 234 | ||
233 | def serve_forever(self): | 235 | def serve_forever(self): |
234 | """ | 236 | """ |
235 | Serve requests in the current process | 237 | Serve requests in the current process |
236 | """ | 238 | """ |
239 | self._create_loop() | ||
240 | tasks = self.start() | ||
241 | self._serve_forever(tasks) | ||
242 | self.loop.close() | ||
243 | |||
244 | def _create_loop(self): | ||
237 | # Create loop and override any loop that may have existed in | 245 | # Create loop and override any loop that may have existed in |
238 | # a parent process. It is possible that the usecases of | 246 | # a parent process. It is possible that the usecases of |
239 | # serve_forever might be constrained enough to allow using | 247 | # serve_forever might be constrained enough to allow using |
240 | # get_event_loop here, but better safe than sorry for now. | 248 | # get_event_loop here, but better safe than sorry for now. |
241 | self.loop = asyncio.new_event_loop() | 249 | self.loop = asyncio.new_event_loop() |
242 | asyncio.set_event_loop(self.loop) | 250 | asyncio.set_event_loop(self.loop) |
243 | self.start() | ||
244 | self._serve_forever() | ||
245 | 251 | ||
246 | def serve_as_process(self, *, prefunc=None, args=()): | 252 | def serve_as_process(self, *, prefunc=None, args=()): |
247 | """ | 253 | """ |
248 | Serve requests in a child process | 254 | Serve requests in a child process |
249 | """ | 255 | """ |
256 | |||
250 | def run(queue): | 257 | def run(queue): |
251 | # Create loop and override any loop that may have existed | 258 | # Create loop and override any loop that may have existed |
252 | # in a parent process. Without doing this and instead | 259 | # in a parent process. Without doing this and instead |
@@ -259,18 +266,19 @@ class AsyncServer(object): | |||
259 | # more general, though, as any potential use of asyncio in | 266 | # more general, though, as any potential use of asyncio in |
260 | # Cooker could create a loop that needs to replaced in this | 267 | # Cooker could create a loop that needs to replaced in this |
261 | # new process. | 268 | # new process. |
262 | self.loop = asyncio.new_event_loop() | 269 | self._create_loop() |
263 | asyncio.set_event_loop(self.loop) | ||
264 | try: | 270 | try: |
265 | self.start() | 271 | self.address = None |
272 | tasks = self.start() | ||
266 | finally: | 273 | finally: |
274 | # Always put the server address to wake up the parent task | ||
267 | queue.put(self.address) | 275 | queue.put(self.address) |
268 | queue.close() | 276 | queue.close() |
269 | 277 | ||
270 | if prefunc is not None: | 278 | if prefunc is not None: |
271 | prefunc(self, *args) | 279 | prefunc(self, *args) |
272 | 280 | ||
273 | self._serve_forever() | 281 | self._serve_forever(tasks) |
274 | 282 | ||
275 | if sys.version_info >= (3, 6): | 283 | if sys.version_info >= (3, 6): |
276 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | 284 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 9cb3fd57a5..3a4018353f 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://" | |||
15 | ADDR_TYPE_UNIX = 0 | 15 | ADDR_TYPE_UNIX = 0 |
16 | ADDR_TYPE_TCP = 1 | 16 | ADDR_TYPE_TCP = 1 |
17 | 17 | ||
18 | # The Python async server defaults to a 64K receive buffer, so we hardcode our | ||
19 | # maximum chunk size. It would be better if the client and server reported to | ||
20 | # each other what the maximum chunk sizes were, but that will slow down the | ||
21 | # connection setup with a round trip delay so I'd rather not do that unless it | ||
22 | # is necessary | ||
23 | DEFAULT_MAX_CHUNK = 32 * 1024 | ||
24 | |||
25 | UNIHASH_TABLE_DEFINITION = ( | 18 | UNIHASH_TABLE_DEFINITION = ( |
26 | ("method", "TEXT NOT NULL", "UNIQUE"), | 19 | ("method", "TEXT NOT NULL", "UNIQUE"), |
27 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), | 20 | ("taskhash", "TEXT NOT NULL", "UNIQUE"), |
@@ -102,20 +95,6 @@ def parse_address(addr): | |||
102 | return (ADDR_TYPE_TCP, (host, int(port))) | 95 | return (ADDR_TYPE_TCP, (host, int(port))) |
103 | 96 | ||
104 | 97 | ||
105 | def chunkify(msg, max_chunk): | ||
106 | if len(msg) < max_chunk - 1: | ||
107 | yield ''.join((msg, "\n")) | ||
108 | else: | ||
109 | yield ''.join((json.dumps({ | ||
110 | 'chunk-stream': None | ||
111 | }), "\n")) | ||
112 | |||
113 | args = [iter(msg)] * (max_chunk - 1) | ||
114 | for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): | ||
115 | yield ''.join(itertools.chain(m, "\n")) | ||
116 | yield "\n" | ||
117 | |||
118 | |||
119 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | 98 | def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): |
120 | from . import server | 99 | from . import server |
121 | db = setup_database(dbname, sync=sync) | 100 | db = setup_database(dbname, sync=sync) |
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index f676d267fa..5f7d22ab13 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
28 | 28 | ||
29 | async def send_stream(self, msg): | 29 | async def send_stream(self, msg): |
30 | async def proc(): | 30 | async def proc(): |
31 | self.writer.write(("%s\n" % msg).encode("utf-8")) | 31 | await self.socket.send(msg) |
32 | await self.writer.drain() | 32 | return await self.socket.recv() |
33 | l = await self.reader.readline() | ||
34 | if not l: | ||
35 | raise ConnectionError("Connection closed") | ||
36 | return l.decode("utf-8").rstrip() | ||
37 | 33 | ||
38 | return await self._send_wrapper(proc) | 34 | return await self._send_wrapper(proc) |
39 | 35 | ||
40 | async def _set_mode(self, new_mode): | 36 | async def _set_mode(self, new_mode): |
37 | async def stream_to_normal(): | ||
38 | await self.socket.send("END") | ||
39 | return await self.socket.recv() | ||
40 | |||
41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 41 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: |
42 | r = await self.send_stream("END") | 42 | r = await self._send_wrapper(stream_to_normal) |
43 | if r != "ok": | 43 | if r != "ok": |
44 | raise ConnectionError("Bad response from server %r" % r) | 44 | raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r) |
45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 45 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: |
46 | r = await self.send_message({"get-stream": None}) | 46 | r = await self.invoke({"get-stream": None}) |
47 | if r != "ok": | 47 | if r != "ok": |
48 | raise ConnectionError("Bad response from server %r" % r) | 48 | raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r) |
49 | elif new_mode != self.mode: | 49 | elif new_mode != self.mode: |
50 | raise Exception( | 50 | raise Exception( |
51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 51 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) |
@@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
67 | m["method"] = method | 67 | m["method"] = method |
68 | m["outhash"] = outhash | 68 | m["outhash"] = outhash |
69 | m["unihash"] = unihash | 69 | m["unihash"] = unihash |
70 | return await self.send_message({"report": m}) | 70 | return await self.invoke({"report": m}) |
71 | 71 | ||
72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 72 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
73 | await self._set_mode(self.MODE_NORMAL) | 73 | await self._set_mode(self.MODE_NORMAL) |
@@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
75 | m["taskhash"] = taskhash | 75 | m["taskhash"] = taskhash |
76 | m["method"] = method | 76 | m["method"] = method |
77 | m["unihash"] = unihash | 77 | m["unihash"] = unihash |
78 | return await self.send_message({"report-equiv": m}) | 78 | return await self.invoke({"report-equiv": m}) |
79 | 79 | ||
80 | async def get_taskhash(self, method, taskhash, all_properties=False): | 80 | async def get_taskhash(self, method, taskhash, all_properties=False): |
81 | await self._set_mode(self.MODE_NORMAL) | 81 | await self._set_mode(self.MODE_NORMAL) |
82 | return await self.send_message( | 82 | return await self.invoke( |
83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 83 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
84 | ) | 84 | ) |
85 | 85 | ||
86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 86 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
87 | await self._set_mode(self.MODE_NORMAL) | 87 | await self._set_mode(self.MODE_NORMAL) |
88 | return await self.send_message( | 88 | return await self.invoke( |
89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} | 89 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} |
90 | ) | 90 | ) |
91 | 91 | ||
92 | async def get_stats(self): | 92 | async def get_stats(self): |
93 | await self._set_mode(self.MODE_NORMAL) | 93 | await self._set_mode(self.MODE_NORMAL) |
94 | return await self.send_message({"get-stats": None}) | 94 | return await self.invoke({"get-stats": None}) |
95 | 95 | ||
96 | async def reset_stats(self): | 96 | async def reset_stats(self): |
97 | await self._set_mode(self.MODE_NORMAL) | 97 | await self._set_mode(self.MODE_NORMAL) |
98 | return await self.send_message({"reset-stats": None}) | 98 | return await self.invoke({"reset-stats": None}) |
99 | 99 | ||
100 | async def backfill_wait(self): | 100 | async def backfill_wait(self): |
101 | await self._set_mode(self.MODE_NORMAL) | 101 | await self._set_mode(self.MODE_NORMAL) |
102 | return (await self.send_message({"backfill-wait": None}))["tasks"] | 102 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
103 | 103 | ||
104 | async def remove(self, where): | 104 | async def remove(self, where): |
105 | await self._set_mode(self.MODE_NORMAL) | 105 | await self._set_mode(self.MODE_NORMAL) |
106 | return await self.send_message({"remove": {"where": where}}) | 106 | return await self.invoke({"remove": {"where": where}}) |
107 | 107 | ||
108 | async def clean_unused(self, max_age): | 108 | async def clean_unused(self, max_age): |
109 | await self._set_mode(self.MODE_NORMAL) | 109 | await self._set_mode(self.MODE_NORMAL) |
110 | return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) | 110 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) |
111 | 111 | ||
112 | 112 | ||
113 | class Client(bb.asyncrpc.Client): | 113 | class Client(bb.asyncrpc.Client): |
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py index 45bf476bfe..13b754805b 100644 --- a/bitbake/lib/hashserv/server.py +++ b/bitbake/lib/hashserv/server.py | |||
@@ -165,8 +165,8 @@ class ServerCursor(object): | |||
165 | 165 | ||
166 | 166 | ||
167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): | 167 | class ServerClient(bb.asyncrpc.AsyncServerConnection): |
168 | def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): | 168 | def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only): |
169 | super().__init__(reader, writer, 'OEHASHEQUIV', logger) | 169 | super().__init__(socket, 'OEHASHEQUIV', logger) |
170 | self.db = db | 170 | self.db = db |
171 | self.request_stats = request_stats | 171 | self.request_stats = request_stats |
172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK | 172 | self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK |
@@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
209 | if k in msg: | 209 | if k in msg: |
210 | logger.debug('Handling %s' % k) | 210 | logger.debug('Handling %s' % k) |
211 | if 'stream' in k: | 211 | if 'stream' in k: |
212 | await self.handlers[k](msg[k]) | 212 | return await self.handlers[k](msg[k]) |
213 | else: | 213 | else: |
214 | with self.request_stats.start_sample() as self.request_sample, \ | 214 | with self.request_stats.start_sample() as self.request_sample, \ |
215 | self.request_sample.measure(): | 215 | self.request_sample.measure(): |
216 | await self.handlers[k](msg[k]) | 216 | return await self.handlers[k](msg[k]) |
217 | return | ||
218 | 217 | ||
219 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) | 218 | raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) |
220 | 219 | ||
@@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
224 | fetch_all = request.get('all', False) | 223 | fetch_all = request.get('all', False) |
225 | 224 | ||
226 | with closing(self.db.cursor()) as cursor: | 225 | with closing(self.db.cursor()) as cursor: |
227 | d = await self.get_unihash(cursor, method, taskhash, fetch_all) | 226 | return await self.get_unihash(cursor, method, taskhash, fetch_all) |
228 | |||
229 | self.write_message(d) | ||
230 | 227 | ||
231 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): | 228 | async def get_unihash(self, cursor, method, taskhash, fetch_all=False): |
232 | d = None | 229 | d = None |
@@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
274 | with_unihash = request.get("with_unihash", True) | 271 | with_unihash = request.get("with_unihash", True) |
275 | 272 | ||
276 | with closing(self.db.cursor()) as cursor: | 273 | with closing(self.db.cursor()) as cursor: |
277 | d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) | 274 | return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) |
278 | |||
279 | self.write_message(d) | ||
280 | 275 | ||
281 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): | 276 | async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): |
282 | d = None | 277 | d = None |
@@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
334 | ) | 329 | ) |
335 | 330 | ||
336 | async def handle_get_stream(self, request): | 331 | async def handle_get_stream(self, request): |
337 | self.write_message('ok') | 332 | await self.socket.send_message("ok") |
338 | 333 | ||
339 | while True: | 334 | while True: |
340 | upstream = None | 335 | upstream = None |
341 | 336 | ||
342 | l = await self.reader.readline() | 337 | l = await self.socket.recv() |
343 | if not l: | 338 | if not l: |
344 | return | 339 | break |
345 | 340 | ||
346 | try: | 341 | try: |
347 | # This inner loop is very sensitive and must be as fast as | 342 | # This inner loop is very sensitive and must be as fast as |
@@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
352 | request_measure = self.request_sample.measure() | 347 | request_measure = self.request_sample.measure() |
353 | request_measure.start() | 348 | request_measure.start() |
354 | 349 | ||
355 | l = l.decode('utf-8').rstrip() | ||
356 | if l == 'END': | 350 | if l == 'END': |
357 | self.writer.write('ok\n'.encode('utf-8')) | 351 | break |
358 | return | ||
359 | 352 | ||
360 | (method, taskhash) = l.split() | 353 | (method, taskhash) = l.split() |
361 | #logger.debug('Looking up %s %s' % (method, taskhash)) | 354 | #logger.debug('Looking up %s %s' % (method, taskhash)) |
@@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
366 | cursor.close() | 359 | cursor.close() |
367 | 360 | ||
368 | if row is not None: | 361 | if row is not None: |
369 | msg = ('%s\n' % row['unihash']).encode('utf-8') | 362 | msg = row['unihash'] |
370 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) | 363 | #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) |
371 | elif self.upstream_client is not None: | 364 | elif self.upstream_client is not None: |
372 | upstream = await self.upstream_client.get_unihash(method, taskhash) | 365 | upstream = await self.upstream_client.get_unihash(method, taskhash) |
373 | if upstream: | 366 | if upstream: |
374 | msg = ("%s\n" % upstream).encode("utf-8") | 367 | msg = upstream |
375 | else: | 368 | else: |
376 | msg = "\n".encode("utf-8") | 369 | msg = "" |
377 | else: | 370 | else: |
378 | msg = '\n'.encode('utf-8') | 371 | msg = "" |
379 | 372 | ||
380 | self.writer.write(msg) | 373 | await self.socket.send(msg) |
381 | finally: | 374 | finally: |
382 | request_measure.end() | 375 | request_measure.end() |
383 | self.request_sample.end() | 376 | self.request_sample.end() |
384 | 377 | ||
385 | await self.writer.drain() | ||
386 | |||
387 | # Post to the backfill queue after writing the result to minimize | 378 | # Post to the backfill queue after writing the result to minimize |
388 | # the turn around time on a request | 379 | # the turn around time on a request |
389 | if upstream is not None: | 380 | if upstream is not None: |
390 | await self.backfill_queue.put((method, taskhash)) | 381 | await self.backfill_queue.put((method, taskhash)) |
391 | 382 | ||
383 | await self.socket.send("ok") | ||
384 | return self.NO_RESPONSE | ||
385 | |||
392 | async def handle_report(self, data): | 386 | async def handle_report(self, data): |
393 | with closing(self.db.cursor()) as cursor: | 387 | with closing(self.db.cursor()) as cursor: |
394 | outhash_data = { | 388 | outhash_data = { |
@@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
468 | 'unihash': unihash, | 462 | 'unihash': unihash, |
469 | } | 463 | } |
470 | 464 | ||
471 | self.write_message(d) | 465 | return d |
472 | 466 | ||
473 | async def handle_equivreport(self, data): | 467 | async def handle_equivreport(self, data): |
474 | with closing(self.db.cursor()) as cursor: | 468 | with closing(self.db.cursor()) as cursor: |
@@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
491 | 485 | ||
492 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} | 486 | d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} |
493 | 487 | ||
494 | self.write_message(d) | 488 | return d |
495 | 489 | ||
496 | 490 | ||
497 | async def handle_get_stats(self, request): | 491 | async def handle_get_stats(self, request): |
498 | d = { | 492 | return { |
499 | 'requests': self.request_stats.todict(), | 493 | 'requests': self.request_stats.todict(), |
500 | } | 494 | } |
501 | 495 | ||
502 | self.write_message(d) | ||
503 | |||
504 | async def handle_reset_stats(self, request): | 496 | async def handle_reset_stats(self, request): |
505 | d = { | 497 | d = { |
506 | 'requests': self.request_stats.todict(), | 498 | 'requests': self.request_stats.todict(), |
507 | } | 499 | } |
508 | 500 | ||
509 | self.request_stats.reset() | 501 | self.request_stats.reset() |
510 | self.write_message(d) | 502 | return d |
511 | 503 | ||
512 | async def handle_backfill_wait(self, request): | 504 | async def handle_backfill_wait(self, request): |
513 | d = { | 505 | d = { |
514 | 'tasks': self.backfill_queue.qsize(), | 506 | 'tasks': self.backfill_queue.qsize(), |
515 | } | 507 | } |
516 | await self.backfill_queue.join() | 508 | await self.backfill_queue.join() |
517 | self.write_message(d) | 509 | return d |
518 | 510 | ||
519 | async def handle_remove(self, request): | 511 | async def handle_remove(self, request): |
520 | condition = request["where"] | 512 | condition = request["where"] |
@@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
541 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) | 533 | count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) |
542 | self.db.commit() | 534 | self.db.commit() |
543 | 535 | ||
544 | self.write_message({"count": count}) | 536 | return {"count": count} |
545 | 537 | ||
546 | async def handle_clean_unused(self, request): | 538 | async def handle_clean_unused(self, request): |
547 | max_age = request["max_age_seconds"] | 539 | max_age = request["max_age_seconds"] |
@@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): | |||
558 | ) | 550 | ) |
559 | count = cursor.rowcount | 551 | count = cursor.rowcount |
560 | 552 | ||
561 | self.write_message({"count": count}) | 553 | return {"count": count} |
562 | 554 | ||
563 | def query_equivalent(self, cursor, method, taskhash): | 555 | def query_equivalent(self, cursor, method, taskhash): |
564 | # This is part of the inner loop and must be as fast as possible | 556 | # This is part of the inner loop and must be as fast as possible |
@@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer): | |||
583 | self.db = db | 575 | self.db = db |
584 | self.upstream = upstream | 576 | self.upstream = upstream |
585 | self.read_only = read_only | 577 | self.read_only = read_only |
578 | self.backfill_queue = None | ||
586 | 579 | ||
587 | def accept_client(self, reader, writer): | 580 | def accept_client(self, socket): |
588 | return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) | 581 | return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) |
589 | 582 | ||
590 | @contextmanager | 583 | async def backfill_worker_task(self): |
591 | def _backfill_worker(self): | 584 | client = await create_async_client(self.upstream) |
592 | async def backfill_worker_task(): | 585 | try: |
593 | client = await create_async_client(self.upstream) | 586 | while True: |
594 | try: | 587 | item = await self.backfill_queue.get() |
595 | while True: | 588 | if item is None: |
596 | item = await self.backfill_queue.get() | ||
597 | if item is None: | ||
598 | self.backfill_queue.task_done() | ||
599 | break | ||
600 | method, taskhash = item | ||
601 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
602 | self.backfill_queue.task_done() | 589 | self.backfill_queue.task_done() |
603 | finally: | 590 | break |
604 | await client.close() | 591 | method, taskhash = item |
592 | await copy_unihash_from_upstream(client, self.db, method, taskhash) | ||
593 | self.backfill_queue.task_done() | ||
594 | finally: | ||
595 | await client.close() | ||
605 | 596 | ||
606 | async def join_worker(worker): | 597 | def start(self): |
598 | tasks = super().start() | ||
599 | if self.upstream: | ||
600 | self.backfill_queue = asyncio.Queue() | ||
601 | tasks += [self.backfill_worker_task()] | ||
602 | return tasks | ||
603 | |||
604 | async def stop(self): | ||
605 | if self.backfill_queue is not None: | ||
607 | await self.backfill_queue.put(None) | 606 | await self.backfill_queue.put(None) |
608 | await worker | 607 | await super().stop() |
609 | |||
610 | if self.upstream is not None: | ||
611 | worker = asyncio.ensure_future(backfill_worker_task()) | ||
612 | try: | ||
613 | yield | ||
614 | finally: | ||
615 | self.loop.run_until_complete(join_worker(worker)) | ||
616 | else: | ||
617 | yield | ||
618 | |||
619 | def run_loop_forever(self): | ||
620 | self.backfill_queue = asyncio.Queue() | ||
621 | |||
622 | with self._backfill_worker(): | ||
623 | super().run_loop_forever() | ||
diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py index 69ab7a4ac9..6b81356fac 100644 --- a/bitbake/lib/prserv/client.py +++ b/bitbake/lib/prserv/client.py | |||
@@ -14,28 +14,28 @@ class PRAsyncClient(bb.asyncrpc.AsyncClient): | |||
14 | super().__init__('PRSERVICE', '1.0', logger) | 14 | super().__init__('PRSERVICE', '1.0', logger) |
15 | 15 | ||
16 | async def getPR(self, version, pkgarch, checksum): | 16 | async def getPR(self, version, pkgarch, checksum): |
17 | response = await self.send_message( | 17 | response = await self.invoke( |
18 | {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} | 18 | {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} |
19 | ) | 19 | ) |
20 | if response: | 20 | if response: |
21 | return response['value'] | 21 | return response['value'] |
22 | 22 | ||
23 | async def importone(self, version, pkgarch, checksum, value): | 23 | async def importone(self, version, pkgarch, checksum, value): |
24 | response = await self.send_message( | 24 | response = await self.invoke( |
25 | {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} | 25 | {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} |
26 | ) | 26 | ) |
27 | if response: | 27 | if response: |
28 | return response['value'] | 28 | return response['value'] |
29 | 29 | ||
30 | async def export(self, version, pkgarch, checksum, colinfo): | 30 | async def export(self, version, pkgarch, checksum, colinfo): |
31 | response = await self.send_message( | 31 | response = await self.invoke( |
32 | {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} | 32 | {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} |
33 | ) | 33 | ) |
34 | if response: | 34 | if response: |
35 | return (response['metainfo'], response['datainfo']) | 35 | return (response['metainfo'], response['datainfo']) |
36 | 36 | ||
37 | async def is_readonly(self): | 37 | async def is_readonly(self): |
38 | response = await self.send_message( | 38 | response = await self.invoke( |
39 | {'is-readonly': {}} | 39 | {'is-readonly': {}} |
40 | ) | 40 | ) |
41 | if response: | 41 | if response: |
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index c686b2065c..ea7933164b 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py | |||
@@ -20,8 +20,8 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid" | |||
20 | singleton = None | 20 | singleton = None |
21 | 21 | ||
22 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): | 22 | class PRServerClient(bb.asyncrpc.AsyncServerConnection): |
23 | def __init__(self, reader, writer, table, read_only): | 23 | def __init__(self, socket, table, read_only): |
24 | super().__init__(reader, writer, 'PRSERVICE', logger) | 24 | super().__init__(socket, 'PRSERVICE', logger) |
25 | self.handlers.update({ | 25 | self.handlers.update({ |
26 | 'get-pr': self.handle_get_pr, | 26 | 'get-pr': self.handle_get_pr, |
27 | 'import-one': self.handle_import_one, | 27 | 'import-one': self.handle_import_one, |
@@ -36,12 +36,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
36 | 36 | ||
37 | async def dispatch_message(self, msg): | 37 | async def dispatch_message(self, msg): |
38 | try: | 38 | try: |
39 | await super().dispatch_message(msg) | 39 | return await super().dispatch_message(msg) |
40 | except: | 40 | except: |
41 | self.table.sync() | 41 | self.table.sync() |
42 | raise | 42 | raise |
43 | 43 | else: | |
44 | self.table.sync_if_dirty() | 44 | self.table.sync_if_dirty() |
45 | 45 | ||
46 | async def handle_get_pr(self, request): | 46 | async def handle_get_pr(self, request): |
47 | version = request['version'] | 47 | version = request['version'] |
@@ -57,7 +57,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
57 | except sqlite3.Error as exc: | 57 | except sqlite3.Error as exc: |
58 | logger.error(str(exc)) | 58 | logger.error(str(exc)) |
59 | 59 | ||
60 | self.write_message(response) | 60 | return response |
61 | 61 | ||
62 | async def handle_import_one(self, request): | 62 | async def handle_import_one(self, request): |
63 | response = None | 63 | response = None |
@@ -71,7 +71,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
71 | if value is not None: | 71 | if value is not None: |
72 | response = {'value': value} | 72 | response = {'value': value} |
73 | 73 | ||
74 | self.write_message(response) | 74 | return response |
75 | 75 | ||
76 | async def handle_export(self, request): | 76 | async def handle_export(self, request): |
77 | version = request['version'] | 77 | version = request['version'] |
@@ -85,12 +85,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection): | |||
85 | logger.error(str(exc)) | 85 | logger.error(str(exc)) |
86 | metainfo = datainfo = None | 86 | metainfo = datainfo = None |
87 | 87 | ||
88 | response = {'metainfo': metainfo, 'datainfo': datainfo} | 88 | return {'metainfo': metainfo, 'datainfo': datainfo} |
89 | self.write_message(response) | ||
90 | 89 | ||
91 | async def handle_is_readonly(self, request): | 90 | async def handle_is_readonly(self, request): |
92 | response = {'readonly': self.read_only} | 91 | return {'readonly': self.read_only} |
93 | self.write_message(response) | ||
94 | 92 | ||
95 | class PRServer(bb.asyncrpc.AsyncServer): | 93 | class PRServer(bb.asyncrpc.AsyncServer): |
96 | def __init__(self, dbfile, read_only=False): | 94 | def __init__(self, dbfile, read_only=False): |
@@ -99,20 +97,23 @@ class PRServer(bb.asyncrpc.AsyncServer): | |||
99 | self.table = None | 97 | self.table = None |
100 | self.read_only = read_only | 98 | self.read_only = read_only |
101 | 99 | ||
102 | def accept_client(self, reader, writer): | 100 | def accept_client(self, socket): |
103 | return PRServerClient(reader, writer, self.table, self.read_only) | 101 | return PRServerClient(socket, self.table, self.read_only) |
104 | 102 | ||
105 | def _serve_forever(self): | 103 | def start(self): |
104 | tasks = super().start() | ||
106 | self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) | 105 | self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) |
107 | self.table = self.db["PRMAIN"] | 106 | self.table = self.db["PRMAIN"] |
108 | 107 | ||
109 | logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % | 108 | logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % |
110 | (self.dbfile, self.address, str(os.getpid()))) | 109 | (self.dbfile, self.address, str(os.getpid()))) |
111 | 110 | ||
112 | super()._serve_forever() | 111 | return tasks |
113 | 112 | ||
113 | async def stop(self): | ||
114 | self.table.sync_if_dirty() | 114 | self.table.sync_if_dirty() |
115 | self.db.disconnect() | 115 | self.db.disconnect() |
116 | await super().stop() | ||
116 | 117 | ||
117 | def signal_handler(self): | 118 | def signal_handler(self): |
118 | super().signal_handler() | 119 | super().signal_handler() |