summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/asyncrpc/connection.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/connection.py')
-rw-r--r--bitbake/lib/bb/asyncrpc/connection.py95
1 files changed, 95 insertions, 0 deletions
diff --git a/bitbake/lib/bb/asyncrpc/connection.py b/bitbake/lib/bb/asyncrpc/connection.py
new file mode 100644
index 0000000000..c4fd24754c
--- /dev/null
+++ b/bitbake/lib/bb/asyncrpc/connection.py
@@ -0,0 +1,95 @@
1#
2# Copyright BitBake Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import asyncio
8import itertools
9import json
10from .exceptions import ClientError, ConnectionClosedError
11
12
13# The Python async server defaults to a 64K receive buffer, so we hardcode our
14# maximum chunk size. It would be better if the client and server reported to
15# each other what the maximum chunk sizes were, but that will slow down the
16# connection setup with a round trip delay so I'd rather not do that unless it
17# is necessary
18DEFAULT_MAX_CHUNK = 32 * 1024
19
20
21def chunkify(msg, max_chunk):
22 if len(msg) < max_chunk - 1:
23 yield "".join((msg, "\n"))
24 else:
25 yield "".join((json.dumps({"chunk-stream": None}), "\n"))
26
27 args = [iter(msg)] * (max_chunk - 1)
28 for m in map("".join, itertools.zip_longest(*args, fillvalue="")):
29 yield "".join(itertools.chain(m, "\n"))
30 yield "\n"
31
32
33class StreamConnection(object):
34 def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK):
35 self.reader = reader
36 self.writer = writer
37 self.timeout = timeout
38 self.max_chunk = max_chunk
39
40 @property
41 def address(self):
42 return self.writer.get_extra_info("peername")
43
44 async def send_message(self, msg):
45 for c in chunkify(json.dumps(msg), self.max_chunk):
46 self.writer.write(c.encode("utf-8"))
47 await self.writer.drain()
48
49 async def recv_message(self):
50 l = await self.recv()
51
52 m = json.loads(l)
53 if not m:
54 return m
55
56 if "chunk-stream" in m:
57 lines = []
58 while True:
59 l = await self.recv()
60 if not l:
61 break
62 lines.append(l)
63
64 m = json.loads("".join(lines))
65
66 return m
67
68 async def send(self, msg):
69 self.writer.write(("%s\n" % msg).encode("utf-8"))
70 await self.writer.drain()
71
72 async def recv(self):
73 if self.timeout < 0:
74 line = await self.reader.readline()
75 else:
76 try:
77 line = await asyncio.wait_for(self.reader.readline(), self.timeout)
78 except asyncio.TimeoutError:
79 raise ConnectionError("Timed out waiting for data")
80
81 if not line:
82 raise ConnectionClosedError("Connection closed")
83
84 line = line.decode("utf-8")
85
86 if not line.endswith("\n"):
87 raise ConnectionError("Bad message %r" % (line))
88
89 return line.rstrip()
90
91 async def close(self):
92 self.reader = None
93 if self.writer is not None:
94 self.writer.close()
95 self.writer = None