diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2023-11-03 08:26:20 -0600 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2023-11-09 17:33:02 +0000 |
commit | 2484bd893189bc24ac210edf2de3b7e1115eebd3 (patch) | |
tree | be9433e53a5bb645816f5b78730830a73bc6ee10 /bitbake/lib | |
parent | 8f8501ed403dec27acbe780b936bc087fc5006d0 (diff) | |
download | poky-2484bd893189bc24ac210edf2de3b7e1115eebd3.tar.gz |
bitbake: hashserv: Add websocket connection implementation
Adds support to the hash equivalence client and server to communicate
over websockets. Since websockets are message orientated instead of
stream orientated, and new connection class is needed to handle them.
Note that websocket support does require the 3rd party websockets python
module be installed on the host, but it should not be required unless
websockets are actually being used.
(Bitbake rev: 56dd2fdbfb6350a9eef43a12aa529c8637887a7e)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 11 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/connection.py | 44 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/serv.py | 53 | ||||
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 13 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 1 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 17 |
6 files changed, 137 insertions, 2 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 7f33099b63..802c07df1f 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -10,7 +10,7 @@ import json | |||
10 | import os | 10 | import os |
11 | import socket | 11 | import socket |
12 | import sys | 12 | import sys |
13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK |
14 | from .exceptions import ConnectionClosedError | 14 | from .exceptions import ConnectionClosedError |
15 | 15 | ||
16 | 16 | ||
@@ -47,6 +47,15 @@ class AsyncClient(object): | |||
47 | 47 | ||
48 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
49 | 49 | ||
50 | async def connect_websocket(self, uri): | ||
51 | import websockets | ||
52 | |||
53 | async def connect_sock(): | ||
54 | websocket = await websockets.connect(uri, ping_interval=None) | ||
55 | return WebsocketConnection(websocket, self.timeout) | ||
56 | |||
57 | self._connect_sock = connect_sock | ||
58 | |||
50 | async def setup_connection(self): | 59 | async def setup_connection(self): |
51 | # Send headers | 60 | # Send headers |
52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) | 61 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py index c4fd24754c..a10628f75a 100644 --- a/bitbake/lib/bb/asyncrpc/connection.py +++ b/bitbake/lib/bb/asyncrpc/connection.py | |||
@@ -93,3 +93,47 @@ class StreamConnection(object): | |||
93 | if self.writer is not None: | 93 | if self.writer is not None: |
94 | self.writer.close() | 94 | self.writer.close() |
95 | self.writer = None | 95 | self.writer = None |
96 | |||
97 | |||
98 | class WebsocketConnection(object): | ||
99 | def __init__(self, socket, timeout): | ||
100 | self.socket = socket | ||
101 | self.timeout = timeout | ||
102 | |||
103 | @property | ||
104 | def address(self): | ||
105 | return ":".join(str(s) for s in self.socket.remote_address) | ||
106 | |||
107 | async def send_message(self, msg): | ||
108 | await self.send(json.dumps(msg)) | ||
109 | |||
110 | async def recv_message(self): | ||
111 | m = await self.recv() | ||
112 | return json.loads(m) | ||
113 | |||
114 | async def send(self, msg): | ||
115 | import websockets.exceptions | ||
116 | |||
117 | try: | ||
118 | await self.socket.send(msg) | ||
119 | except websockets.exceptions.ConnectionClosed: | ||
120 | raise ConnectionClosedError("Connection closed") | ||
121 | |||
122 | async def recv(self): | ||
123 | import websockets.exceptions | ||
124 | |||
125 | try: | ||
126 | if self.timeout < 0: | ||
127 | return await self.socket.recv() | ||
128 | |||
129 | try: | ||
130 | return await asyncio.wait_for(self.socket.recv(), self.timeout) | ||
131 | except asyncio.TimeoutError: | ||
132 | raise ConnectionError("Timed out waiting for data") | ||
133 | except websockets.exceptions.ConnectionClosed: | ||
134 | raise ConnectionClosedError("Connection closed") | ||
135 | |||
136 | async def close(self): | ||
137 | if self.socket is not None: | ||
138 | await self.socket.close() | ||
139 | self.socket = None | ||
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index 3e0d0632cb..dfb0377380 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py | |||
@@ -12,7 +12,7 @@ import signal | |||
12 | import socket | 12 | import socket |
13 | import sys | 13 | import sys |
14 | import multiprocessing | 14 | import multiprocessing |
15 | from .connection import StreamConnection | 15 | from .connection import StreamConnection, WebsocketConnection |
16 | from .exceptions import ClientError, ServerError, ConnectionClosedError | 16 | from .exceptions import ClientError, ServerError, ConnectionClosedError |
17 | 17 | ||
18 | 18 | ||
@@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer): | |||
178 | os.unlink(self.path) | 178 | os.unlink(self.path) |
179 | 179 | ||
180 | 180 | ||
181 | class WebsocketsServer(object): | ||
182 | def __init__(self, host, port, handler, logger): | ||
183 | self.host = host | ||
184 | self.port = port | ||
185 | self.handler = handler | ||
186 | self.logger = logger | ||
187 | |||
188 | def start(self, loop): | ||
189 | import websockets.server | ||
190 | |||
191 | self.server = loop.run_until_complete( | ||
192 | websockets.server.serve( | ||
193 | self.client_handler, | ||
194 | self.host, | ||
195 | self.port, | ||
196 | ping_interval=None, | ||
197 | ) | ||
198 | ) | ||
199 | |||
200 | for s in self.server.sockets: | ||
201 | self.logger.debug("Listening on %r" % (s.getsockname(),)) | ||
202 | |||
203 | # Enable keep alives. This prevents broken client connections | ||
204 | # from persisting on the server for long periods of time. | ||
205 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
206 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
207 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
208 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
209 | |||
210 | name = self.server.sockets[0].getsockname() | ||
211 | if self.server.sockets[0].family == socket.AF_INET6: | ||
212 | self.address = "ws://[%s]:%d" % (name[0], name[1]) | ||
213 | else: | ||
214 | self.address = "ws://%s:%d" % (name[0], name[1]) | ||
215 | |||
216 | return [self.server.wait_closed()] | ||
217 | |||
218 | async def stop(self): | ||
219 | self.server.close() | ||
220 | |||
221 | def cleanup(self): | ||
222 | pass | ||
223 | |||
224 | async def client_handler(self, websocket): | ||
225 | socket = WebsocketConnection(websocket, -1) | ||
226 | await self.handler(socket) | ||
227 | |||
228 | |||
181 | class AsyncServer(object): | 229 | class AsyncServer(object): |
182 | def __init__(self, logger): | 230 | def __init__(self, logger): |
183 | self.logger = logger | 231 | self.logger = logger |
@@ -190,6 +238,9 @@ class AsyncServer(object): | |||
190 | def start_unix_server(self, path): | 238 | def start_unix_server(self, path): |
191 | self.server = UnixStreamServer(path, self._client_handler, self.logger) | 239 | self.server = UnixStreamServer(path, self._client_handler, self.logger) |
192 | 240 | ||
241 | def start_websocket_server(self, host, port): | ||
242 | self.server = WebsocketsServer(host, port, self._client_handler, self.logger) | ||
243 | |||
193 | async def _client_handler(self, socket): | 244 | async def _client_handler(self, socket): |
194 | try: | 245 | try: |
195 | client = self.accept_client(socket) | 246 | client = self.accept_client(socket) |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 3a4018353f..56b9c6bc82 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -9,11 +9,15 @@ import re | |||
9 | import sqlite3 | 9 | import sqlite3 |
10 | import itertools | 10 | import itertools |
11 | import json | 11 | import json |
12 | from urllib.parse import urlparse | ||
12 | 13 | ||
13 | UNIX_PREFIX = "unix://" | 14 | UNIX_PREFIX = "unix://" |
15 | WS_PREFIX = "ws://" | ||
16 | WSS_PREFIX = "wss://" | ||
14 | 17 | ||
15 | ADDR_TYPE_UNIX = 0 | 18 | ADDR_TYPE_UNIX = 0 |
16 | ADDR_TYPE_TCP = 1 | 19 | ADDR_TYPE_TCP = 1 |
20 | ADDR_TYPE_WS = 2 | ||
17 | 21 | ||
18 | UNIHASH_TABLE_DEFINITION = ( | 22 | UNIHASH_TABLE_DEFINITION = ( |
19 | ("method", "TEXT NOT NULL", "UNIQUE"), | 23 | ("method", "TEXT NOT NULL", "UNIQUE"), |
@@ -84,6 +88,8 @@ def setup_database(database, sync=True): | |||
84 | def parse_address(addr): | 88 | def parse_address(addr): |
85 | if addr.startswith(UNIX_PREFIX): | 89 | if addr.startswith(UNIX_PREFIX): |
86 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) | 90 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) |
91 | elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX): | ||
92 | return (ADDR_TYPE_WS, (addr,)) | ||
87 | else: | 93 | else: |
88 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) | 94 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) |
89 | if m is not None: | 95 | if m is not None: |
@@ -103,6 +109,9 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | |||
103 | (typ, a) = parse_address(addr) | 109 | (typ, a) = parse_address(addr) |
104 | if typ == ADDR_TYPE_UNIX: | 110 | if typ == ADDR_TYPE_UNIX: |
105 | s.start_unix_server(*a) | 111 | s.start_unix_server(*a) |
112 | elif typ == ADDR_TYPE_WS: | ||
113 | url = urlparse(a[0]) | ||
114 | s.start_websocket_server(url.hostname, url.port) | ||
106 | else: | 115 | else: |
107 | s.start_tcp_server(*a) | 116 | s.start_tcp_server(*a) |
108 | 117 | ||
@@ -116,6 +125,8 @@ def create_client(addr): | |||
116 | (typ, a) = parse_address(addr) | 125 | (typ, a) = parse_address(addr) |
117 | if typ == ADDR_TYPE_UNIX: | 126 | if typ == ADDR_TYPE_UNIX: |
118 | c.connect_unix(*a) | 127 | c.connect_unix(*a) |
128 | elif typ == ADDR_TYPE_WS: | ||
129 | c.connect_websocket(*a) | ||
119 | else: | 130 | else: |
120 | c.connect_tcp(*a) | 131 | c.connect_tcp(*a) |
121 | 132 | ||
@@ -128,6 +139,8 @@ async def create_async_client(addr): | |||
128 | (typ, a) = parse_address(addr) | 139 | (typ, a) = parse_address(addr) |
129 | if typ == ADDR_TYPE_UNIX: | 140 | if typ == ADDR_TYPE_UNIX: |
130 | await c.connect_unix(*a) | 141 | await c.connect_unix(*a) |
142 | elif typ == ADDR_TYPE_WS: | ||
143 | await c.connect_websocket(*a) | ||
131 | else: | 144 | else: |
132 | await c.connect_tcp(*a) | 145 | await c.connect_tcp(*a) |
133 | 146 | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 5f7d22ab13..9542d72f6c 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client): | |||
115 | super().__init__() | 115 | super().__init__() |
116 | self._add_methods( | 116 | self._add_methods( |
117 | "connect_tcp", | 117 | "connect_tcp", |
118 | "connect_websocket", | ||
118 | "get_unihash", | 119 | "get_unihash", |
119 | "report_unihash", | 120 | "report_unihash", |
120 | "report_unihash_equiv", | 121 | "report_unihash_equiv", |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index f343c586b5..01ffd52c1d 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -483,3 +483,20 @@ class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm | |||
483 | # If IPv6 is enabled, it should be safe to use localhost directly, in general | 483 | # If IPv6 is enabled, it should be safe to use localhost directly, in general |
484 | # case it is more reliable to resolve the IP address explicitly. | 484 | # case it is more reliable to resolve the IP address explicitly. |
485 | return socket.gethostbyname("localhost") + ":0" | 485 | return socket.gethostbyname("localhost") + ":0" |
486 | |||
487 | |||
488 | class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | ||
489 | def setUp(self): | ||
490 | try: | ||
491 | import websockets | ||
492 | except ImportError as e: | ||
493 | self.skipTest(str(e)) | ||
494 | |||
495 | super().setUp() | ||
496 | |||
497 | def get_server_addr(self, server_idx): | ||
498 | # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled. | ||
499 | # If IPv6 is enabled, it should be safe to use localhost directly, in general | ||
500 | # case it is more reliable to resolve the IP address explicitly. | ||
501 | host = socket.gethostbyname("localhost") | ||
502 | return "ws://%s:0" % host | ||