summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:20 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:33:02 +0000
commit2484bd893189bc24ac210edf2de3b7e1115eebd3 (patch)
treebe9433e53a5bb645816f5b78730830a73bc6ee10 /bitbake/lib/bb
parent8f8501ed403dec27acbe780b936bc087fc5006d0 (diff)
downloadpoky-2484bd893189bc24ac210edf2de3b7e1115eebd3.tar.gz
bitbake: hashserv: Add websocket connection implementation
Adds support to the hash equivalence client and server to communicate over websockets. Since websockets are message orientated instead of stream orientated, and new connection class is needed to handle them. Note that websocket support does require the 3rd party websockets python module be installed on the host, but it should not be required unless websockets are actually being used. (Bitbake rev: 56dd2fdbfb6350a9eef43a12aa529c8637887a7e) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb')
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py11
-rw-r--r--bitbake/lib/bb/asyncrpc/connection.py44
-rw-r--r--bitbake/lib/bb/asyncrpc/serv.py53
3 files changed, 106 insertions, 2 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index 7f33099b63..802c07df1f 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -10,7 +10,7 @@ import json
10import os 10import os
11import socket 11import socket
12import sys 12import sys
13from .connection import StreamConnection, DEFAULT_MAX_CHUNK 13from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
14from .exceptions import ConnectionClosedError 14from .exceptions import ConnectionClosedError
15 15
16 16
@@ -47,6 +47,15 @@ class AsyncClient(object):
47 47
48 self._connect_sock = connect_sock 48 self._connect_sock = connect_sock
49 49
50 async def connect_websocket(self, uri):
51 import websockets
52
53 async def connect_sock():
54 websocket = await websockets.connect(uri, ping_interval=None)
55 return WebsocketConnection(websocket, self.timeout)
56
57 self._connect_sock = connect_sock
58
50 async def setup_connection(self): 59 async def setup_connection(self):
51 # Send headers 60 # Send headers
52 await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) 61 await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
index c4fd24754c..a10628f75a 100644
--- a/bitbake/lib/bb/asyncrpc/connection.py
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -93,3 +93,47 @@ class StreamConnection(object):
93 if self.writer is not None: 93 if self.writer is not None:
94 self.writer.close() 94 self.writer.close()
95 self.writer = None 95 self.writer = None
96
97
98class WebsocketConnection(object):
99 def __init__(self, socket, timeout):
100 self.socket = socket
101 self.timeout = timeout
102
103 @property
104 def address(self):
105 return ":".join(str(s) for s in self.socket.remote_address)
106
107 async def send_message(self, msg):
108 await self.send(json.dumps(msg))
109
110 async def recv_message(self):
111 m = await self.recv()
112 return json.loads(m)
113
114 async def send(self, msg):
115 import websockets.exceptions
116
117 try:
118 await self.socket.send(msg)
119 except websockets.exceptions.ConnectionClosed:
120 raise ConnectionClosedError("Connection closed")
121
122 async def recv(self):
123 import websockets.exceptions
124
125 try:
126 if self.timeout < 0:
127 return await self.socket.recv()
128
129 try:
130 return await asyncio.wait_for(self.socket.recv(), self.timeout)
131 except asyncio.TimeoutError:
132 raise ConnectionError("Timed out waiting for data")
133 except websockets.exceptions.ConnectionClosed:
134 raise ConnectionClosedError("Connection closed")
135
136 async def close(self):
137 if self.socket is not None:
138 await self.socket.close()
139 self.socket = None
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py
index 3e0d0632cb..dfb0377380 100644
--- a/bitbake/lib/bb/asyncrpc/serv.py
+++ b/bitbake/lib/bb/asyncrpc/serv.py
@@ -12,7 +12,7 @@ import signal
12import socket 12import socket
13import sys 13import sys
14import multiprocessing 14import multiprocessing
15from .connection import StreamConnection 15from .connection import StreamConnection, WebsocketConnection
16from .exceptions import ClientError, ServerError, ConnectionClosedError 16from .exceptions import ClientError, ServerError, ConnectionClosedError
17 17
18 18
@@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer):
178 os.unlink(self.path) 178 os.unlink(self.path)
179 179
180 180
181class WebsocketsServer(object):
182 def __init__(self, host, port, handler, logger):
183 self.host = host
184 self.port = port
185 self.handler = handler
186 self.logger = logger
187
188 def start(self, loop):
189 import websockets.server
190
191 self.server = loop.run_until_complete(
192 websockets.server.serve(
193 self.client_handler,
194 self.host,
195 self.port,
196 ping_interval=None,
197 )
198 )
199
200 for s in self.server.sockets:
201 self.logger.debug("Listening on %r" % (s.getsockname(),))
202
203 # Enable keep alives. This prevents broken client connections
204 # from persisting on the server for long periods of time.
205 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
206 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
207 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15)
208 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
209
210 name = self.server.sockets[0].getsockname()
211 if self.server.sockets[0].family == socket.AF_INET6:
212 self.address = "ws://[%s]:%d" % (name[0], name[1])
213 else:
214 self.address = "ws://%s:%d" % (name[0], name[1])
215
216 return [self.server.wait_closed()]
217
218 async def stop(self):
219 self.server.close()
220
221 def cleanup(self):
222 pass
223
224 async def client_handler(self, websocket):
225 socket = WebsocketConnection(websocket, -1)
226 await self.handler(socket)
227
228
181class AsyncServer(object): 229class AsyncServer(object):
182 def __init__(self, logger): 230 def __init__(self, logger):
183 self.logger = logger 231 self.logger = logger
@@ -190,6 +238,9 @@ class AsyncServer(object):
190 def start_unix_server(self, path): 238 def start_unix_server(self, path):
191 self.server = UnixStreamServer(path, self._client_handler, self.logger) 239 self.server = UnixStreamServer(path, self._client_handler, self.logger)
192 240
241 def start_websocket_server(self, host, port):
242 self.server = WebsocketsServer(host, port, self._client_handler, self.logger)
243
193 async def _client_handler(self, socket): 244 async def _client_handler(self, socket):
194 try: 245 try:
195 client = self.accept_client(socket) 246 client = self.accept_client(socket)