Diffstat (limited to 'bitbake/lib/bb/asyncrpc/connection.py')
-rw-r--r--  bitbake/lib/bb/asyncrpc/connection.py | 95
1 file changed, 95 insertions, 0 deletions
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
new file mode 100644
index 0000000000..c4fd24754c
--- /dev/null
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -0,0 +1,95 @@
+#
+# Copyright BitBake Contributors
+#
+# SPDX-License-Identifier: GPL-2.0-only
+#
+
+import asyncio
+import itertools
+import json
+from .exceptions import ClientError, ConnectionClosedError
+
+
+# The Python async server defaults to a 64K receive buffer, so we hardcode our
+# maximum chunk size. It would be better if the client and server reported to
+# each other what the maximum chunk sizes were, but that will slow down the
+# connection setup with a round trip delay so I'd rather not do that unless it
+# is necessary
+DEFAULT_MAX_CHUNK = 32 * 1024
+
+
+def chunkify(msg, max_chunk):
+    if len(msg) < max_chunk - 1:
+        yield "".join((msg, "\n"))
+    else:
+        yield "".join((json.dumps({"chunk-stream": None}), "\n"))
+
+        args = [iter(msg)] * (max_chunk - 1)
+        for m in map("".join, itertools.zip_longest(*args, fillvalue="")):
+            yield "".join(itertools.chain(m, "\n"))
+        yield "\n"
+
+
+class StreamConnection(object):
+    def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
+        self.reader = reader
+        self.writer = writer
+        self.timeout = timeout
+        self.max_chunk = max_chunk
+
+    @property
+    def address(self):
+        return self.writer.get_extra_info("peername")
+
+    async def send_message(self, msg):
+        for c in chunkify(json.dumps(msg), self.max_chunk):
+            self.writer.write(c.encode("utf-8"))
+        await self.writer.drain()
+
+    async def recv_message(self):
+        l = await self.recv()
+
+        m = json.loads(l)
+        if not m:
+            return m
+
+        if "chunk-stream" in m:
+            lines = []
+            while True:
+                l = await self.recv()
+                if not l:
+                    break
+                lines.append(l)
+
+            m = json.loads("".join(lines))
+
+        return m
+
+    async def send(self, msg):
+        self.writer.write(("%s\n" % msg).encode("utf-8"))
+        await self.writer.drain()
+
+    async def recv(self):
+        if self.timeout < 0:
+            line = await self.reader.readline()
+        else:
+            try:
+                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
+            except asyncio.TimeoutError:
+                raise ConnectionError("Timed out waiting for data")
+
+        if not line:
+            raise ConnectionClosedError("Connection closed")
+
+        line = line.decode("utf-8")
+
+        if not line.endswith("\n"):
+            raise ConnectionError("Bad message %r" % (line))
+
+        return line.rstrip()
+
+    async def close(self):
+        self.reader = None
+        if self.writer is not None:
+            self.writer.close()
+            self.writer = None
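
The chunking scheme above keeps every line sent over the socket under the receiver's buffer limit: a short message goes out as a single JSON line, while a long one is announced with a {"chunk-stream": null} marker line, sliced into lines of max_chunk - 1 characters, and terminated by an empty line that recv_message() uses to stop collecting chunks. A minimal sketch of that framing, assuming bitbake/lib is on the Python path so chunkify can be imported; max_chunk=10 is artificially small purely for illustration:

    # Sketch only, not part of the commit: show the wire framing chunkify() emits.
    import json

    from bb.asyncrpc.connection import chunkify

    msg = json.dumps({"method": "ping", "data": "x" * 20})
    for line in chunkify(msg, max_chunk=10):
        print(repr(line))

    # Expected shape of the output: a '{"chunk-stream": null}\n' marker first,
    # then 9-character (max_chunk - 1) slices of the JSON text, each ending in
    # '\n', and finally a bare '\n' that marks the end of the chunk stream.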
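
And a hedged usage sketch of StreamConnection round-tripping a message over a loopback asyncio server; the echo handler, address, port, and timeout are illustrative assumptions, not part of the BitBake code above:

    # Sketch only, not part of the commit: loopback echo using StreamConnection.
    import asyncio

    from bb.asyncrpc.connection import StreamConnection


    async def handle(reader, writer):
        # Server side: wrap the asyncio streams and echo one message back.
        conn = StreamConnection(reader, writer, timeout=30)
        msg = await conn.recv_message()           # reassembles chunked messages
        await conn.send_message({"echo": msg})    # JSON-encodes and chunkifies
        await conn.close()


    async def main():
        server = await asyncio.start_server(handle, "127.0.0.1", 8888)
        async with server:
            reader, writer = await asyncio.open_connection("127.0.0.1", 8888)
            client = StreamConnection(reader, writer, timeout=30)
            await client.send_message({"ping": True})
            print(await client.recv_message())    # -> {'echo': {'ping': True}}
            await client.close()


    asyncio.run(main())

Newline-delimited JSON keeps the framing simple enough that recv() can rely on readline() plus a timeout, which is also why recv() rejects any line that arrives without a trailing newline.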