diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2023-11-03 08:26:19 -0600 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2023-11-09 17:33:02 +0000 |
commit | 8f8501ed403dec27acbe780b936bc087fc5006d0 (patch) | |
tree | 60e6415075c7c71eacec23ca7dda53e4a324b12e /bitbake/lib/bb/asyncrpc/client.py | |
parent | f97b686884166dd77d1818e70615027c6ba8c348 (diff) | |
download | poky-8f8501ed403dec27acbe780b936bc087fc5006d0.tar.gz |
bitbake: asyncrpc: Abstract sockets
Rewrites the asyncrpc client and server code to make it possible to have
other transport backends that are not stream based (e.g. websockets
which are message based). The connection handling classes are now shared
between both the client and server to make it easier to implement new
transport mechanisms
(Bitbake rev: 2aaeae53696e4c2f13a169830c3b7089cbad6eca)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 78 |
1 files changed, 23 insertions, 55 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87..7f33099b63 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -10,13 +10,13 @@ import json | |||
10 | import os | 10 | import os |
11 | import socket | 11 | import socket |
12 | import sys | 12 | import sys |
13 | from . import chunkify, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK |
14 | from .exceptions import ConnectionClosedError | ||
14 | 15 | ||
15 | 16 | ||
16 | class AsyncClient(object): | 17 | class AsyncClient(object): |
17 | def __init__(self, proto_name, proto_version, logger, timeout=30): | 18 | def __init__(self, proto_name, proto_version, logger, timeout=30): |
18 | self.reader = None | 19 | self.socket = None |
19 | self.writer = None | ||
20 | self.max_chunk = DEFAULT_MAX_CHUNK | 20 | self.max_chunk = DEFAULT_MAX_CHUNK |
21 | self.proto_name = proto_name | 21 | self.proto_name = proto_name |
22 | self.proto_version = proto_version | 22 | self.proto_version = proto_version |
@@ -25,7 +25,8 @@ class AsyncClient(object): | |||
25 | 25 | ||
26 | async def connect_tcp(self, address, port): | 26 | async def connect_tcp(self, address, port): |
27 | async def connect_sock(): | 27 | async def connect_sock(): |
28 | return await asyncio.open_connection(address, port) | 28 | reader, writer = await asyncio.open_connection(address, port) |
29 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
29 | 30 | ||
30 | self._connect_sock = connect_sock | 31 | self._connect_sock = connect_sock |
31 | 32 | ||
@@ -40,27 +41,27 @@ class AsyncClient(object): | |||
40 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) | 41 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) |
41 | sock.connect(os.path.basename(path)) | 42 | sock.connect(os.path.basename(path)) |
42 | finally: | 43 | finally: |
43 | os.chdir(cwd) | 44 | os.chdir(cwd) |
44 | return await asyncio.open_unix_connection(sock=sock) | 45 | reader, writer = await asyncio.open_unix_connection(sock=sock) |
46 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
45 | 47 | ||
46 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
47 | 49 | ||
48 | async def setup_connection(self): | 50 | async def setup_connection(self): |
49 | s = '%s %s\n\n' % (self.proto_name, self.proto_version) | 51 | # Send headers |
50 | self.writer.write(s.encode("utf-8")) | 52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
51 | await self.writer.drain() | 53 | # End of headers |
54 | await self.socket.send("") | ||
52 | 55 | ||
53 | async def connect(self): | 56 | async def connect(self): |
54 | if self.reader is None or self.writer is None: | 57 | if self.socket is None: |
55 | (self.reader, self.writer) = await self._connect_sock() | 58 | self.socket = await self._connect_sock() |
56 | await self.setup_connection() | 59 | await self.setup_connection() |
57 | 60 | ||
58 | async def close(self): | 61 | async def close(self): |
59 | self.reader = None | 62 | if self.socket is not None: |
60 | 63 | await self.socket.close() | |
61 | if self.writer is not None: | 64 | self.socket = None |
62 | self.writer.close() | ||
63 | self.writer = None | ||
64 | 65 | ||
65 | async def _send_wrapper(self, proc): | 66 | async def _send_wrapper(self, proc): |
66 | count = 0 | 67 | count = 0 |
@@ -71,6 +72,7 @@ class AsyncClient(object): | |||
71 | except ( | 72 | except ( |
72 | OSError, | 73 | OSError, |
73 | ConnectionError, | 74 | ConnectionError, |
75 | ConnectionClosedError, | ||
74 | json.JSONDecodeError, | 76 | json.JSONDecodeError, |
75 | UnicodeDecodeError, | 77 | UnicodeDecodeError, |
76 | ) as e: | 78 | ) as e: |
@@ -82,49 +84,15 @@ class AsyncClient(object): | |||
82 | await self.close() | 84 | await self.close() |
83 | count += 1 | 85 | count += 1 |
84 | 86 | ||
85 | async def send_message(self, msg): | 87 | async def invoke(self, msg): |
86 | async def get_line(): | ||
87 | try: | ||
88 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
89 | except asyncio.TimeoutError: | ||
90 | raise ConnectionError("Timed out waiting for server") | ||
91 | |||
92 | if not line: | ||
93 | raise ConnectionError("Connection closed") | ||
94 | |||
95 | line = line.decode("utf-8") | ||
96 | |||
97 | if not line.endswith("\n"): | ||
98 | raise ConnectionError("Bad message %r" % (line)) | ||
99 | |||
100 | return line | ||
101 | |||
102 | async def proc(): | 88 | async def proc(): |
103 | for c in chunkify(json.dumps(msg), self.max_chunk): | 89 | await self.socket.send_message(msg) |
104 | self.writer.write(c.encode("utf-8")) | 90 | return await self.socket.recv_message() |
105 | await self.writer.drain() | ||
106 | |||
107 | l = await get_line() | ||
108 | |||
109 | m = json.loads(l) | ||
110 | if m and "chunk-stream" in m: | ||
111 | lines = [] | ||
112 | while True: | ||
113 | l = (await get_line()).rstrip("\n") | ||
114 | if not l: | ||
115 | break | ||
116 | lines.append(l) | ||
117 | |||
118 | m = json.loads("".join(lines)) | ||
119 | |||
120 | return m | ||
121 | 91 | ||
122 | return await self._send_wrapper(proc) | 92 | return await self._send_wrapper(proc) |
123 | 93 | ||
124 | async def ping(self): | 94 | async def ping(self): |
125 | return await self.send_message( | 95 | return await self.invoke({"ping": {}}) |
126 | {'ping': {}} | ||
127 | ) | ||
128 | 96 | ||
129 | 97 | ||
130 | class Client(object): | 98 | class Client(object): |
@@ -142,7 +110,7 @@ class Client(object): | |||
142 | # required (but harmless) with it. | 110 | # required (but harmless) with it. |
143 | asyncio.set_event_loop(self.loop) | 111 | asyncio.set_event_loop(self.loop) |
144 | 112 | ||
145 | self._add_methods('connect_tcp', 'ping') | 113 | self._add_methods("connect_tcp", "ping") |
146 | 114 | ||
147 | @abc.abstractmethod | 115 | @abc.abstractmethod |
148 | def _get_async_client(self): | 116 | def _get_async_client(self): |