summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/asyncrpc/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py78
1 files changed, 23 insertions, 55 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index fa042bbe87..7f33099b63 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -10,13 +10,13 @@ import json
10import os 10import os
11import socket 11import socket
12import sys 12import sys
13from . import chunkify, DEFAULT_MAX_CHUNK 13from .connection import StreamConnection, DEFAULT_MAX_CHUNK
14from .exceptions import ConnectionClosedError
14 15
15 16
16class AsyncClient(object): 17class AsyncClient(object):
17 def __init__(self, proto_name, proto_version, logger, timeout=30): 18 def __init__(self, proto_name, proto_version, logger, timeout=30):
18 self.reader = None 19 self.socket = None
19 self.writer = None
20 self.max_chunk = DEFAULT_MAX_CHUNK 20 self.max_chunk = DEFAULT_MAX_CHUNK
21 self.proto_name = proto_name 21 self.proto_name = proto_name
22 self.proto_version = proto_version 22 self.proto_version = proto_version
@@ -25,7 +25,8 @@ class AsyncClient(object):
25 25
26 async def connect_tcp(self, address, port): 26 async def connect_tcp(self, address, port):
27 async def connect_sock(): 27 async def connect_sock():
28 return await asyncio.open_connection(address, port) 28 reader, writer = await asyncio.open_connection(address, port)
29 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
29 30
30 self._connect_sock = connect_sock 31 self._connect_sock = connect_sock
31 32
@@ -40,27 +41,27 @@ class AsyncClient(object):
40 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 41 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
41 sock.connect(os.path.basename(path)) 42 sock.connect(os.path.basename(path))
42 finally: 43 finally:
43 os.chdir(cwd) 44 os.chdir(cwd)
44 return await asyncio.open_unix_connection(sock=sock) 45 reader, writer = await asyncio.open_unix_connection(sock=sock)
46 return StreamConnection(reader, writer, self.timeout, self.max_chunk)
45 47
46 self._connect_sock = connect_sock 48 self._connect_sock = connect_sock
47 49
48 async def setup_connection(self): 50 async def setup_connection(self):
49 s = '%s %s\n\n' % (self.proto_name, self.proto_version) 51 # Send headers
50 self.writer.write(s.encode("utf-8")) 52 await self.socket.send("%s %s" % (self.proto_name, self.proto_version))
51 await self.writer.drain() 53 # End of headers
54 await self.socket.send("")
52 55
53 async def connect(self): 56 async def connect(self):
54 if self.reader is None or self.writer is None: 57 if self.socket is None:
55 (self.reader, self.writer) = await self._connect_sock() 58 self.socket = await self._connect_sock()
56 await self.setup_connection() 59 await self.setup_connection()
57 60
58 async def close(self): 61 async def close(self):
59 self.reader = None 62 if self.socket is not None:
60 63 await self.socket.close()
61 if self.writer is not None: 64 self.socket = None
62 self.writer.close()
63 self.writer = None
64 65
65 async def _send_wrapper(self, proc): 66 async def _send_wrapper(self, proc):
66 count = 0 67 count = 0
@@ -71,6 +72,7 @@ class AsyncClient(object):
71 except ( 72 except (
72 OSError, 73 OSError,
73 ConnectionError, 74 ConnectionError,
75 ConnectionClosedError,
74 json.JSONDecodeError, 76 json.JSONDecodeError,
75 UnicodeDecodeError, 77 UnicodeDecodeError,
76 ) as e: 78 ) as e:
@@ -82,49 +84,15 @@ class AsyncClient(object):
82 await self.close() 84 await self.close()
83 count += 1 85 count += 1
84 86
85 async def send_message(self, msg): 87 async def invoke(self, msg):
86 async def get_line():
87 try:
88 line = await asyncio.wait_for(self.reader.readline(), self.timeout)
89 except asyncio.TimeoutError:
90 raise ConnectionError("Timed out waiting for server")
91
92 if not line:
93 raise ConnectionError("Connection closed")
94
95 line = line.decode("utf-8")
96
97 if not line.endswith("\n"):
98 raise ConnectionError("Bad message %r" % (line))
99
100 return line
101
102 async def proc(): 88 async def proc():
103 for c in chunkify(json.dumps(msg), self.max_chunk): 89 await self.socket.send_message(msg)
104 self.writer.write(c.encode("utf-8")) 90 return await self.socket.recv_message()
105 await self.writer.drain()
106
107 l = await get_line()
108
109 m = json.loads(l)
110 if m and "chunk-stream" in m:
111 lines = []
112 while True:
113 l = (await get_line()).rstrip("\n")
114 if not l:
115 break
116 lines.append(l)
117
118 m = json.loads("".join(lines))
119
120 return m
121 91
122 return await self._send_wrapper(proc) 92 return await self._send_wrapper(proc)
123 93
124 async def ping(self): 94 async def ping(self):
125 return await self.send_message( 95 return await self.invoke({"ping": {}})
126 {'ping': {}}
127 )
128 96
129 97
130class Client(object): 98class Client(object):
@@ -142,7 +110,7 @@ class Client(object):
142 # required (but harmless) with it. 110 # required (but harmless) with it.
143 asyncio.set_event_loop(self.loop) 111 asyncio.set_event_loop(self.loop)
144 112
145 self._add_methods('connect_tcp', 'ping') 113 self._add_methods("connect_tcp", "ping")
146 114
147 @abc.abstractmethod 115 @abc.abstractmethod
148 def _get_async_client(self): 116 def _get_async_client(self):