diff options
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 11 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/connection.py | 44 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/serv.py | 53 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/__init__.py | 13 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/client.py | 1 | ||||
| -rw-r--r-- | bitbake/lib/hashserv/tests.py | 17 |
6 files changed, 137 insertions, 2 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 7f33099b63..802c07df1f 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
| @@ -10,7 +10,7 @@ import json | |||
| 10 | import os | 10 | import os |
| 11 | import socket | 11 | import socket |
| 12 | import sys | 12 | import sys |
| 13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK |
| 14 | from .exceptions import ConnectionClosedError | 14 | from .exceptions import ConnectionClosedError |
| 15 | 15 | ||
| 16 | 16 | ||
| @@ -47,6 +47,15 @@ class AsyncClient(object): | |||
| 47 | 47 | ||
| 48 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
| 49 | 49 | ||
| 50 | async def connect_websocket(self, uri): | ||
| 51 | import websockets | ||
| 52 | |||
| 53 | async def connect_sock(): | ||
| 54 | websocket = await websockets.connect(uri, ping_interval=None) | ||
| 55 | return WebsocketConnection(websocket, self.timeout) | ||
| 56 | |||
| 57 | self._connect_sock = connect_sock | ||
| 58 | |||
| 50 | async def setup_connection(self): | 59 | async def setup_connection(self): |
| 51 | # Send headers | 60 | # Send headers |
| 52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) | 61 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py index c4fd24754c..a10628f75a 100644 --- a/bitbake/lib/bb/asyncrpc/connection.py +++ b/bitbake/lib/bb/asyncrpc/connection.py | |||
| @@ -93,3 +93,47 @@ class StreamConnection(object): | |||
| 93 | if self.writer is not None: | 93 | if self.writer is not None: |
| 94 | self.writer.close() | 94 | self.writer.close() |
| 95 | self.writer = None | 95 | self.writer = None |
| 96 | |||
| 97 | |||
| 98 | class WebsocketConnection(object): | ||
| 99 | def __init__(self, socket, timeout): | ||
| 100 | self.socket = socket | ||
| 101 | self.timeout = timeout | ||
| 102 | |||
| 103 | @property | ||
| 104 | def address(self): | ||
| 105 | return ":".join(str(s) for s in self.socket.remote_address) | ||
| 106 | |||
| 107 | async def send_message(self, msg): | ||
| 108 | await self.send(json.dumps(msg)) | ||
| 109 | |||
| 110 | async def recv_message(self): | ||
| 111 | m = await self.recv() | ||
| 112 | return json.loads(m) | ||
| 113 | |||
| 114 | async def send(self, msg): | ||
| 115 | import websockets.exceptions | ||
| 116 | |||
| 117 | try: | ||
| 118 | await self.socket.send(msg) | ||
| 119 | except websockets.exceptions.ConnectionClosed: | ||
| 120 | raise ConnectionClosedError("Connection closed") | ||
| 121 | |||
| 122 | async def recv(self): | ||
| 123 | import websockets.exceptions | ||
| 124 | |||
| 125 | try: | ||
| 126 | if self.timeout < 0: | ||
| 127 | return await self.socket.recv() | ||
| 128 | |||
| 129 | try: | ||
| 130 | return await asyncio.wait_for(self.socket.recv(), self.timeout) | ||
| 131 | except asyncio.TimeoutError: | ||
| 132 | raise ConnectionError("Timed out waiting for data") | ||
| 133 | except websockets.exceptions.ConnectionClosed: | ||
| 134 | raise ConnectionClosedError("Connection closed") | ||
| 135 | |||
| 136 | async def close(self): | ||
| 137 | if self.socket is not None: | ||
| 138 | await self.socket.close() | ||
| 139 | self.socket = None | ||
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index 3e0d0632cb..dfb0377380 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py | |||
| @@ -12,7 +12,7 @@ import signal | |||
| 12 | import socket | 12 | import socket |
| 13 | import sys | 13 | import sys |
| 14 | import multiprocessing | 14 | import multiprocessing |
| 15 | from .connection import StreamConnection | 15 | from .connection import StreamConnection, WebsocketConnection |
| 16 | from .exceptions import ClientError, ServerError, ConnectionClosedError | 16 | from .exceptions import ClientError, ServerError, ConnectionClosedError |
| 17 | 17 | ||
| 18 | 18 | ||
| @@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer): | |||
| 178 | os.unlink(self.path) | 178 | os.unlink(self.path) |
| 179 | 179 | ||
| 180 | 180 | ||
| 181 | class WebsocketsServer(object): | ||
| 182 | def __init__(self, host, port, handler, logger): | ||
| 183 | self.host = host | ||
| 184 | self.port = port | ||
| 185 | self.handler = handler | ||
| 186 | self.logger = logger | ||
| 187 | |||
| 188 | def start(self, loop): | ||
| 189 | import websockets.server | ||
| 190 | |||
| 191 | self.server = loop.run_until_complete( | ||
| 192 | websockets.server.serve( | ||
| 193 | self.client_handler, | ||
| 194 | self.host, | ||
| 195 | self.port, | ||
| 196 | ping_interval=None, | ||
| 197 | ) | ||
| 198 | ) | ||
| 199 | |||
| 200 | for s in self.server.sockets: | ||
| 201 | self.logger.debug("Listening on %r" % (s.getsockname(),)) | ||
| 202 | |||
| 203 | # Enable keep alives. This prevents broken client connections | ||
| 204 | # from persisting on the server for long periods of time. | ||
| 205 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
| 206 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
| 207 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
| 208 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
| 209 | |||
| 210 | name = self.server.sockets[0].getsockname() | ||
| 211 | if self.server.sockets[0].family == socket.AF_INET6: | ||
| 212 | self.address = "ws://[%s]:%d" % (name[0], name[1]) | ||
| 213 | else: | ||
| 214 | self.address = "ws://%s:%d" % (name[0], name[1]) | ||
| 215 | |||
| 216 | return [self.server.wait_closed()] | ||
| 217 | |||
| 218 | async def stop(self): | ||
| 219 | self.server.close() | ||
| 220 | |||
| 221 | def cleanup(self): | ||
| 222 | pass | ||
| 223 | |||
| 224 | async def client_handler(self, websocket): | ||
| 225 | socket = WebsocketConnection(websocket, -1) | ||
| 226 | await self.handler(socket) | ||
| 227 | |||
| 228 | |||
| 181 | class AsyncServer(object): | 229 | class AsyncServer(object): |
| 182 | def __init__(self, logger): | 230 | def __init__(self, logger): |
| 183 | self.logger = logger | 231 | self.logger = logger |
| @@ -190,6 +238,9 @@ class AsyncServer(object): | |||
| 190 | def start_unix_server(self, path): | 238 | def start_unix_server(self, path): |
| 191 | self.server = UnixStreamServer(path, self._client_handler, self.logger) | 239 | self.server = UnixStreamServer(path, self._client_handler, self.logger) |
| 192 | 240 | ||
| 241 | def start_websocket_server(self, host, port): | ||
| 242 | self.server = WebsocketsServer(host, port, self._client_handler, self.logger) | ||
| 243 | |||
| 193 | async def _client_handler(self, socket): | 244 | async def _client_handler(self, socket): |
| 194 | try: | 245 | try: |
| 195 | client = self.accept_client(socket) | 246 | client = self.accept_client(socket) |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 3a4018353f..56b9c6bc82 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
| @@ -9,11 +9,15 @@ import re | |||
| 9 | import sqlite3 | 9 | import sqlite3 |
| 10 | import itertools | 10 | import itertools |
| 11 | import json | 11 | import json |
| 12 | from urllib.parse import urlparse | ||
| 12 | 13 | ||
| 13 | UNIX_PREFIX = "unix://" | 14 | UNIX_PREFIX = "unix://" |
| 15 | WS_PREFIX = "ws://" | ||
| 16 | WSS_PREFIX = "wss://" | ||
| 14 | 17 | ||
| 15 | ADDR_TYPE_UNIX = 0 | 18 | ADDR_TYPE_UNIX = 0 |
| 16 | ADDR_TYPE_TCP = 1 | 19 | ADDR_TYPE_TCP = 1 |
| 20 | ADDR_TYPE_WS = 2 | ||
| 17 | 21 | ||
| 18 | UNIHASH_TABLE_DEFINITION = ( | 22 | UNIHASH_TABLE_DEFINITION = ( |
| 19 | ("method", "TEXT NOT NULL", "UNIQUE"), | 23 | ("method", "TEXT NOT NULL", "UNIQUE"), |
| @@ -84,6 +88,8 @@ def setup_database(database, sync=True): | |||
| 84 | def parse_address(addr): | 88 | def parse_address(addr): |
| 85 | if addr.startswith(UNIX_PREFIX): | 89 | if addr.startswith(UNIX_PREFIX): |
| 86 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) | 90 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) |
| 91 | elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX): | ||
| 92 | return (ADDR_TYPE_WS, (addr,)) | ||
| 87 | else: | 93 | else: |
| 88 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) | 94 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) |
| 89 | if m is not None: | 95 | if m is not None: |
| @@ -103,6 +109,9 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | |||
| 103 | (typ, a) = parse_address(addr) | 109 | (typ, a) = parse_address(addr) |
| 104 | if typ == ADDR_TYPE_UNIX: | 110 | if typ == ADDR_TYPE_UNIX: |
| 105 | s.start_unix_server(*a) | 111 | s.start_unix_server(*a) |
| 112 | elif typ == ADDR_TYPE_WS: | ||
| 113 | url = urlparse(a[0]) | ||
| 114 | s.start_websocket_server(url.hostname, url.port) | ||
| 106 | else: | 115 | else: |
| 107 | s.start_tcp_server(*a) | 116 | s.start_tcp_server(*a) |
| 108 | 117 | ||
| @@ -116,6 +125,8 @@ def create_client(addr): | |||
| 116 | (typ, a) = parse_address(addr) | 125 | (typ, a) = parse_address(addr) |
| 117 | if typ == ADDR_TYPE_UNIX: | 126 | if typ == ADDR_TYPE_UNIX: |
| 118 | c.connect_unix(*a) | 127 | c.connect_unix(*a) |
| 128 | elif typ == ADDR_TYPE_WS: | ||
| 129 | c.connect_websocket(*a) | ||
| 119 | else: | 130 | else: |
| 120 | c.connect_tcp(*a) | 131 | c.connect_tcp(*a) |
| 121 | 132 | ||
| @@ -128,6 +139,8 @@ async def create_async_client(addr): | |||
| 128 | (typ, a) = parse_address(addr) | 139 | (typ, a) = parse_address(addr) |
| 129 | if typ == ADDR_TYPE_UNIX: | 140 | if typ == ADDR_TYPE_UNIX: |
| 130 | await c.connect_unix(*a) | 141 | await c.connect_unix(*a) |
| 142 | elif typ == ADDR_TYPE_WS: | ||
| 143 | await c.connect_websocket(*a) | ||
| 131 | else: | 144 | else: |
| 132 | await c.connect_tcp(*a) | 145 | await c.connect_tcp(*a) |
| 133 | 146 | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 5f7d22ab13..9542d72f6c 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
| @@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client): | |||
| 115 | super().__init__() | 115 | super().__init__() |
| 116 | self._add_methods( | 116 | self._add_methods( |
| 117 | "connect_tcp", | 117 | "connect_tcp", |
| 118 | "connect_websocket", | ||
| 118 | "get_unihash", | 119 | "get_unihash", |
| 119 | "report_unihash", | 120 | "report_unihash", |
| 120 | "report_unihash_equiv", | 121 | "report_unihash_equiv", |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index f343c586b5..01ffd52c1d 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
| @@ -483,3 +483,20 @@ class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm | |||
| 483 | # If IPv6 is enabled, it should be safe to use localhost directly, in general | 483 | # If IPv6 is enabled, it should be safe to use localhost directly, in general |
| 484 | # case it is more reliable to resolve the IP address explicitly. | 484 | # case it is more reliable to resolve the IP address explicitly. |
| 485 | return socket.gethostbyname("localhost") + ":0" | 485 | return socket.gethostbyname("localhost") + ":0" |
| 486 | |||
| 487 | |||
| 488 | class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | ||
| 489 | def setUp(self): | ||
| 490 | try: | ||
| 491 | import websockets | ||
| 492 | except ImportError as e: | ||
| 493 | self.skipTest(str(e)) | ||
| 494 | |||
| 495 | super().setUp() | ||
| 496 | |||
| 497 | def get_server_addr(self, server_idx): | ||
| 498 | # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled. | ||
| 499 | # If IPv6 is enabled, it should be safe to use localhost directly, in general | ||
| 500 | # case it is more reliable to resolve the IP address explicitly. | ||
| 501 | host = socket.gethostbyname("localhost") | ||
| 502 | return "ws://%s:0" % host | ||
