summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2020-11-10 08:59:55 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2020-11-24 15:26:12 +0000
commit859f43e176dcaaa652e24a2289abd75e18c077cf (patch)
treeb515cd85fddd9ef20ad77ba0e6e2a340d6e1c517
parent451af0105bc934c6be239a79821193139e49ab1a (diff)
downloadpoky-859f43e176dcaaa652e24a2289abd75e18c077cf.tar.gz
bitbake: bitbake: hashserve: Add async client
Adds support for create a client that operates using Python asynchronous I/O. (Bitbake rev: cf9bc0310b0092bf52b61057405aeb51c86ba137) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--bitbake/lib/hashserv/__init__.py13
-rw-r--r--bitbake/lib/hashserv/client.py238
2 files changed, 143 insertions, 108 deletions
diff --git a/bitbake/lib/hashserv/__init__.py b/bitbake/lib/hashserv/__init__.py
index f95e8f43f1..622ca17a91 100644
--- a/bitbake/lib/hashserv/__init__.py
+++ b/bitbake/lib/hashserv/__init__.py
@@ -3,6 +3,7 @@
3# SPDX-License-Identifier: GPL-2.0-only 3# SPDX-License-Identifier: GPL-2.0-only
4# 4#
5 5
6import asyncio
6from contextlib import closing 7from contextlib import closing
7import re 8import re
8import sqlite3 9import sqlite3
@@ -113,3 +114,15 @@ def create_client(addr):
113 c.connect_tcp(*a) 114 c.connect_tcp(*a)
114 115
115 return c 116 return c
117
118async def create_async_client(addr):
119 from . import client
120 c = client.AsyncClient()
121
122 (typ, a) = parse_address(addr)
123 if typ == ADDR_TYPE_UNIX:
124 await c.connect_unix(*a)
125 else:
126 await c.connect_tcp(*a)
127
128 return c
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index a29af836d9..d0b3cf3863 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -3,189 +3,211 @@
3# SPDX-License-Identifier: GPL-2.0-only 3# SPDX-License-Identifier: GPL-2.0-only
4# 4#
5 5
6import asyncio
6import json 7import json
7import logging 8import logging
8import socket 9import socket
9import os 10import os
10from . import chunkify, DEFAULT_MAX_CHUNK 11from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
11 12
12 13
13logger = logging.getLogger('hashserv.client') 14logger = logging.getLogger("hashserv.client")
14 15
15 16
16class HashConnectionError(Exception): 17class HashConnectionError(Exception):
17 pass 18 pass
18 19
19 20
20class Client(object): 21class AsyncClient(object):
21 MODE_NORMAL = 0 22 MODE_NORMAL = 0
22 MODE_GET_STREAM = 1 23 MODE_GET_STREAM = 1
23 24
24 def __init__(self): 25 def __init__(self):
25 self._socket = None
26 self.reader = None 26 self.reader = None
27 self.writer = None 27 self.writer = None
28 self.mode = self.MODE_NORMAL 28 self.mode = self.MODE_NORMAL
29 self.max_chunk = DEFAULT_MAX_CHUNK 29 self.max_chunk = DEFAULT_MAX_CHUNK
30 30
31 def connect_tcp(self, address, port): 31 async def connect_tcp(self, address, port):
32 def connect_sock(): 32 async def connect_sock():
33 s = socket.create_connection((address, port)) 33 return await asyncio.open_connection(address, port)
34
35 s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
36 s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
37 s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
38 return s
39 34
40 self._connect_sock = connect_sock 35 self._connect_sock = connect_sock
41 36
42 def connect_unix(self, path): 37 async def connect_unix(self, path):
43 def connect_sock(): 38 async def connect_sock():
44 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 39 return await asyncio.open_unix_connection(path)
45 # AF_UNIX has path length issues so chdir here to workaround
46 cwd = os.getcwd()
47 try:
48 os.chdir(os.path.dirname(path))
49 s.connect(os.path.basename(path))
50 finally:
51 os.chdir(cwd)
52 return s
53 40
54 self._connect_sock = connect_sock 41 self._connect_sock = connect_sock
55 42
56 def connect(self): 43 async def _connect(self):
57 if self._socket is None: 44 if self.reader is None or self.writer is None:
58 self._socket = self._connect_sock() 45 (self.reader, self.writer) = await self._connect_sock()
59
60 self.reader = self._socket.makefile('r', encoding='utf-8')
61 self.writer = self._socket.makefile('w', encoding='utf-8')
62 46
63 self.writer.write('OEHASHEQUIV 1.1\n\n') 47 self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
64 self.writer.flush() 48 await self.writer.drain()
65 49
66 # Restore mode if the socket is being re-created
67 cur_mode = self.mode 50 cur_mode = self.mode
68 self.mode = self.MODE_NORMAL 51 self.mode = self.MODE_NORMAL
69 self._set_mode(cur_mode) 52 await self._set_mode(cur_mode)
70 53
71 return self._socket 54 async def close(self):
55 self.reader = None
72 56
73 def close(self): 57 if self.writer is not None:
74 if self._socket is not None: 58 self.writer.close()
75 self._socket.close()
76 self._socket = None
77 self.reader = None
78 self.writer = None 59 self.writer = None
79 60
80 def _send_wrapper(self, proc): 61 async def _send_wrapper(self, proc):
81 count = 0 62 count = 0
82 while True: 63 while True:
83 try: 64 try:
84 self.connect() 65 await self._connect()
85 return proc() 66 return await proc()
86 except (OSError, HashConnectionError, json.JSONDecodeError, UnicodeDecodeError) as e: 67 except (
87 logger.warning('Error talking to server: %s' % e) 68 OSError,
69 HashConnectionError,
70 json.JSONDecodeError,
71 UnicodeDecodeError,
72 ) as e:
73 logger.warning("Error talking to server: %s" % e)
88 if count >= 3: 74 if count >= 3:
89 if not isinstance(e, HashConnectionError): 75 if not isinstance(e, HashConnectionError):
90 raise HashConnectionError(str(e)) 76 raise HashConnectionError(str(e))
91 raise e 77 raise e
92 self.close() 78 await self.close()
93 count += 1 79 count += 1
94 80
95 def send_message(self, msg): 81 async def send_message(self, msg):
96 def get_line(): 82 async def get_line():
97 line = self.reader.readline() 83 line = await self.reader.readline()
98 if not line: 84 if not line:
99 raise HashConnectionError('Connection closed') 85 raise HashConnectionError("Connection closed")
86
87 line = line.decode("utf-8")
100 88
101 if not line.endswith('\n'): 89 if not line.endswith("\n"):
102 raise HashConnectionError('Bad message %r' % message) 90 raise HashConnectionError("Bad message %r" % message)
103 91
104 return line 92 return line
105 93
106 def proc(): 94 async def proc():
107 for c in chunkify(json.dumps(msg), self.max_chunk): 95 for c in chunkify(json.dumps(msg), self.max_chunk):
108 self.writer.write(c) 96 self.writer.write(c.encode("utf-8"))
109 self.writer.flush() 97 await self.writer.drain()
110 98
111 l = get_line() 99 l = await get_line()
112 100
113 m = json.loads(l) 101 m = json.loads(l)
114 if 'chunk-stream' in m: 102 if "chunk-stream" in m:
115 lines = [] 103 lines = []
116 while True: 104 while True:
117 l = get_line().rstrip('\n') 105 l = (await get_line()).rstrip("\n")
118 if not l: 106 if not l:
119 break 107 break
120 lines.append(l) 108 lines.append(l)
121 109
122 m = json.loads(''.join(lines)) 110 m = json.loads("".join(lines))
123 111
124 return m 112 return m
125 113
126 return self._send_wrapper(proc) 114 return await self._send_wrapper(proc)
127 115
128 def send_stream(self, msg): 116 async def send_stream(self, msg):
129 def proc(): 117 async def proc():
130 self.writer.write("%s\n" % msg) 118 self.writer.write(("%s\n" % msg).encode("utf-8"))
131 self.writer.flush() 119 await self.writer.drain()
132 l = self.reader.readline() 120 l = await self.reader.readline()
133 if not l: 121 if not l:
134 raise HashConnectionError('Connection closed') 122 raise HashConnectionError("Connection closed")
135 return l.rstrip() 123 return l.decode("utf-8").rstrip()
136 124
137 return self._send_wrapper(proc) 125 return await self._send_wrapper(proc)
138 126
139 def _set_mode(self, new_mode): 127 async def _set_mode(self, new_mode):
140 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: 128 if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
141 r = self.send_stream('END') 129 r = await self.send_stream("END")
142 if r != 'ok': 130 if r != "ok":
143 raise HashConnectionError('Bad response from server %r' % r) 131 raise HashConnectionError("Bad response from server %r" % r)
144 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: 132 elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
145 r = self.send_message({'get-stream': None}) 133 r = await self.send_message({"get-stream": None})
146 if r != 'ok': 134 if r != "ok":
147 raise HashConnectionError('Bad response from server %r' % r) 135 raise HashConnectionError("Bad response from server %r" % r)
148 elif new_mode != self.mode: 136 elif new_mode != self.mode:
149 raise Exception('Undefined mode transition %r -> %r' % (self.mode, new_mode)) 137 raise Exception(
138 "Undefined mode transition %r -> %r" % (self.mode, new_mode)
139 )
150 140
151 self.mode = new_mode 141 self.mode = new_mode
152 142
153 def get_unihash(self, method, taskhash): 143 async def get_unihash(self, method, taskhash):
154 self._set_mode(self.MODE_GET_STREAM) 144 await self._set_mode(self.MODE_GET_STREAM)
155 r = self.send_stream('%s %s' % (method, taskhash)) 145 r = await self.send_stream("%s %s" % (method, taskhash))
156 if not r: 146 if not r:
157 return None 147 return None
158 return r 148 return r
159 149
160 def report_unihash(self, taskhash, method, outhash, unihash, extra={}): 150 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
161 self._set_mode(self.MODE_NORMAL) 151 await self._set_mode(self.MODE_NORMAL)
162 m = extra.copy() 152 m = extra.copy()
163 m['taskhash'] = taskhash 153 m["taskhash"] = taskhash
164 m['method'] = method 154 m["method"] = method
165 m['outhash'] = outhash 155 m["outhash"] = outhash
166 m['unihash'] = unihash 156 m["unihash"] = unihash
167 return self.send_message({'report': m}) 157 return await self.send_message({"report": m})
168 158
169 def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 159 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
170 self._set_mode(self.MODE_NORMAL) 160 await self._set_mode(self.MODE_NORMAL)
171 m = extra.copy() 161 m = extra.copy()
172 m['taskhash'] = taskhash 162 m["taskhash"] = taskhash
173 m['method'] = method 163 m["method"] = method
174 m['unihash'] = unihash 164 m["unihash"] = unihash
175 return self.send_message({'report-equiv': m}) 165 return await self.send_message({"report-equiv": m})
176 166
177 def get_taskhash(self, method, taskhash, all_properties=False): 167 async def get_taskhash(self, method, taskhash, all_properties=False):
178 self._set_mode(self.MODE_NORMAL) 168 await self._set_mode(self.MODE_NORMAL)
179 return self.send_message({'get': { 169 return await self.send_message(
180 'taskhash': taskhash, 170 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
181 'method': method, 171 )
182 'all': all_properties 172
183 }}) 173 async def get_stats(self):
184 174 await self._set_mode(self.MODE_NORMAL)
185 def get_stats(self): 175 return await self.send_message({"get-stats": None})
186 self._set_mode(self.MODE_NORMAL) 176
187 return self.send_message({'get-stats': None}) 177 async def reset_stats(self):
188 178 await self._set_mode(self.MODE_NORMAL)
189 def reset_stats(self): 179 return await self.send_message({"reset-stats": None})
190 self._set_mode(self.MODE_NORMAL) 180
191 return self.send_message({'reset-stats': None}) 181
182class Client(object):
183 def __init__(self):
184 self.client = AsyncClient()
185 self.loop = asyncio.new_event_loop()
186
187 def get_wrapper(self, downcall):
188 def wrapper(*args, **kwargs):
189 return self.loop.run_until_complete(downcall(*args, **kwargs))
190
191 return wrapper
192
193 for call in (
194 "connect_tcp",
195 "connect_unix",
196 "close",
197 "get_unihash",
198 "report_unihash",
199 "report_unihash_equiv",
200 "get_taskhash",
201 "get_stats",
202 "reset_stats",
203 ):
204 downcall = getattr(self.client, call)
205 setattr(self, call, get_wrapper(self, downcall))
206
207 @property
208 def max_chunk(self):
209 return self.client.max_chunk
210
211 @max_chunk.setter
212 def max_chunk(self, value):
213 self.client.max_chunk = value