diff options
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 78 |
1 files changed, 23 insertions, 55 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87..7f33099b63 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
| @@ -10,13 +10,13 @@ import json | |||
| 10 | import os | 10 | import os |
| 11 | import socket | 11 | import socket |
| 12 | import sys | 12 | import sys |
| 13 | from . import chunkify, DEFAULT_MAX_CHUNK | 13 | from .connection import StreamConnection, DEFAULT_MAX_CHUNK |
| 14 | from .exceptions import ConnectionClosedError | ||
| 14 | 15 | ||
| 15 | 16 | ||
| 16 | class AsyncClient(object): | 17 | class AsyncClient(object): |
| 17 | def __init__(self, proto_name, proto_version, logger, timeout=30): | 18 | def __init__(self, proto_name, proto_version, logger, timeout=30): |
| 18 | self.reader = None | 19 | self.socket = None |
| 19 | self.writer = None | ||
| 20 | self.max_chunk = DEFAULT_MAX_CHUNK | 20 | self.max_chunk = DEFAULT_MAX_CHUNK |
| 21 | self.proto_name = proto_name | 21 | self.proto_name = proto_name |
| 22 | self.proto_version = proto_version | 22 | self.proto_version = proto_version |
| @@ -25,7 +25,8 @@ class AsyncClient(object): | |||
| 25 | 25 | ||
| 26 | async def connect_tcp(self, address, port): | 26 | async def connect_tcp(self, address, port): |
| 27 | async def connect_sock(): | 27 | async def connect_sock(): |
| 28 | return await asyncio.open_connection(address, port) | 28 | reader, writer = await asyncio.open_connection(address, port) |
| 29 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
| 29 | 30 | ||
| 30 | self._connect_sock = connect_sock | 31 | self._connect_sock = connect_sock |
| 31 | 32 | ||
| @@ -40,27 +41,27 @@ class AsyncClient(object): | |||
| 40 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) | 41 | sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) |
| 41 | sock.connect(os.path.basename(path)) | 42 | sock.connect(os.path.basename(path)) |
| 42 | finally: | 43 | finally: |
| 43 | os.chdir(cwd) | 44 | os.chdir(cwd) |
| 44 | return await asyncio.open_unix_connection(sock=sock) | 45 | reader, writer = await asyncio.open_unix_connection(sock=sock) |
| 46 | return StreamConnection(reader, writer, self.timeout, self.max_chunk) | ||
| 45 | 47 | ||
| 46 | self._connect_sock = connect_sock | 48 | self._connect_sock = connect_sock |
| 47 | 49 | ||
| 48 | async def setup_connection(self): | 50 | async def setup_connection(self): |
| 49 | s = '%s %s\n\n' % (self.proto_name, self.proto_version) | 51 | # Send headers |
| 50 | self.writer.write(s.encode("utf-8")) | 52 | await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) |
| 51 | await self.writer.drain() | 53 | # End of headers |
| 54 | await self.socket.send("") | ||
| 52 | 55 | ||
| 53 | async def connect(self): | 56 | async def connect(self): |
| 54 | if self.reader is None or self.writer is None: | 57 | if self.socket is None: |
| 55 | (self.reader, self.writer) = await self._connect_sock() | 58 | self.socket = await self._connect_sock() |
| 56 | await self.setup_connection() | 59 | await self.setup_connection() |
| 57 | 60 | ||
| 58 | async def close(self): | 61 | async def close(self): |
| 59 | self.reader = None | 62 | if self.socket is not None: |
| 60 | 63 | await self.socket.close() | |
| 61 | if self.writer is not None: | 64 | self.socket = None |
| 62 | self.writer.close() | ||
| 63 | self.writer = None | ||
| 64 | 65 | ||
| 65 | async def _send_wrapper(self, proc): | 66 | async def _send_wrapper(self, proc): |
| 66 | count = 0 | 67 | count = 0 |
| @@ -71,6 +72,7 @@ class AsyncClient(object): | |||
| 71 | except ( | 72 | except ( |
| 72 | OSError, | 73 | OSError, |
| 73 | ConnectionError, | 74 | ConnectionError, |
| 75 | ConnectionClosedError, | ||
| 74 | json.JSONDecodeError, | 76 | json.JSONDecodeError, |
| 75 | UnicodeDecodeError, | 77 | UnicodeDecodeError, |
| 76 | ) as e: | 78 | ) as e: |
| @@ -82,49 +84,15 @@ class AsyncClient(object): | |||
| 82 | await self.close() | 84 | await self.close() |
| 83 | count += 1 | 85 | count += 1 |
| 84 | 86 | ||
| 85 | async def send_message(self, msg): | 87 | async def invoke(self, msg): |
| 86 | async def get_line(): | ||
| 87 | try: | ||
| 88 | line = await asyncio.wait_for(self.reader.readline(), self.timeout) | ||
| 89 | except asyncio.TimeoutError: | ||
| 90 | raise ConnectionError("Timed out waiting for server") | ||
| 91 | |||
| 92 | if not line: | ||
| 93 | raise ConnectionError("Connection closed") | ||
| 94 | |||
| 95 | line = line.decode("utf-8") | ||
| 96 | |||
| 97 | if not line.endswith("\n"): | ||
| 98 | raise ConnectionError("Bad message %r" % (line)) | ||
| 99 | |||
| 100 | return line | ||
| 101 | |||
| 102 | async def proc(): | 88 | async def proc(): |
| 103 | for c in chunkify(json.dumps(msg), self.max_chunk): | 89 | await self.socket.send_message(msg) |
| 104 | self.writer.write(c.encode("utf-8")) | 90 | return await self.socket.recv_message() |
| 105 | await self.writer.drain() | ||
| 106 | |||
| 107 | l = await get_line() | ||
| 108 | |||
| 109 | m = json.loads(l) | ||
| 110 | if m and "chunk-stream" in m: | ||
| 111 | lines = [] | ||
| 112 | while True: | ||
| 113 | l = (await get_line()).rstrip("\n") | ||
| 114 | if not l: | ||
| 115 | break | ||
| 116 | lines.append(l) | ||
| 117 | |||
| 118 | m = json.loads("".join(lines)) | ||
| 119 | |||
| 120 | return m | ||
| 121 | 91 | ||
| 122 | return await self._send_wrapper(proc) | 92 | return await self._send_wrapper(proc) |
| 123 | 93 | ||
| 124 | async def ping(self): | 94 | async def ping(self): |
| 125 | return await self.send_message( | 95 | return await self.invoke({"ping": {}}) |
| 126 | {'ping': {}} | ||
| 127 | ) | ||
| 128 | 96 | ||
| 129 | 97 | ||
| 130 | class Client(object): | 98 | class Client(object): |
| @@ -142,7 +110,7 @@ class Client(object): | |||
| 142 | # required (but harmless) with it. | 110 | # required (but harmless) with it. |
| 143 | asyncio.set_event_loop(self.loop) | 111 | asyncio.set_event_loop(self.loop) |
| 144 | 112 | ||
| 145 | self._add_methods('connect_tcp', 'ping') | 113 | self._add_methods("connect_tcp", "ping") |
| 146 | 114 | ||
| 147 | @abc.abstractmethod | 115 | @abc.abstractmethod |
| 148 | def _get_async_client(self): | 116 | def _get_async_client(self): |
