summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bitbake/lib/bb/asyncrpc/__init__.py32
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py78
-rw-r--r--bitbake/lib/bb/asyncrpc/connection.py95
-rw-r--r--bitbake/lib/bb/asyncrpc/exceptions.py17
-rw-r--r--bitbake/lib/bb/asyncrpc/serv.py304
-rw-r--r--bitbake/lib/hashserv/__init__.py21
-rw-r--r--bitbake/lib/hashserv/client.py38
-rw-r--r--bitbake/lib/hashserv/server.py116
-rw-r--r--bitbake/lib/prserv/client.py8
-rw-r--r--bitbake/lib/prserv/serv.py31
10 files changed, 387 insertions, 353 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py
index 9a85e9965b..9f677eac4c 100644
--- a/bitbake/lib/bb/asyncrpc/__init__.py
+++ b/bitbake/lib/bb/asyncrpc/__init__.py
@@ -4,30 +4,12 @@
4# SPDX-License-Identifier: GPL-2.0-only 4# SPDX-License-Identifier: GPL-2.0-only
5# 5#
6 6
7import itertools
8import json
9
10# The Python async server defaults to a 64K receive buffer, so we hardcode our
11# maximum chunk size. It would be better if the client and server reported to
12# each other what the maximum chunk sizes were, but that will slow down the
13# connection setup with a round trip delay so I'd rather not do that unless it
14# is necessary
15DEFAULT_MAX_CHUNK = 32 * 1024
16
17
18def chunkify(msg, max_chunk):
19 if len(msg) < max_chunk - 1:
20 yield ''.join((msg, "\n"))
21 else:
22 yield ''.join((json.dumps({
23 'chunk-stream': None
24 }), "\n"))
25
26 args = [iter(msg)] * (max_chunk - 1)
27 for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
28 yield ''.join(itertools.chain(m, "\n"))
29 yield "\n"
30
31 7
32from .client import AsyncClient, Client 8from .client import AsyncClient, Client
33from .serv import AsyncServer, AsyncServerConnection, ClientError, ServerError 9from .serv import AsyncServer, AsyncServerConnection
10from .connection import DEFAULT_MAX_CHUNK
11from .exceptions import (
12 ClientError,
13 ServerError,
14 ConnectionClosedError,
15)
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index fa042bbe87..7f33099b63 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -10,13 +10,13 @@ import json
10import os 10import os
11import socket 11import socket
12import sys 12import sys
13from . import chunkify, DEFAULT_MAX_CHUNK 13from .connection import StreamConnection, DEFAULT_MAX_CHUNK
14from .exceptions import ConnectionClosedError
14 15
15 16
16class AsyncClient(object): 17class AsyncClient(object):
17 def __init__(self, proto_name, proto_version, logger, timeout=30): 18 def __init__(self, proto_name, proto_version, logger, timeout=30):
18 self.reader = None 19 self.socket = None
19 self.writer = None
20 self.max_chunk = DEFAULT_MAX_CHUNK 20 self.max_chunk = DEFAULT_MAX_CHUNK
21 self.proto_name = proto_name 21 self.proto_name = proto_name
22 self.proto_version = proto_version 22 self.proto_version = proto_version
@@ -25,7 +25,8 @@ class AsyncClient(object):
25 25
26 async def connect_tcp(self, address, port): 26 async def connect_tcp(self, address, port):
27 async def connect_sock(): 27 async def connect_sock():
28 return await asyncio.open_connection(address, port) 28 reader, writer = await asyncio.open_connection(address, port)
29 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
29 30
30 self._connect_sock = connect_sock 31 self._connect_sock = connect_sock
31 32
@@ -40,27 +41,27 @@ class AsyncClient(object):
40 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 41 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
41 sock.connect(os.path.basename(path)) 42 sock.connect(os.path.basename(path))
42 finally: 43 finally:
43 os.chdir(cwd) 44 os.chdir(cwd)
44 return await asyncio.open_unix_connection(sock=sock) 45 reader, writer = await asyncio.open_unix_connection(sock=sock)
46 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
45 47
46 self._connect_sock = connect_sock 48 self._connect_sock = connect_sock
47 49
48 async def setup_connection(self): 50 async def setup_connection(self):
49 s = '%s %s\n\n' % (self.proto_name, self.proto_version) 51 # Send headers
50 self.writer.write(s.encode("utf-8")) 52 await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
51 await self.writer.drain() 53 # End of headers
54 await self.socket.send("")
52 55
53 async def connect(self): 56 async def connect(self):
54 if self.reader is None or self.writer is None: 57 if self.socket is None:
55 (self.reader, self.writer) = await self._connect_sock() 58 self.socket = await self._connect_sock()
56 await self.setup_connection() 59 await self.setup_connection()
57 60
58 async def close(self): 61 async def close(self):
59 self.reader = None 62 if self.socket is not None:
60 63 await self.socket.close()
61 if self.writer is not None: 64 self.socket = None
62 self.writer.close()
63 self.writer = None
64 65
65 async def _send_wrapper(self, proc): 66 async def _send_wrapper(self, proc):
66 count = 0 67 count = 0
@@ -71,6 +72,7 @@ class AsyncClient(object):
71 except ( 72 except (
72 OSError, 73 OSError,
73 ConnectionError, 74 ConnectionError,
75 ConnectionClosedError,
74 json.JSONDecodeError, 76 json.JSONDecodeError,
75 UnicodeDecodeError, 77 UnicodeDecodeError,
76 ) as e: 78 ) as e:
@@ -82,49 +84,15 @@ class AsyncClient(object):
82 await self.close() 84 await self.close()
83 count += 1 85 count += 1
84 86
85 async def send_message(self, msg): 87 async def invoke(self, msg):
86 async def get_line():
87 try:
88 line = await asyncio.wait_for(self.reader.readline(), self.timeout)
89 except asyncio.TimeoutError:
90 raise ConnectionError("Timed out waiting for server")
91
92 if not line:
93 raise ConnectionError("Connection closed")
94
95 line = line.decode("utf-8")
96
97 if not line.endswith("\n"):
98 raise ConnectionError("Bad message %r" % (line))
99
100 return line
101
102 async def proc(): 88 async def proc():
103 for c in chunkify(json.dumps(msg), self.max_chunk): 89 await self.socket.send_message(msg)
104 self.writer.write(c.encode("utf-8")) 90 return await self.socket.recv_message()
105 await self.writer.drain()
106
107 l = await get_line()
108
109 m = json.loads(l)
110 if m and "chunk-stream" in m:
111 lines = []
112 while True:
113 l = (await get_line()).rstrip("\n")
114 if not l:
115 break
116 lines.append(l)
117
118 m = json.loads("".join(lines))
119
120 return m
121 91
122 return await self._send_wrapper(proc) 92 return await self._send_wrapper(proc)
123 93
124 async def ping(self): 94 async def ping(self):
125 return await self.send_message( 95 return await self.invoke({"ping": {}})
126 {'ping': {}}
127 )
128 96
129 97
130class Client(object): 98class Client(object):
@@ -142,7 +110,7 @@ class Client(object):
142 # required (but harmless) with it. 110 # required (but harmless) with it.
143 asyncio.set_event_loop(self.loop) 111 asyncio.set_event_loop(self.loop)
144 112
145 self._add_methods('connect_tcp', 'ping') 113 self._add_methods("connect_tcp", "ping")
146 114
147 @abc.abstractmethod 115 @abc.abstractmethod
148 def _get_async_client(self): 116 def _get_async_client(self):
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
new file mode 100644
index 0000000000..c4fd24754c
--- /dev/null
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -0,0 +1,95 @@
1#
2# Copyright BitBake Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import asyncio
8import itertools
9import json
10from .exceptions import ClientError, ConnectionClosedError
11
12
13# The Python async server defaults to a 64K receive buffer, so we hardcode our
14# maximum chunk size. It would be better if the client and server reported to
15# each other what the maximum chunk sizes were, but that will slow down the
16# connection setup with a round trip delay so I'd rather not do that unless it
17# is necessary
18DEFAULT_MAX_CHUNK = 32 * 1024
19
20
21def chunkify(msg, max_chunk):
22 if len(msg) < max_chunk - 1:
23 yield "".join((msg, "\n"))
24 else:
25 yield "".join((json.dumps({"chunk-stream": None}), "\n"))
26
27 args = [iter(msg)] * (max_chunk - 1)
28 for m in map("".join, itertools.zip_longest(*args, fillvalue="")):
29 yield "".join(itertools.chain(m, "\n"))
30 yield "\n"
31
32
33class StreamConnection(object):
34 def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
35 self.reader = reader
36 self.writer = writer
37 self.timeout = timeout
38 self.max_chunk = max_chunk
39
40 @property
41 def address(self):
42 return self.writer.get_extra_info("peername")
43
44 async def send_message(self, msg):
45 for c in chunkify(json.dumps(msg), self.max_chunk):
46 self.writer.write(c.encode("utf-8"))
47 await self.writer.drain()
48
49 async def recv_message(self):
50 l = await self.recv()
51
52 m = json.loads(l)
53 if not m:
54 return m
55
56 if "chunk-stream" in m:
57 lines = []
58 while True:
59 l = await self.recv()
60 if not l:
61 break
62 lines.append(l)
63
64 m = json.loads("".join(lines))
65
66 return m
67
68 async def send(self, msg):
69 self.writer.write(("%s\n" % msg).encode("utf-8"))
70 await self.writer.drain()
71
72 async def recv(self):
73 if self.timeout < 0:
74 line = await self.reader.readline()
75 else:
76 try:
77 line = await asyncio.wait_for(self.reader.readline(), self.timeout)
78 except asyncio.TimeoutError:
79 raise ConnectionError("Timed out waiting for data")
80
81 if not line:
82 raise ConnectionClosedError("Connection closed")
83
84 line = line.decode("utf-8")
85
86 if not line.endswith("\n"):
87 raise ConnectionError("Bad message %r" % (line))
88
89 return line.rstrip()
90
91 async def close(self):
92 self.reader = None
93 if self.writer is not None:
94 self.writer.close()
95 self.writer = None
diff --git a/bitbake/lib/bb/asyncrpc/exceptions.py b/bitbake/lib/bb/asyncrpc/exceptions.py
new file mode 100644
index 0000000000..a8942b4f0c
--- /dev/null
+++ b/bitbake/lib/bb/asyncrpc/exceptions.py
@@ -0,0 +1,17 @@
1#
2# Copyright BitBake Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7
8class ClientError(Exception):
9 pass
10
11
12class ServerError(Exception):
13 pass
14
15
16class ConnectionClosedError(Exception):
17 pass
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py
index d2de4891b8..3e0d0632cb 100644
--- a/bitbake/lib/bb/asyncrpc/serv.py
+++ b/bitbake/lib/bb/asyncrpc/serv.py
@@ -12,241 +12,248 @@ import signal
12import socket 12import socket
13import sys 13import sys
14import multiprocessing 14import multiprocessing
15from . import chunkify, DEFAULT_MAX_CHUNK 15from .connection import StreamConnection
16 16from .exceptions import ClientError, ServerError, ConnectionClosedError
17
18class ClientError(Exception):
19 pass
20
21
22class ServerError(Exception):
23 pass
24 17
25 18
26class AsyncServerConnection(object): 19class AsyncServerConnection(object):
27 def __init__(self, reader, writer, proto_name, logger): 20 # If a handler returns this object (e.g. `return self.NO_RESPONSE`), no
28 self.reader = reader 21 # return message will be automatically be sent back to the client
29 self.writer = writer 22 NO_RESPONSE = object()
23
24 def __init__(self, socket, proto_name, logger):
25 self.socket = socket
30 self.proto_name = proto_name 26 self.proto_name = proto_name
31 self.max_chunk = DEFAULT_MAX_CHUNK
32 self.handlers = { 27 self.handlers = {
33 'chunk-stream': self.handle_chunk, 28 "ping": self.handle_ping,
34 'ping': self.handle_ping,
35 } 29 }
36 self.logger = logger 30 self.logger = logger
37 31
32 async def close(self):
33 await self.socket.close()
34
38 async def process_requests(self): 35 async def process_requests(self):
39 try: 36 try:
40 self.addr = self.writer.get_extra_info('peername') 37 self.logger.info("Client %r connected" % (self.socket.address,))
41 self.logger.debug('Client %r connected' % (self.addr,))
42 38
43 # Read protocol and version 39 # Read protocol and version
44 client_protocol = await self.reader.readline() 40 client_protocol = await self.socket.recv()
45 if not client_protocol: 41 if not client_protocol:
46 return 42 return
47 43
48 (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split() 44 (client_proto_name, client_proto_version) = client_protocol.split()
49 if client_proto_name != self.proto_name: 45 if client_proto_name != self.proto_name:
50 self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name)) 46 self.logger.debug("Rejecting invalid protocol %s" % (self.proto_name))
51 return 47 return
52 48
53 self.proto_version = tuple(int(v) for v in client_proto_version.split('.')) 49 self.proto_version = tuple(int(v) for v in client_proto_version.split("."))
54 if not self.validate_proto_version(): 50 if not self.validate_proto_version():
55 self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version)) 51 self.logger.debug(
52 "Rejecting invalid protocol version %s" % (client_proto_version)
53 )
56 return 54 return
57 55
58 # Read headers. Currently, no headers are implemented, so look for 56 # Read headers. Currently, no headers are implemented, so look for
59 # an empty line to signal the end of the headers 57 # an empty line to signal the end of the headers
60 while True: 58 while True:
61 line = await self.reader.readline() 59 header = await self.socket.recv()
62 if not line: 60 if not header:
63 return
64
65 line = line.decode('utf-8').rstrip()
66 if not line:
67 break 61 break
68 62
69 # Handle messages 63 # Handle messages
70 while True: 64 while True:
71 d = await self.read_message() 65 d = await self.socket.recv_message()
72 if d is None: 66 if d is None:
73 break 67 break
74 await self.dispatch_message(d) 68 response = await self.dispatch_message(d)
75 await self.writer.drain() 69 if response is not self.NO_RESPONSE:
76 except ClientError as e: 70 await self.socket.send_message(response)
71
72 except ConnectionClosedError as e:
73 self.logger.info(str(e))
74 except (ClientError, ConnectionError) as e:
77 self.logger.error(str(e)) 75 self.logger.error(str(e))
78 finally: 76 finally:
79 self.writer.close() 77 await self.close()
80 78
81 async def dispatch_message(self, msg): 79 async def dispatch_message(self, msg):
82 for k in self.handlers.keys(): 80 for k in self.handlers.keys():
83 if k in msg: 81 if k in msg:
84 self.logger.debug('Handling %s' % k) 82 self.logger.debug("Handling %s" % k)
85 await self.handlers[k](msg[k]) 83 return await self.handlers[k](msg[k])
86 return
87 84
88 raise ClientError("Unrecognized command %r" % msg) 85 raise ClientError("Unrecognized command %r" % msg)
89 86
90 def write_message(self, msg): 87 async def handle_ping(self, request):
91 for c in chunkify(json.dumps(msg), self.max_chunk): 88 return {"alive": True}
92 self.writer.write(c.encode('utf-8'))
93 89
94 async def read_message(self):
95 l = await self.reader.readline()
96 if not l:
97 return None
98 90
99 try: 91class StreamServer(object):
100 message = l.decode('utf-8') 92 def __init__(self, handler, logger):
93 self.handler = handler
94 self.logger = logger
95 self.closed = False
101 96
102 if not message.endswith('\n'): 97 async def handle_stream_client(self, reader, writer):
103 return None 98 # writer.transport.set_write_buffer_limits(0)
99 socket = StreamConnection(reader, writer, -1)
100 if self.closed:
101 await socket.close()
102 return
103
104 await self.handler(socket)
105
106 async def stop(self):
107 self.closed = True
108
109
110class TCPStreamServer(StreamServer):
111 def __init__(self, host, port, handler, logger):
112 super().__init__(handler, logger)
113 self.host = host
114 self.port = port
115
116 def start(self, loop):
117 self.server = loop.run_until_complete(
118 asyncio.start_server(self.handle_stream_client, self.host, self.port)
119 )
120
121 for s in self.server.sockets:
122 self.logger.debug("Listening on %r" % (s.getsockname(),))
123 # Newer python does this automatically. Do it manually here for
124 # maximum compatibility
125 s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
126 s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
127
128 # Enable keep alives. This prevents broken client connections
129 # from persisting on the server for long periods of time.
130 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
131 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
132 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15)
133 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
134
135 name = self.server.sockets[0].getsockname()
136 if self.server.sockets[0].family == socket.AF_INET6:
137 self.address = "[%s]:%d" % (name[0], name[1])
138 else:
139 self.address = "%s:%d" % (name[0], name[1])
140
141 return [self.server.wait_closed()]
142
143 async def stop(self):
144 await super().stop()
145 self.server.close()
146
147 def cleanup(self):
148 pass
104 149
105 return json.loads(message)
106 except (json.JSONDecodeError, UnicodeDecodeError) as e:
107 self.logger.error('Bad message from client: %r' % message)
108 raise e
109 150
110 async def handle_chunk(self, request): 151class UnixStreamServer(StreamServer):
111 lines = [] 152 def __init__(self, path, handler, logger):
112 try: 153 super().__init__(handler, logger)
113 while True: 154 self.path = path
114 l = await self.reader.readline()
115 l = l.rstrip(b"\n").decode("utf-8")
116 if not l:
117 break
118 lines.append(l)
119 155
120 msg = json.loads(''.join(lines)) 156 def start(self, loop):
121 except (json.JSONDecodeError, UnicodeDecodeError) as e: 157 cwd = os.getcwd()
122 self.logger.error('Bad message from client: %r' % lines) 158 try:
123 raise e 159 # Work around path length limits in AF_UNIX
160 os.chdir(os.path.dirname(self.path))
161 self.server = loop.run_until_complete(
162 asyncio.start_unix_server(
163 self.handle_stream_client, os.path.basename(self.path)
164 )
165 )
166 finally:
167 os.chdir(cwd)
124 168
125 if 'chunk-stream' in msg: 169 self.logger.debug("Listening on %r" % self.path)
126 raise ClientError("Nested chunks are not allowed") 170 self.address = "unix://%s" % os.path.abspath(self.path)
171 return [self.server.wait_closed()]
127 172
128 await self.dispatch_message(msg) 173 async def stop(self):
174 await super().stop()
175 self.server.close()
129 176
130 async def handle_ping(self, request): 177 def cleanup(self):
131 response = {'alive': True} 178 os.unlink(self.path)
132 self.write_message(response)
133 179
134 180
135class AsyncServer(object): 181class AsyncServer(object):
136 def __init__(self, logger): 182 def __init__(self, logger):
137 self._cleanup_socket = None
138 self.logger = logger 183 self.logger = logger
139 self.start = None
140 self.address = None
141 self.loop = None 184 self.loop = None
185 self.run_tasks = []
142 186
143 def start_tcp_server(self, host, port): 187 def start_tcp_server(self, host, port):
144 def start_tcp(): 188 self.server = TCPStreamServer(host, port, self._client_handler, self.logger)
145 self.server = self.loop.run_until_complete(
146 asyncio.start_server(self.handle_client, host, port)
147 )
148
149 for s in self.server.sockets:
150 self.logger.debug('Listening on %r' % (s.getsockname(),))
151 # Newer python does this automatically. Do it manually here for
152 # maximum compatibility
153 s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
154 s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
155
156 # Enable keep alives. This prevents broken client connections
157 # from persisting on the server for long periods of time.
158 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
159 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
160 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15)
161 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
162
163 name = self.server.sockets[0].getsockname()
164 if self.server.sockets[0].family == socket.AF_INET6:
165 self.address = "[%s]:%d" % (name[0], name[1])
166 else:
167 self.address = "%s:%d" % (name[0], name[1])
168
169 self.start = start_tcp
170 189
171 def start_unix_server(self, path): 190 def start_unix_server(self, path):
172 def cleanup(): 191 self.server = UnixStreamServer(path, self._client_handler, self.logger)
173 os.unlink(path)
174
175 def start_unix():
176 cwd = os.getcwd()
177 try:
178 # Work around path length limits in AF_UNIX
179 os.chdir(os.path.dirname(path))
180 self.server = self.loop.run_until_complete(
181 asyncio.start_unix_server(self.handle_client, os.path.basename(path))
182 )
183 finally:
184 os.chdir(cwd)
185
186 self.logger.debug('Listening on %r' % path)
187 192
188 self._cleanup_socket = cleanup 193 async def _client_handler(self, socket):
189 self.address = "unix://%s" % os.path.abspath(path)
190
191 self.start = start_unix
192
193 @abc.abstractmethod
194 def accept_client(self, reader, writer):
195 pass
196
197 async def handle_client(self, reader, writer):
198 # writer.transport.set_write_buffer_limits(0)
199 try: 194 try:
200 client = self.accept_client(reader, writer) 195 client = self.accept_client(socket)
201 await client.process_requests() 196 await client.process_requests()
202 except Exception as e: 197 except Exception as e:
203 import traceback 198 import traceback
204 self.logger.error('Error from client: %s' % str(e), exc_info=True) 199
200 self.logger.error("Error from client: %s" % str(e), exc_info=True)
205 traceback.print_exc() 201 traceback.print_exc()
206 writer.close() 202 await socket.close()
207 self.logger.debug('Client disconnected') 203 self.logger.debug("Client disconnected")
208 204
209 def run_loop_forever(self): 205 @abc.abstractmethod
210 try: 206 def accept_client(self, socket):
211 self.loop.run_forever() 207 pass
212 except KeyboardInterrupt: 208
213 pass 209 async def stop(self):
210 self.logger.debug("Stopping server")
211 await self.server.stop()
212
213 def start(self):
214 tasks = self.server.start(self.loop)
215 self.address = self.server.address
216 return tasks
214 217
215 def signal_handler(self): 218 def signal_handler(self):
216 self.logger.debug("Got exit signal") 219 self.logger.debug("Got exit signal")
217 self.loop.stop() 220 self.loop.create_task(self.stop())
218 221
219 def _serve_forever(self): 222 def _serve_forever(self, tasks):
220 try: 223 try:
221 self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) 224 self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
225 self.loop.add_signal_handler(signal.SIGINT, self.signal_handler)
226 self.loop.add_signal_handler(signal.SIGQUIT, self.signal_handler)
222 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) 227 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
223 228
224 self.run_loop_forever() 229 self.loop.run_until_complete(asyncio.gather(*tasks))
225 self.server.close()
226 230
227 self.loop.run_until_complete(self.server.wait_closed()) 231 self.logger.debug("Server shutting down")
228 self.logger.debug('Server shutting down')
229 finally: 232 finally:
230 if self._cleanup_socket is not None: 233 self.server.cleanup()
231 self._cleanup_socket()
232 234
233 def serve_forever(self): 235 def serve_forever(self):
234 """ 236 """
235 Serve requests in the current process 237 Serve requests in the current process
236 """ 238 """
239 self._create_loop()
240 tasks = self.start()
241 self._serve_forever(tasks)
242 self.loop.close()
243
244 def _create_loop(self):
237 # Create loop and override any loop that may have existed in 245 # Create loop and override any loop that may have existed in
238 # a parent process. It is possible that the usecases of 246 # a parent process. It is possible that the usecases of
239 # serve_forever might be constrained enough to allow using 247 # serve_forever might be constrained enough to allow using
240 # get_event_loop here, but better safe than sorry for now. 248 # get_event_loop here, but better safe than sorry for now.
241 self.loop = asyncio.new_event_loop() 249 self.loop = asyncio.new_event_loop()
242 asyncio.set_event_loop(self.loop) 250 asyncio.set_event_loop(self.loop)
243 self.start()
244 self._serve_forever()
245 251
246 def serve_as_process(self, *, prefunc=None, args=()): 252 def serve_as_process(self, *, prefunc=None, args=()):
247 """ 253 """
248 Serve requests in a child process 254 Serve requests in a child process
249 """ 255 """
256
250 def run(queue): 257 def run(queue):
251 # Create loop and override any loop that may have existed 258 # Create loop and override any loop that may have existed
252 # in a parent process. Without doing this and instead 259 # in a parent process. Without doing this and instead
@@ -259,18 +266,19 @@ class AsyncServer(object):
259 # more general, though, as any potential use of asyncio in 266 # more general, though, as any potential use of asyncio in
260 # Cooker could create a loop that needs to replaced in this 267 # Cooker could create a loop that needs to replaced in this
261 # new process. 268 # new process.
262 self.loop = asyncio.new_event_loop() 269 self._create_loop()
263 asyncio.set_event_loop(self.loop)
264 try: 270 try:
265 self.start() 271 self.address = None
272 tasks = self.start()
266 finally: 273 finally:
274 # Always put the server address to wake up the parent task
267 queue.put(self.address) 275 queue.put(self.address)
268 queue.close() 276 queue.close()
269 277
270 if prefunc is not None: 278 if prefunc is not None:
271 prefunc(self, *args) 279 prefunc(self, *args)
272 280
273 self._serve_forever() 281 self._serve_forever(tasks)
274 282
275 if sys.version_info >= (3, 6): 283 if sys.version_info >= (3, 6):
276 self.loop.run_until_complete(self.loop.shutdown_asyncgens()) 284 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index 9cb3fd57a5..3a4018353f 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -15,13 +15,6 @@ UNIX_PREFIX = "unix://"
15ADDR_TYPE_UNIX = 0 15ADDR_TYPE_UNIX = 0
16ADDR_TYPE_TCP = 1 16ADDR_TYPE_TCP = 1
17 17
18# The Python async server defaults to a 64K receive buffer, so we hardcode our
19# maximum chunk size. It would be better if the client and server reported to
20# each other what the maximum chunk sizes were, but that will slow down the
21# connection setup with a round trip delay so I'd rather not do that unless it
22# is necessary
23DEFAULT_MAX_CHUNK = 32 * 1024
24
25UNIHASH_TABLE_DEFINITION = ( 18UNIHASH_TABLE_DEFINITION = (
26 ("method", "TEXT NOT NULL", "UNIQUE"), 19 ("method", "TEXT NOT NULL", "UNIQUE"),
27 ("taskhash", "TEXT NOT NULL", "UNIQUE"), 20 ("taskhash", "TEXT NOT NULL", "UNIQUE"),
@@ -102,20 +95,6 @@ def parse_address(addr):
102 return (ADDR_TYPE_TCP, (host, int(port))) 95 return (ADDR_TYPE_TCP, (host, int(port)))
103 96
104 97
105def chunkify(msg, max_chunk):
106 if len(msg) < max_chunk - 1:
107 yield ''.join((msg, "\n"))
108 else:
109 yield ''.join((json.dumps({
110 'chunk-stream': None
111 }), "\n"))
112
113 args = [iter(msg)] * (max_chunk - 1)
114 for m in map(''.join, itertools.zip_longest(*args, fillvalue='')):
115 yield ''.join(itertools.chain(m, "\n"))
116 yield "\n"
117
118
119def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): 98def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
120 from . import server 99 from . import server
121 db = setup_database(dbname, sync=sync) 100 db = setup_database(dbname, sync=sync)
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index f676d267fa..5f7d22ab13 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -28,24 +28,24 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
28 28
29 async def send_stream(self, msg): 29 async def send_stream(self, msg):
30 async def proc(): 30 async def proc():
31 self.writer.write(("%s\n" % msg).encode("utf-8")) 31 await self.socket.send(msg)
32 await self.writer.drain() 32 return await self.socket.recv()
33 l = await self.reader.readline()
34 if not l:
35 raise ConnectionError("Connection closed")
36 return l.decode("utf-8").rstrip()
37 33
38 return await self._send_wrapper(proc) 34 return await self._send_wrapper(proc)
39 35
40 async def _set_mode(self, new_mode): 36 async def _set_mode(self, new_mode):
37 async def stream_to_normal():
38 await self.socket.send("END")
39 return await self.socket.recv()
40
41 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: 41 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
42 r = await self.send_stream("END") 42 r = await self._send_wrapper(stream_to_normal)
43 if r != "ok": 43 if r != "ok":
44 raise ConnectionError("Bad response from server %r" % r) 44 raise ConnectionError("Unable to transition to normal mode: Bad response from server %r" % r)
45 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: 45 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
46 r = await self.send_message({"get-stream": None}) 46 r = await self.invoke({"get-stream": None})
47 if r != "ok": 47 if r != "ok":
48 raise ConnectionError("Bad response from server %r" % r) 48 raise ConnectionError("Unable to transition to stream mode: Bad response from server %r" % r)
49 elif new_mode != self.mode: 49 elif new_mode != self.mode:
50 raise Exception( 50 raise Exception(
51 "Undefined mode transition %r -> %r" % (self.mode, new_mode) 51 "Undefined mode transition %r -> %r" % (self.mode, new_mode)
@@ -67,7 +67,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
67 m["method"] = method 67 m["method"] = method
68 m["outhash"] = outhash 68 m["outhash"] = outhash
69 m["unihash"] = unihash 69 m["unihash"] = unihash
70 return await self.send_message({"report": m}) 70 return await self.invoke({"report": m})
71 71
72 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 72 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
73 await self._set_mode(self.MODE_NORMAL) 73 await self._set_mode(self.MODE_NORMAL)
@@ -75,39 +75,39 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
75 m["taskhash"] = taskhash 75 m["taskhash"] = taskhash
76 m["method"] = method 76 m["method"] = method
77 m["unihash"] = unihash 77 m["unihash"] = unihash
78 return await self.send_message({"report-equiv": m}) 78 return await self.invoke({"report-equiv": m})
79 79
80 async def get_taskhash(self, method, taskhash, all_properties=False): 80 async def get_taskhash(self, method, taskhash, all_properties=False):
81 await self._set_mode(self.MODE_NORMAL) 81 await self._set_mode(self.MODE_NORMAL)
82 return await self.send_message( 82 return await self.invoke(
83 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 83 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
84 ) 84 )
85 85
86 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 86 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
87 await self._set_mode(self.MODE_NORMAL) 87 await self._set_mode(self.MODE_NORMAL)
88 return await self.send_message( 88 return await self.invoke(
89 {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}} 89 {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method, "with_unihash": with_unihash}}
90 ) 90 )
91 91
92 async def get_stats(self): 92 async def get_stats(self):
93 await self._set_mode(self.MODE_NORMAL) 93 await self._set_mode(self.MODE_NORMAL)
94 return await self.send_message({"get-stats": None}) 94 return await self.invoke({"get-stats": None})
95 95
96 async def reset_stats(self): 96 async def reset_stats(self):
97 await self._set_mode(self.MODE_NORMAL) 97 await self._set_mode(self.MODE_NORMAL)
98 return await self.send_message({"reset-stats": None}) 98 return await self.invoke({"reset-stats": None})
99 99
100 async def backfill_wait(self): 100 async def backfill_wait(self):
101 await self._set_mode(self.MODE_NORMAL) 101 await self._set_mode(self.MODE_NORMAL)
102 return (await self.send_message({"backfill-wait": None}))["tasks"] 102 return (await self.invoke({"backfill-wait": None}))["tasks"]
103 103
104 async def remove(self, where): 104 async def remove(self, where):
105 await self._set_mode(self.MODE_NORMAL) 105 await self._set_mode(self.MODE_NORMAL)
106 return await self.send_message({"remove": {"where": where}}) 106 return await self.invoke({"remove": {"where": where}})
107 107
108 async def clean_unused(self, max_age): 108 async def clean_unused(self, max_age):
109 await self._set_mode(self.MODE_NORMAL) 109 await self._set_mode(self.MODE_NORMAL)
110 return await self.send_message({"clean-unused": {"max_age_seconds": max_age}}) 110 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
111 111
112 112
113class Client(bb.asyncrpc.Client): 113class Client(bb.asyncrpc.Client):
diff --git a/bitbake/lib/hashserv/server.py b/bitbake/lib/hashserv/server.py
index 45bf476bfe..13b754805b 100644
--- a/bitbake/lib/hashserv/server.py
+++ b/bitbake/lib/hashserv/server.py
@@ -165,8 +165,8 @@ class ServerCursor(object):
165 165
166 166
167class ServerClient(bb.asyncrpc.AsyncServerConnection): 167class ServerClient(bb.asyncrpc.AsyncServerConnection):
168 def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): 168 def __init__(self, socket, db, request_stats, backfill_queue, upstream, read_only):
169 super().__init__(reader, writer, 'OEHASHEQUIV', logger) 169 super().__init__(socket, 'OEHASHEQUIV', logger)
170 self.db = db 170 self.db = db
171 self.request_stats = request_stats 171 self.request_stats = request_stats
172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK 172 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
@@ -209,12 +209,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
209 if k in msg: 209 if k in msg:
210 logger.debug('Handling %s' % k) 210 logger.debug('Handling %s' % k)
211 if 'stream' in k: 211 if 'stream' in k:
212 await self.handlers[k](msg[k]) 212 return await self.handlers[k](msg[k])
213 else: 213 else:
214 with self.request_stats.start_sample() as self.request_sample, \ 214 with self.request_stats.start_sample() as self.request_sample, \
215 self.request_sample.measure(): 215 self.request_sample.measure():
216 await self.handlers[k](msg[k]) 216 return await self.handlers[k](msg[k])
217 return
218 217
219 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 218 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
220 219
@@ -224,9 +223,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
224 fetch_all = request.get('all', False) 223 fetch_all = request.get('all', False)
225 224
226 with closing(self.db.cursor()) as cursor: 225 with closing(self.db.cursor()) as cursor:
227 d = await self.get_unihash(cursor, method, taskhash, fetch_all) 226 return await self.get_unihash(cursor, method, taskhash, fetch_all)
228
229 self.write_message(d)
230 227
231 async def get_unihash(self, cursor, method, taskhash, fetch_all=False): 228 async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
232 d = None 229 d = None
@@ -274,9 +271,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
274 with_unihash = request.get("with_unihash", True) 271 with_unihash = request.get("with_unihash", True)
275 272
276 with closing(self.db.cursor()) as cursor: 273 with closing(self.db.cursor()) as cursor:
277 d = await self.get_outhash(cursor, method, outhash, taskhash, with_unihash) 274 return await self.get_outhash(cursor, method, outhash, taskhash, with_unihash)
278
279 self.write_message(d)
280 275
281 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True): 276 async def get_outhash(self, cursor, method, outhash, taskhash, with_unihash=True):
282 d = None 277 d = None
@@ -334,14 +329,14 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
334 ) 329 )
335 330
336 async def handle_get_stream(self, request): 331 async def handle_get_stream(self, request):
337 self.write_message('ok') 332 await self.socket.send_message("ok")
338 333
339 while True: 334 while True:
340 upstream = None 335 upstream = None
341 336
342 l = await self.reader.readline() 337 l = await self.socket.recv()
343 if not l: 338 if not l:
344 return 339 break
345 340
346 try: 341 try:
347 # This inner loop is very sensitive and must be as fast as 342 # This inner loop is very sensitive and must be as fast as
@@ -352,10 +347,8 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
352 request_measure = self.request_sample.measure() 347 request_measure = self.request_sample.measure()
353 request_measure.start() 348 request_measure.start()
354 349
355 l = l.decode('utf-8').rstrip()
356 if l == 'END': 350 if l == 'END':
357 self.writer.write('ok\n'.encode('utf-8')) 351 break
358 return
359 352
360 (method, taskhash) = l.split() 353 (method, taskhash) = l.split()
361 #logger.debug('Looking up %s %s' % (method, taskhash)) 354 #logger.debug('Looking up %s %s' % (method, taskhash))
@@ -366,29 +359,30 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
366 cursor.close() 359 cursor.close()
367 360
368 if row is not None: 361 if row is not None:
369 msg = ('%s\n' % row['unihash']).encode('utf-8') 362 msg = row['unihash']
370 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) 363 #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
371 elif self.upstream_client is not None: 364 elif self.upstream_client is not None:
372 upstream = await self.upstream_client.get_unihash(method, taskhash) 365 upstream = await self.upstream_client.get_unihash(method, taskhash)
373 if upstream: 366 if upstream:
374 msg = ("%s\n" % upstream).encode("utf-8") 367 msg = upstream
375 else: 368 else:
376 msg = "\n".encode("utf-8") 369 msg = ""
377 else: 370 else:
378 msg = '\n'.encode('utf-8') 371 msg = ""
379 372
380 self.writer.write(msg) 373 await self.socket.send(msg)
381 finally: 374 finally:
382 request_measure.end() 375 request_measure.end()
383 self.request_sample.end() 376 self.request_sample.end()
384 377
385 await self.writer.drain()
386
387 # Post to the backfill queue after writing the result to minimize 378 # Post to the backfill queue after writing the result to minimize
388 # the turn around time on a request 379 # the turn around time on a request
389 if upstream is not None: 380 if upstream is not None:
390 await self.backfill_queue.put((method, taskhash)) 381 await self.backfill_queue.put((method, taskhash))
391 382
383 await self.socket.send("ok")
384 return self.NO_RESPONSE
385
392 async def handle_report(self, data): 386 async def handle_report(self, data):
393 with closing(self.db.cursor()) as cursor: 387 with closing(self.db.cursor()) as cursor:
394 outhash_data = { 388 outhash_data = {
@@ -468,7 +462,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
468 'unihash': unihash, 462 'unihash': unihash,
469 } 463 }
470 464
471 self.write_message(d) 465 return d
472 466
473 async def handle_equivreport(self, data): 467 async def handle_equivreport(self, data):
474 with closing(self.db.cursor()) as cursor: 468 with closing(self.db.cursor()) as cursor:
@@ -491,30 +485,28 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
491 485
492 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} 486 d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
493 487
494 self.write_message(d) 488 return d
495 489
496 490
497 async def handle_get_stats(self, request): 491 async def handle_get_stats(self, request):
498 d = { 492 return {
499 'requests': self.request_stats.todict(), 493 'requests': self.request_stats.todict(),
500 } 494 }
501 495
502 self.write_message(d)
503
504 async def handle_reset_stats(self, request): 496 async def handle_reset_stats(self, request):
505 d = { 497 d = {
506 'requests': self.request_stats.todict(), 498 'requests': self.request_stats.todict(),
507 } 499 }
508 500
509 self.request_stats.reset() 501 self.request_stats.reset()
510 self.write_message(d) 502 return d
511 503
512 async def handle_backfill_wait(self, request): 504 async def handle_backfill_wait(self, request):
513 d = { 505 d = {
514 'tasks': self.backfill_queue.qsize(), 506 'tasks': self.backfill_queue.qsize(),
515 } 507 }
516 await self.backfill_queue.join() 508 await self.backfill_queue.join()
517 self.write_message(d) 509 return d
518 510
519 async def handle_remove(self, request): 511 async def handle_remove(self, request):
520 condition = request["where"] 512 condition = request["where"]
@@ -541,7 +533,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
541 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor) 533 count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v2", cursor)
542 self.db.commit() 534 self.db.commit()
543 535
544 self.write_message({"count": count}) 536 return {"count": count}
545 537
546 async def handle_clean_unused(self, request): 538 async def handle_clean_unused(self, request):
547 max_age = request["max_age_seconds"] 539 max_age = request["max_age_seconds"]
@@ -558,7 +550,7 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection):
558 ) 550 )
559 count = cursor.rowcount 551 count = cursor.rowcount
560 552
561 self.write_message({"count": count}) 553 return {"count": count}
562 554
563 def query_equivalent(self, cursor, method, taskhash): 555 def query_equivalent(self, cursor, method, taskhash):
564 # This is part of the inner loop and must be as fast as possible 556 # This is part of the inner loop and must be as fast as possible
@@ -583,41 +575,33 @@ class Server(bb.asyncrpc.AsyncServer):
583 self.db = db 575 self.db = db
584 self.upstream = upstream 576 self.upstream = upstream
585 self.read_only = read_only 577 self.read_only = read_only
578 self.backfill_queue = None
586 579
587 def accept_client(self, reader, writer): 580 def accept_client(self, socket):
588 return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) 581 return ServerClient(socket, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)
589 582
590 @contextmanager 583 async def backfill_worker_task(self):
591 def _backfill_worker(self): 584 client = await create_async_client(self.upstream)
592 async def backfill_worker_task(): 585 try:
593 client = await create_async_client(self.upstream) 586 while True:
594 try: 587 item = await self.backfill_queue.get()
595 while True: 588 if item is None:
596 item = await self.backfill_queue.get()
597 if item is None:
598 self.backfill_queue.task_done()
599 break
600 method, taskhash = item
601 await copy_unihash_from_upstream(client, self.db, method, taskhash)
602 self.backfill_queue.task_done() 589 self.backfill_queue.task_done()
603 finally: 590 break
604 await client.close() 591 method, taskhash = item
592 await copy_unihash_from_upstream(client, self.db, method, taskhash)
593 self.backfill_queue.task_done()
594 finally:
595 await client.close()
605 596
606 async def join_worker(worker): 597 def start(self):
598 tasks = super().start()
599 if self.upstream:
600 self.backfill_queue = asyncio.Queue()
601 tasks += [self.backfill_worker_task()]
602 return tasks
603
604 async def stop(self):
605 if self.backfill_queue is not None:
607 await self.backfill_queue.put(None) 606 await self.backfill_queue.put(None)
608 await worker 607 await super().stop()
609
610 if self.upstream is not None:
611 worker = asyncio.ensure_future(backfill_worker_task())
612 try:
613 yield
614 finally:
615 self.loop.run_until_complete(join_worker(worker))
616 else:
617 yield
618
619 def run_loop_forever(self):
620 self.backfill_queue = asyncio.Queue()
621
622 with self._backfill_worker():
623 super().run_loop_forever()
diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py
index 69ab7a4ac9..6b81356fac 100644
--- a/bitbake/lib/prserv/client.py
+++ b/bitbake/lib/prserv/client.py
@@ -14,28 +14,28 @@ class PRAsyncClient(bb.asyncrpc.AsyncClient):
14 super().__init__('PRSERVICE', '1.0', logger) 14 super().__init__('PRSERVICE', '1.0', logger)
15 15
16 async def getPR(self, version, pkgarch, checksum): 16 async def getPR(self, version, pkgarch, checksum):
17 response = await self.send_message( 17 response = await self.invoke(
18 {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} 18 {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}}
19 ) 19 )
20 if response: 20 if response:
21 return response['value'] 21 return response['value']
22 22
23 async def importone(self, version, pkgarch, checksum, value): 23 async def importone(self, version, pkgarch, checksum, value):
24 response = await self.send_message( 24 response = await self.invoke(
25 {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} 25 {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}}
26 ) 26 )
27 if response: 27 if response:
28 return response['value'] 28 return response['value']
29 29
30 async def export(self, version, pkgarch, checksum, colinfo): 30 async def export(self, version, pkgarch, checksum, colinfo):
31 response = await self.send_message( 31 response = await self.invoke(
32 {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} 32 {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}}
33 ) 33 )
34 if response: 34 if response:
35 return (response['metainfo'], response['datainfo']) 35 return (response['metainfo'], response['datainfo'])
36 36
37 async def is_readonly(self): 37 async def is_readonly(self):
38 response = await self.send_message( 38 response = await self.invoke(
39 {'is-readonly': {}} 39 {'is-readonly': {}}
40 ) 40 )
41 if response: 41 if response:
diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py
index c686b2065c..ea7933164b 100644
--- a/bitbake/lib/prserv/serv.py
+++ b/bitbake/lib/prserv/serv.py
@@ -20,8 +20,8 @@ PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
20singleton = None 20singleton = None
21 21
22class PRServerClient(bb.asyncrpc.AsyncServerConnection): 22class PRServerClient(bb.asyncrpc.AsyncServerConnection):
23 def __init__(self, reader, writer, table, read_only): 23 def __init__(self, socket, table, read_only):
24 super().__init__(reader, writer, 'PRSERVICE', logger) 24 super().__init__(socket, 'PRSERVICE', logger)
25 self.handlers.update({ 25 self.handlers.update({
26 'get-pr': self.handle_get_pr, 26 'get-pr': self.handle_get_pr,
27 'import-one': self.handle_import_one, 27 'import-one': self.handle_import_one,
@@ -36,12 +36,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
36 36
37 async def dispatch_message(self, msg): 37 async def dispatch_message(self, msg):
38 try: 38 try:
39 await super().dispatch_message(msg) 39 return await super().dispatch_message(msg)
40 except: 40 except:
41 self.table.sync() 41 self.table.sync()
42 raise 42 raise
43 43 else:
44 self.table.sync_if_dirty() 44 self.table.sync_if_dirty()
45 45
46 async def handle_get_pr(self, request): 46 async def handle_get_pr(self, request):
47 version = request['version'] 47 version = request['version']
@@ -57,7 +57,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
57 except sqlite3.Error as exc: 57 except sqlite3.Error as exc:
58 logger.error(str(exc)) 58 logger.error(str(exc))
59 59
60 self.write_message(response) 60 return response
61 61
62 async def handle_import_one(self, request): 62 async def handle_import_one(self, request):
63 response = None 63 response = None
@@ -71,7 +71,7 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
71 if value is not None: 71 if value is not None:
72 response = {'value': value} 72 response = {'value': value}
73 73
74 self.write_message(response) 74 return response
75 75
76 async def handle_export(self, request): 76 async def handle_export(self, request):
77 version = request['version'] 77 version = request['version']
@@ -85,12 +85,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
85 logger.error(str(exc)) 85 logger.error(str(exc))
86 metainfo = datainfo = None 86 metainfo = datainfo = None
87 87
88 response = {'metainfo': metainfo, 'datainfo': datainfo} 88 return {'metainfo': metainfo, 'datainfo': datainfo}
89 self.write_message(response)
90 89
91 async def handle_is_readonly(self, request): 90 async def handle_is_readonly(self, request):
92 response = {'readonly': self.read_only} 91 return {'readonly': self.read_only}
93 self.write_message(response)
94 92
95class PRServer(bb.asyncrpc.AsyncServer): 93class PRServer(bb.asyncrpc.AsyncServer):
96 def __init__(self, dbfile, read_only=False): 94 def __init__(self, dbfile, read_only=False):
@@ -99,20 +97,23 @@ class PRServer(bb.asyncrpc.AsyncServer):
99 self.table = None 97 self.table = None
100 self.read_only = read_only 98 self.read_only = read_only
101 99
102 def accept_client(self, reader, writer): 100 def accept_client(self, socket):
103 return PRServerClient(reader, writer, self.table, self.read_only) 101 return PRServerClient(socket, self.table, self.read_only)
104 102
105 def _serve_forever(self): 103 def start(self):
104 tasks = super().start()
106 self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) 105 self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
107 self.table = self.db["PRMAIN"] 106 self.table = self.db["PRMAIN"]
108 107
109 logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % 108 logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
110 (self.dbfile, self.address, str(os.getpid()))) 109 (self.dbfile, self.address, str(os.getpid())))
111 110
112 super()._serve_forever() 111 return tasks
113 112
113 async def stop(self):
114 self.table.sync_if_dirty() 114 self.table.sync_if_dirty()
115 self.db.disconnect() 115 self.db.disconnect()
116 await super().stop()
116 117
117 def signal_handler(self): 118 def signal_handler(self):
118 super().signal_handler() 119 super().signal_handler()