diff options
Diffstat (limited to 'bitbake/lib')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 11 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/connection.py | 44 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/serv.py | 53 | ||||
-rw-r--r-- | bitbake/lib/hashserv/__init__.py | 13 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 1 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 17 |
6 files changed, 137 insertions, 2 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 7f33099b63..802c07df1f 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -10,7 +10,7 @@ import json | |||
10 | import os | 10 | import os |
11 | import socket | 11 | import socket |
12 | import sys | 12 | import sys |
13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK |
14 | from .exceptions import ConnectionClosedError | 14 | from .exceptions import ConnectionClosedError |
15 | 15 | ||
16 | 16 | ||
@@ -47,6 +47,15 @@ class AsyncClient(object): | |||
47 | 47 | ||
48 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
49 | 49 | ||
50 | async def connect_websocket(self, uri): | ||
51 | import websockets | ||
52 | |||
53 | async def connect_sock(): | ||
54 | websocket = await websockets.connect(uri, ping_interval=None) | ||
55 | return WebsocketConnection(websocket, self.timeout) | ||
56 | |||
57 | self._connect_sock = connect_sock | ||
58 | |||
50 | async def setup_connection(self): | 59 | async def setup_connection(self): |
51 | # Send headers | 60 | # Send headers |
52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) | 61 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py index c4fd24754c..a10628f75a 100644 --- a/bitbake/lib/bb/asyncrpc/connection.py +++ b/bitbake/lib/bb/asyncrpc/connection.py | |||
@@ -93,3 +93,47 @@ class StreamConnection(object): | |||
93 | if self.writer is not None: | 93 | if self.writer is not None: |
94 | self.writer.close() | 94 | self.writer.close() |
95 | self.writer = None | 95 | self.writer = None |
96 | |||
97 | |||
98 | class WebsocketConnection(object): | ||
99 | def __init__(self, socket, timeout): | ||
100 | self.socket = socket | ||
101 | self.timeout = timeout | ||
102 | |||
103 | @property | ||
104 | def address(self): | ||
105 | return ":".join(str(s) for s in self.socket.remote_address) | ||
106 | |||
107 | async def send_message(self, msg): | ||
108 | await self.send(json.dumps(msg)) | ||
109 | |||
110 | async def recv_message(self): | ||
111 | m = await self.recv() | ||
112 | return json.loads(m) | ||
113 | |||
114 | async def send(self, msg): | ||
115 | import websockets.exceptions | ||
116 | |||
117 | try: | ||
118 | await self.socket.send(msg) | ||
119 | except websockets.exceptions.ConnectionClosed: | ||
120 | raise ConnectionClosedError("Connection closed") | ||
121 | |||
122 | async def recv(self): | ||
123 | import websockets.exceptions | ||
124 | |||
125 | try: | ||
126 | if self.timeout < 0: | ||
127 | return await self.socket.recv() | ||
128 | |||
129 | try: | ||
130 | return await asyncio.wait_for(self.socket.recv(), self.timeout) | ||
131 | except asyncio.TimeoutError: | ||
132 | raise ConnectionError("Timed out waiting for data") | ||
133 | except websockets.exceptions.ConnectionClosed: | ||
134 | raise ConnectionClosedError("Connection closed") | ||
135 | |||
136 | async def close(self): | ||
137 | if self.socket is not None: | ||
138 | await self.socket.close() | ||
139 | self.socket = None | ||
diff --git a/bitbake/lib/bb/asyncrpc/serv.py b/bitbake/lib/bb/asyncrpc/serv.py index 3e0d0632cb..dfb0377380 100644 --- a/bitbake/lib/bb/asyncrpc/serv.py +++ b/bitbake/lib/bb/asyncrpc/serv.py | |||
@@ -12,7 +12,7 @@ import signal | |||
12 | import socket | 12 | import socket |
13 | import sys | 13 | import sys |
14 | import multiprocessing | 14 | import multiprocessing |
15 | from .connection import StreamConnection | 15 | from .connection import StreamConnection, WebsocketConnection |
16 | from .exceptions import ClientError, ServerError, ConnectionClosedError | 16 | from .exceptions import ClientError, ServerError, ConnectionClosedError |
17 | 17 | ||
18 | 18 | ||
@@ -178,6 +178,54 @@ class UnixStreamServer(StreamServer): | |||
178 | os.unlink(self.path) | 178 | os.unlink(self.path) |
179 | 179 | ||
180 | 180 | ||
181 | class WebsocketsServer(object): | ||
182 | def __init__(self, host, port, handler, logger): | ||
183 | self.host = host | ||
184 | self.port = port | ||
185 | self.handler = handler | ||
186 | self.logger = logger | ||
187 | |||
188 | def start(self, loop): | ||
189 | import websockets.server | ||
190 | |||
191 | self.server = loop.run_until_complete( | ||
192 | websockets.server.serve( | ||
193 | self.client_handler, | ||
194 | self.host, | ||
195 | self.port, | ||
196 | ping_interval=None, | ||
197 | ) | ||
198 | ) | ||
199 | |||
200 | for s in self.server.sockets: | ||
201 | self.logger.debug("Listening on %r" % (s.getsockname(),)) | ||
202 | |||
203 | # Enable keep alives. This prevents broken client connections | ||
204 | # from persisting on the server for long periods of time. | ||
205 | s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) | ||
206 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) | ||
207 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) | ||
208 | s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) | ||
209 | |||
210 | name = self.server.sockets[0].getsockname() | ||
211 | if self.server.sockets[0].family == socket.AF_INET6: | ||
212 | self.address = "ws://[%s]:%d" % (name[0], name[1]) | ||
213 | else: | ||
214 | self.address = "ws://%s:%d" % (name[0], name[1]) | ||
215 | |||
216 | return [self.server.wait_closed()] | ||
217 | |||
218 | async def stop(self): | ||
219 | self.server.close() | ||
220 | |||
221 | def cleanup(self): | ||
222 | pass | ||
223 | |||
224 | async def client_handler(self, websocket): | ||
225 | socket = WebsocketConnection(websocket, -1) | ||
226 | await self.handler(socket) | ||
227 | |||
228 | |||
181 | class AsyncServer(object): | 229 | class AsyncServer(object): |
182 | def __init__(self, logger): | 230 | def __init__(self, logger): |
183 | self.logger = logger | 231 | self.logger = logger |
@@ -190,6 +238,9 @@ class AsyncServer(object): | |||
190 | def start_unix_server(self, path): | 238 | def start_unix_server(self, path): |
191 | self.server = UnixStreamServer(path, self._client_handler, self.logger) | 239 | self.server = UnixStreamServer(path, self._client_handler, self.logger) |
192 | 240 | ||
241 | def start_websocket_server(self, host, port): | ||
242 | self.server = WebsocketsServer(host, port, self._client_handler, self.logger) | ||
243 | |||
193 | async def _client_handler(self, socket): | 244 | async def _client_handler(self, socket): |
194 | try: | 245 | try: |
195 | client = self.accept_client(socket) | 246 | client = self.accept_client(socket) |
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py index 3a4018353f..56b9c6bc82 100644 --- a/bitbake/lib/hashserv/__init__.py +++ b/bitbake/lib/hashserv/__init__.py | |||
@@ -9,11 +9,15 @@ import re | |||
9 | import sqlite3 | 9 | import sqlite3 |
10 | import itertools | 10 | import itertools |
11 | import json | 11 | import json |
12 | from urllib.parse import urlparse | ||
12 | 13 | ||
13 | UNIX_PREFIX = "unix://" | 14 | UNIX_PREFIX = "unix://" |
15 | WS_PREFIX = "ws://" | ||
16 | WSS_PREFIX = "wss://" | ||
14 | 17 | ||
15 | ADDR_TYPE_UNIX = 0 | 18 | ADDR_TYPE_UNIX = 0 |
16 | ADDR_TYPE_TCP = 1 | 19 | ADDR_TYPE_TCP = 1 |
20 | ADDR_TYPE_WS = 2 | ||
17 | 21 | ||
18 | UNIHASH_TABLE_DEFINITION = ( | 22 | UNIHASH_TABLE_DEFINITION = ( |
19 | ("method", "TEXT NOT NULL", "UNIQUE"), | 23 | ("method", "TEXT NOT NULL", "UNIQUE"), |
@@ -84,6 +88,8 @@ def setup_database(database, sync=True): | |||
84 | def parse_address(addr): | 88 | def parse_address(addr): |
85 | if addr.startswith(UNIX_PREFIX): | 89 | if addr.startswith(UNIX_PREFIX): |
86 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) | 90 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) |
91 | elif addr.startswith(WS_PREFIX) or addr.startswith(WSS_PREFIX): | ||
92 | return (ADDR_TYPE_WS, (addr,)) | ||
87 | else: | 93 | else: |
88 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) | 94 | m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) |
89 | if m is not None: | 95 | if m is not None: |
@@ -103,6 +109,9 @@ def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False): | |||
103 | (typ, a) = parse_address(addr) | 109 | (typ, a) = parse_address(addr) |
104 | if typ == ADDR_TYPE_UNIX: | 110 | if typ == ADDR_TYPE_UNIX: |
105 | s.start_unix_server(*a) | 111 | s.start_unix_server(*a) |
112 | elif typ == ADDR_TYPE_WS: | ||
113 | url = urlparse(a[0]) | ||
114 | s.start_websocket_server(url.hostname, url.port) | ||
106 | else: | 115 | else: |
107 | s.start_tcp_server(*a) | 116 | s.start_tcp_server(*a) |
108 | 117 | ||
@@ -116,6 +125,8 @@ def create_client(addr): | |||
116 | (typ, a) = parse_address(addr) | 125 | (typ, a) = parse_address(addr) |
117 | if typ == ADDR_TYPE_UNIX: | 126 | if typ == ADDR_TYPE_UNIX: |
118 | c.connect_unix(*a) | 127 | c.connect_unix(*a) |
128 | elif typ == ADDR_TYPE_WS: | ||
129 | c.connect_websocket(*a) | ||
119 | else: | 130 | else: |
120 | c.connect_tcp(*a) | 131 | c.connect_tcp(*a) |
121 | 132 | ||
@@ -128,6 +139,8 @@ async def create_async_client(addr): | |||
128 | (typ, a) = parse_address(addr) | 139 | (typ, a) = parse_address(addr) |
129 | if typ == ADDR_TYPE_UNIX: | 140 | if typ == ADDR_TYPE_UNIX: |
130 | await c.connect_unix(*a) | 141 | await c.connect_unix(*a) |
142 | elif typ == ADDR_TYPE_WS: | ||
143 | await c.connect_websocket(*a) | ||
131 | else: | 144 | else: |
132 | await c.connect_tcp(*a) | 145 | await c.connect_tcp(*a) |
133 | 146 | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 5f7d22ab13..9542d72f6c 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -115,6 +115,7 @@ class Client(bb.asyncrpc.Client): | |||
115 | super().__init__() | 115 | super().__init__() |
116 | self._add_methods( | 116 | self._add_methods( |
117 | "connect_tcp", | 117 | "connect_tcp", |
118 | "connect_websocket", | ||
118 | "get_unihash", | 119 | "get_unihash", |
119 | "report_unihash", | 120 | "report_unihash", |
120 | "report_unihash_equiv", | 121 | "report_unihash_equiv", |
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index f343c586b5..01ffd52c1d 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -483,3 +483,20 @@ class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm | |||
483 | # If IPv6 is enabled, it should be safe to use localhost directly, in general | 483 | # If IPv6 is enabled, it should be safe to use localhost directly, in general |
484 | # case it is more reliable to resolve the IP address explicitly. | 484 | # case it is more reliable to resolve the IP address explicitly. |
485 | return socket.gethostbyname("localhost") + ":0" | 485 | return socket.gethostbyname("localhost") + ":0" |
486 | |||
487 | |||
488 | class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): | ||
489 | def setUp(self): | ||
490 | try: | ||
491 | import websockets | ||
492 | except ImportError as e: | ||
493 | self.skipTest(str(e)) | ||
494 | |||
495 | super().setUp() | ||
496 | |||
497 | def get_server_addr(self, server_idx): | ||
498 | # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled. | ||
499 | # If IPv6 is enabled, it should be safe to use localhost directly, in general | ||
500 | # case it is more reliable to resolve the IP address explicitly. | ||
501 | host = socket.gethostbyname("localhost") | ||
502 | return "ws://%s:0" % host | ||