summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/hashserv/client.py
diff options
context:
space:
mode:
authorPaul Barker <pbarker@konsulko.com>2021-04-26 09:16:30 +0100
committerRichard Purdie <richard.purdie@linuxfoundation.org>2021-04-27 15:12:57 +0100
commit421e86e7edadb8c88baf4df68b9fc15671e425de (patch)
tree4535af55064f7de8f41929085718e88bcd1fc15e /bitbake/lib/hashserv/client.py
parent244b044fd6d94c000fc9cb8d1b7a9dddd08017ad (diff)
downloadpoky-421e86e7edadb8c88baf4df68b9fc15671e425de.tar.gz
bitbake: hashserv: Refactor to use asyncrpc
The asyncrpc module can now be used to provide the json & asyncio based RPC system used by hashserv. (Bitbake rev: 5afb9586b0a4a23a05efb0e8ff4a97262631ae4a) Signed-off-by: Paul Barker <pbarker@konsulko.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r--bitbake/lib/hashserv/client.py137
1 files changed, 16 insertions, 121 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index f370cba63f..5311709677 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -8,106 +8,26 @@ import json
8import logging 8import logging
9import socket 9import socket
10import os 10import os
11from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client 11import bb.asyncrpc
12from . import create_async_client
12 13
13 14
14logger = logging.getLogger("hashserv.client") 15logger = logging.getLogger("hashserv.client")
15 16
16 17
17class AsyncClient(object): 18class AsyncClient(bb.asyncrpc.AsyncClient):
18 MODE_NORMAL = 0 19 MODE_NORMAL = 0
19 MODE_GET_STREAM = 1 20 MODE_GET_STREAM = 1
20 21
21 def __init__(self): 22 def __init__(self):
22 self.reader = None 23 super().__init__('OEHASHEQUIV', '1.1', logger)
23 self.writer = None
24 self.mode = self.MODE_NORMAL 24 self.mode = self.MODE_NORMAL
25 self.max_chunk = DEFAULT_MAX_CHUNK
26 25
27 async def connect_tcp(self, address, port): 26 async def setup_connection(self):
28 async def connect_sock(): 27 await super().setup_connection()
29 return await asyncio.open_connection(address, port) 28 cur_mode = self.mode
30 29 self.mode = self.MODE_NORMAL
31 self._connect_sock = connect_sock 30 await self._set_mode(cur_mode)
32
33 async def connect_unix(self, path):
34 async def connect_sock():
35 return await asyncio.open_unix_connection(path)
36
37 self._connect_sock = connect_sock
38
39 async def connect(self):
40 if self.reader is None or self.writer is None:
41 (self.reader, self.writer) = await self._connect_sock()
42
43 self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
44 await self.writer.drain()
45
46 cur_mode = self.mode
47 self.mode = self.MODE_NORMAL
48 await self._set_mode(cur_mode)
49
50 async def close(self):
51 self.reader = None
52
53 if self.writer is not None:
54 self.writer.close()
55 self.writer = None
56
57 async def _send_wrapper(self, proc):
58 count = 0
59 while True:
60 try:
61 await self.connect()
62 return await proc()
63 except (
64 OSError,
65 ConnectionError,
66 json.JSONDecodeError,
67 UnicodeDecodeError,
68 ) as e:
69 logger.warning("Error talking to server: %s" % e)
70 if count >= 3:
71 if not isinstance(e, ConnectionError):
72 raise ConnectionError(str(e))
73 raise e
74 await self.close()
75 count += 1
76
77 async def send_message(self, msg):
78 async def get_line():
79 line = await self.reader.readline()
80 if not line:
81 raise ConnectionError("Connection closed")
82
83 line = line.decode("utf-8")
84
85 if not line.endswith("\n"):
86 raise ConnectionError("Bad message %r" % message)
87
88 return line
89
90 async def proc():
91 for c in chunkify(json.dumps(msg), self.max_chunk):
92 self.writer.write(c.encode("utf-8"))
93 await self.writer.drain()
94
95 l = await get_line()
96
97 m = json.loads(l)
98 if m and "chunk-stream" in m:
99 lines = []
100 while True:
101 l = (await get_line()).rstrip("\n")
102 if not l:
103 break
104 lines.append(l)
105
106 m = json.loads("".join(lines))
107
108 return m
109
110 return await self._send_wrapper(proc)
111 31
112 async def send_stream(self, msg): 32 async def send_stream(self, msg):
113 async def proc(): 33 async def proc():
@@ -185,12 +105,10 @@ class AsyncClient(object):
185 return (await self.send_message({"backfill-wait": None}))["tasks"] 105 return (await self.send_message({"backfill-wait": None}))["tasks"]
186 106
187 107
188class Client(object): 108class Client(bb.asyncrpc.Client):
189 def __init__(self): 109 def __init__(self):
190 self.client = AsyncClient() 110 super().__init__()
191 self.loop = asyncio.new_event_loop() 111 self._add_methods(
192
193 for call in (
194 "connect_tcp", 112 "connect_tcp",
195 "close", 113 "close",
196 "get_unihash", 114 "get_unihash",
@@ -200,30 +118,7 @@ class Client(object):
200 "get_stats", 118 "get_stats",
201 "reset_stats", 119 "reset_stats",
202 "backfill_wait", 120 "backfill_wait",
203 ): 121 )
204 downcall = getattr(self.client, call) 122
205 setattr(self, call, self._get_downcall_wrapper(downcall)) 123 def _get_async_client(self):
206 124 return AsyncClient()
207 def _get_downcall_wrapper(self, downcall):
208 def wrapper(*args, **kwargs):
209 return self.loop.run_until_complete(downcall(*args, **kwargs))
210
211 return wrapper
212
213 def connect_unix(self, path):
214 # AF_UNIX has path length issues so chdir here to workaround
215 cwd = os.getcwd()
216 try:
217 os.chdir(os.path.dirname(path))
218 self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
219 self.loop.run_until_complete(self.client.connect())
220 finally:
221 os.chdir(cwd)
222
223 @property
224 def max_chunk(self):
225 return self.client.max_chunk
226
227 @max_chunk.setter
228 def max_chunk(self, value):
229 self.client.max_chunk = value