diff options
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 78 |
1 files changed, 23 insertions, 55 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87..7f33099b63 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -10,13 +10,13 @@ import json | |||
10 | import os | 10 | import os |
11 | import socket | 11 | import socket |
12 | import sys | 12 | import sys |
13 | from . import chunkify, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK |
14 | from .exceptions import ConnectionClosedError | ||
14 | 15 | ||
15 | 16 | ||
16 | class AsyncClient(object): | 17 | class AsyncClient(object): |
17 | def __init__(self, proto_name, proto_version, logger, timeout=30): | 18 | def __init__(self, proto_name, proto_version, logger, timeout=30): |
18 | self.reader = None | 19 | self.socket = None |
19 | self.writer = None | ||
20 | self.max_chunk = DEFAULT_MAX_CHUNK | 20 | self.max_chunk = DEFAULT_MAX_CHUNK |
21 | self.proto_name = proto_name | 21 | self.proto_name = proto_name |
22 | self.proto_version = proto_version | 22 | self.proto_version = proto_version |
@@ -25,7 +25,8 @@ class AsyncClient(object): | |||
25 | 25 | ||
26 | async def connect_tcp(self, address, port): | 26 | async def connect_tcp(self, address, port): |
27 | async def connect_sock(): | 27 | async def connect_sock(): |
28 | return await asyncio.open_connection(address, port) | 28 | reader, writer = await asyncio.open_connection(address, port) |
29 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
29 | 30 | ||
30 | self._connect_sock = connect_sock | 31 | self._connect_sock = connect_sock |
31 | 32 | ||
@@ -40,27 +41,27 @@ class AsyncClient(object): | |||
40 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) | 41 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) |
41 | sock.connect(os.path.basename(path)) | 42 | sock.connect(os.path.basename(path)) |
42 | finally: | 43 | finally: |
43 | os.chdir(cwd) | 44 | os.chdir(cwd) |
44 | return await asyncio.open_unix_connection(sock=sock) | 45 | reader, writer = await asyncio.open_unix_connection(sock=sock) |
46 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
45 | 47 | ||
46 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
47 | 49 | ||
48 | async def setup_connection(self): | 50 | async def setup_connection(self): |
49 | s = '%s %s\n\n' % (self.proto_name, self.proto_version) | 51 | # Send headers |
50 | self.writer.write(s.encode("utf-8")) | 52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
51 | await self.writer.drain() | 53 | # End of headers |
54 | await self.socket.send("") | ||
52 | 55 | ||
53 | async def connect(self): | 56 | async def connect(self): |
54 | if self.reader is None or self.writer is None: | 57 | if self.socket is None: |
55 | (self.reader, self.writer) = await self._connect_sock() | 58 | self.socket = await self._connect_sock() |
56 | await self.setup_connection() | 59 | await self.setup_connection() |
57 | 60 | ||
58 | async def close(self): | 61 | async def close(self): |
59 | self.reader = None | 62 | if self.socket is not None: |
60 | 63 | await self.socket.close() | |
61 | if self.writer is not None: | 64 | self.socket = None |
62 | self.writer.close() | ||
63 | self.writer = None | ||
64 | 65 | ||
65 | async def _send_wrapper(self, proc): | 66 | async def _send_wrapper(self, proc): |
66 | count = 0 | 67 | count = 0 |
@@ -71,6 +72,7 @@ class AsyncClient(object): | |||
71 | except ( | 72 | except ( |
72 | OSError, | 73 | OSError, |
73 | ConnectionError, | 74 | ConnectionError, |
75 | ConnectionClosedError, | ||
74 | json.JSONDecodeError, | 76 | json.JSONDecodeError, |
75 | UnicodeDecodeError, | 77 | UnicodeDecodeError, |
76 | ) as e: | 78 | ) as e: |
@@ -82,49 +84,15 @@ class AsyncClient(object): | |||
82 | await self.close() | 84 | await self.close() |
83 | count += 1 | 85 | count += 1 |
84 | 86 | ||
85 | async def send_message(self, msg): | 87 | async def invoke(self, msg): |
86 | async def get_line(): | ||
87 | try: | ||
88 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
89 | except asyncio.TimeoutError: | ||
90 | raise ConnectionError("Timed out waiting for server") | ||
91 | |||
92 | if not line: | ||
93 | raise ConnectionError("Connection closed") | ||
94 | |||
95 | line = line.decode("utf-8") | ||
96 | |||
97 | if not line.endswith("\n"): | ||
98 | raise ConnectionError("Bad message %r" % (line)) | ||
99 | |||
100 | return line | ||
101 | |||
102 | async def proc(): | 88 | async def proc(): |
103 | for c in chunkify(json.dumps(msg), self.max_chunk): | 89 | await self.socket.send_message(msg) |
104 | self.writer.write(c.encode("utf-8")) | 90 | return await self.socket.recv_message() |
105 | await self.writer.drain() | ||
106 | |||
107 | l = await get_line() | ||
108 | |||
109 | m = json.loads(l) | ||
110 | if m and "chunk-stream" in m: | ||
111 | lines = [] | ||
112 | while True: | ||
113 | l = (await get_line()).rstrip("\n") | ||
114 | if not l: | ||
115 | break | ||
116 | lines.append(l) | ||
117 | |||
118 | m = json.loads("".join(lines)) | ||
119 | |||
120 | return m | ||
121 | 91 | ||
122 | return await self._send_wrapper(proc) | 92 | return await self._send_wrapper(proc) |
123 | 93 | ||
124 | async def ping(self): | 94 | async def ping(self): |
125 | return await self.send_message( | 95 | return await self.invoke({"ping": {}}) |
126 | {'ping': {}} | ||
127 | ) | ||
128 | 96 | ||
129 | 97 | ||
130 | class Client(object): | 98 | class Client(object): |
@@ -142,7 +110,7 @@ class Client(object): | |||
142 | # required (but harmless) with it. | 110 | # required (but harmless) with it. |
143 | asyncio.set_event_loop(self.loop) | 111 | asyncio.set_event_loop(self.loop) |
144 | 112 | ||
145 | self._add_methods('connect_tcp', 'ping') | 113 | self._add_methods("connect_tcp", "ping") |
146 | 114 | ||
147 | @abc.abstractmethod | 115 | @abc.abstractmethod |
148 | def _get_async_client(self): | 116 | def _get_async_client(self): |