summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/asyncrpc/client.py
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2023-11-03 08:26:19 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2023-11-09 17:33:02 +0000
commit8f8501ed403dec27acbe780b936bc087fc5006d0 (patch)
tree60e6415075c7c71eacec23ca7dda53e4a324b12e /bitbake/lib/bb/asyncrpc/client.py
parentf97b686884166dd77d1818e70615027c6ba8c348 (diff)
downloadpoky-8f8501ed403dec27acbe780b936bc087fc5006d0.tar.gz
bitbake: asyncrpc: Abstract sockets
Rewrites the asyncrpc client and server code to make it possible to have other transport backends that are not stream based (e.g. websockets which are message based). The connection handling classes are now shared between both the client and server to make it easier to implement new transport mechanisms (Bitbake rev: 2aaeae53696e4c2f13a169830c3b7089cbad6eca) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py78
1 files changed, 23 insertions, 55 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index fa042bbe87..7f33099b63 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -10,13 +10,13 @@ import json
10import os 10import os
11import socket 11import socket
12import sys 12import sys
13from . import chunkify, DEFAULT_MAX_CHUNK 13from .connection import StreamConnection, DEFAULT_MAX_CHUNK
14from .exceptions import ConnectionClosedError
14 15
15 16
16class AsyncClient(object): 17class AsyncClient(object):
17 def __init__(self, proto_name, proto_version, logger, timeout=30): 18 def __init__(self, proto_name, proto_version, logger, timeout=30):
18 self.reader = None 19 self.socket = None
19 self.writer = None
20 self.max_chunk = DEFAULT_MAX_CHUNK 20 self.max_chunk = DEFAULT_MAX_CHUNK
21 self.proto_name = proto_name 21 self.proto_name = proto_name
22 self.proto_version = proto_version 22 self.proto_version = proto_version
@@ -25,7 +25,8 @@ class AsyncClient(object):
25 25
26 async def connect_tcp(self, address, port): 26 async def connect_tcp(self, address, port):
27 async def connect_sock(): 27 async def connect_sock():
28 return await asyncio.open_connection(address, port) 28 reader, writer = await asyncio.open_connection(address, port)
29 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
29 30
30 self._connect_sock = connect_sock 31 self._connect_sock = connect_sock
31 32
@@ -40,27 +41,27 @@ class AsyncClient(object):
40 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 41 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
41 sock.connect(os.path.basename(path)) 42 sock.connect(os.path.basename(path))
42 finally: 43 finally:
43 os.chdir(cwd) 44 os.chdir(cwd)
44 return await asyncio.open_unix_connection(sock=sock) 45 reader, writer = await asyncio.open_unix_connection(sock=sock)
46 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
45 47
46 self._connect_sock = connect_sock 48 self._connect_sock = connect_sock
47 49
48 async def setup_connection(self): 50 async def setup_connection(self):
49 s = '%s %s\n\n' % (self.proto_name, self.proto_version) 51 # Send headers
50 self.writer.write(s.encode("utf-8")) 52 await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
51 await self.writer.drain() 53 # End of headers
54 await self.socket.send("")
52 55
53 async def connect(self): 56 async def connect(self):
54 if self.reader is None or self.writer is None: 57 if self.socket is None:
55 (self.reader, self.writer) = await self._connect_sock() 58 self.socket = await self._connect_sock()
56 await self.setup_connection() 59 await self.setup_connection()
57 60
58 async def close(self): 61 async def close(self):
59 self.reader = None 62 if self.socket is not None:
60 63 await self.socket.close()
61 if self.writer is not None: 64 self.socket = None
62 self.writer.close()
63 self.writer = None
64 65
65 async def _send_wrapper(self, proc): 66 async def _send_wrapper(self, proc):
66 count = 0 67 count = 0
@@ -71,6 +72,7 @@ class AsyncClient(object):
71 except ( 72 except (
72 OSError, 73 OSError,
73 ConnectionError, 74 ConnectionError,
75 ConnectionClosedError,
74 json.JSONDecodeError, 76 json.JSONDecodeError,
75 UnicodeDecodeError, 77 UnicodeDecodeError,
76 ) as e: 78 ) as e:
@@ -82,49 +84,15 @@ class AsyncClient(object):
82 await self.close() 84 await self.close()
83 count += 1 85 count += 1
84 86
85 async def send_message(self, msg): 87 async def invoke(self, msg):
86 async def get_line():
87 try:
88 line = await asyncio.wait_for(self.reader.readline(), self.timeout)
89 except asyncio.TimeoutError:
90 raise ConnectionError("Timed out waiting for server")
91
92 if not line:
93 raise ConnectionError("Connection closed")
94
95 line = line.decode("utf-8")
96
97 if not line.endswith("\n"):
98 raise ConnectionError("Bad message %r" % (line))
99
100 return line
101
102 async def proc(): 88 async def proc():
103 for c in chunkify(json.dumps(msg), self.max_chunk): 89 await self.socket.send_message(msg)
104 self.writer.write(c.encode("utf-8")) 90 return await self.socket.recv_message()
105 await self.writer.drain()
106
107 l = await get_line()
108
109 m = json.loads(l)
110 if m and "chunk-stream" in m:
111 lines = []
112 while True:
113 l = (await get_line()).rstrip("\n")
114 if not l:
115 break
116 lines.append(l)
117
118 m = json.loads("".join(lines))
119
120 return m
121 91
122 return await self._send_wrapper(proc) 92 return await self._send_wrapper(proc)
123 93
124 async def ping(self): 94 async def ping(self):
125 return await self.send_message( 95 return await self.invoke({"ping": {}})
126 {'ping': {}}
127 )
128 96
129 97
130class Client(object): 98class Client(object):
@@ -142,7 +110,7 @@ class Client(object):
142 # required (but harmless) with it. 110 # required (but harmless) with it.
143 asyncio.set_event_loop(self.loop) 111 asyncio.set_event_loop(self.loop)
144 112
145 self._add_methods('connect_tcp', 'ping') 113 self._add_methods("connect_tcp", "ping")
146 114
147 @abc.abstractmethod 115 @abc.abstractmethod
148 def _get_async_client(self): 116 def _get_async_client(self):