Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r--  bitbake/lib/hashserv/client.py  439
1 file changed, 276 insertions(+), 163 deletions(-)
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index e05c1eb568..0b254beddd 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -3,231 +3,344 @@
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
-import asyncio
-import json
 import logging
 import socket
-import os
-from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
+import bb.asyncrpc
+import json
+from . import create_async_client
 
 
 logger = logging.getLogger("hashserv.client")
 
 
-class HashConnectionError(Exception):
-    pass
-
-
-class AsyncClient(object):
+class AsyncClient(bb.asyncrpc.AsyncClient):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
+    MODE_EXIST_STREAM = 2
 
-    def __init__(self):
-        self.reader = None
-        self.writer = None
+    def __init__(self, username=None, password=None):
+        super().__init__("OEHASHEQUIV", "1.1", logger)
         self.mode = self.MODE_NORMAL
-        self.max_chunk = DEFAULT_MAX_CHUNK
-
-    async def connect_tcp(self, address, port):
-        async def connect_sock():
-            return await asyncio.open_connection(address, port)
-
-        self._connect_sock = connect_sock
-
-    async def connect_unix(self, path):
-        async def connect_sock():
-            return await asyncio.open_unix_connection(path)
-
-        self._connect_sock = connect_sock
-
-    async def connect(self):
-        if self.reader is None or self.writer is None:
-            (self.reader, self.writer) = await self._connect_sock()
-
-            self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
-            await self.writer.drain()
-
-            cur_mode = self.mode
-            self.mode = self.MODE_NORMAL
-            await self._set_mode(cur_mode)
-
-    async def close(self):
-        self.reader = None
-
-        if self.writer is not None:
-            self.writer.close()
-            self.writer = None
-
-    async def _send_wrapper(self, proc):
-        count = 0
-        while True:
-            try:
-                await self.connect()
-                return await proc()
-            except (
-                OSError,
-                HashConnectionError,
-                json.JSONDecodeError,
-                UnicodeDecodeError,
-            ) as e:
-                logger.warning("Error talking to server: %s" % e)
-                if count >= 3:
-                    if not isinstance(e, HashConnectionError):
-                        raise HashConnectionError(str(e))
-                    raise e
-                await self.close()
-                count += 1
-
-    async def send_message(self, msg):
-        async def get_line():
-            line = await self.reader.readline()
-            if not line:
-                raise HashConnectionError("Connection closed")
-
-            line = line.decode("utf-8")
-
-            if not line.endswith("\n"):
-                raise HashConnectionError("Bad message %r" % message)
-
-            return line
+        self.username = username
+        self.password = password
+        self.saved_become_user = None
 
+    async def setup_connection(self):
+        await super().setup_connection()
+        self.mode = self.MODE_NORMAL
+        if self.username:
+            # Save off become user temporarily because auth() resets it
+            become = self.saved_become_user
+            await self.auth(self.username, self.password)
+
+            if become:
+                await self.become_user(become)
+
+    async def send_stream(self, mode, msg):
         async def proc():
-            for c in chunkify(json.dumps(msg), self.max_chunk):
-                self.writer.write(c.encode("utf-8"))
-            await self.writer.drain()
+            await self._set_mode(mode)
+            await self.socket.send(msg)
+            return await self.socket.recv()
 
-            l = await get_line()
+        return await self._send_wrapper(proc)
 
-            m = json.loads(l)
-            if m and "chunk-stream" in m:
-                lines = []
-                while True:
-                    l = (await get_line()).rstrip("\n")
-                    if not l:
-                        break
-                    lines.append(l)
+    async def invoke(self, *args, **kwargs):
+        # It's OK if connection errors cause a failure here, because the mode
+        # is also reset to normal on a new connection
+        await self._set_mode(self.MODE_NORMAL)
+        return await super().invoke(*args, **kwargs)
 
-                m = json.loads("".join(lines))
+    async def _set_mode(self, new_mode):
+        async def stream_to_normal():
+            await self.socket.send("END")
+            return await self.socket.recv()
 
-            return m
+        async def normal_to_stream(command):
+            r = await self.invoke({command: None})
+            if r != "ok":
+                raise ConnectionError(
+                    f"Unable to transition to stream mode: Bad response from server {r!r}"
+                )
 
-        return await self._send_wrapper(proc)
+            self.logger.debug("Mode is now %s", command)
 
-    async def send_stream(self, msg):
-        async def proc():
-            self.writer.write(("%s\n" % msg).encode("utf-8"))
-            await self.writer.drain()
-            l = await self.reader.readline()
-            if not l:
-                raise HashConnectionError("Connection closed")
-            return l.decode("utf-8").rstrip()
+        if new_mode == self.mode:
+            return
 
-        return await self._send_wrapper(proc)
+        self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode)
 
-    async def _set_mode(self, new_mode):
-        if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
-            r = await self.send_stream("END")
+        # Always transition to normal mode before switching to any other mode
+        if self.mode != self.MODE_NORMAL:
+            r = await self._send_wrapper(stream_to_normal)
             if r != "ok":
-                raise HashConnectionError("Bad response from server %r" % r)
-        elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
-            r = await self.send_message({"get-stream": None})
-            if r != "ok":
-                raise HashConnectionError("Bad response from server %r" % r)
-        elif new_mode != self.mode:
-            raise Exception(
-                "Undefined mode transition %r -> %r" % (self.mode, new_mode)
-            )
+                self.check_invoke_error(r)
+                raise ConnectionError(
+                    f"Unable to transition to normal mode: Bad response from server {r!r}"
+                )
+            self.logger.debug("Mode is now normal")
+
+        if new_mode == self.MODE_GET_STREAM:
+            await normal_to_stream("get-stream")
+        elif new_mode == self.MODE_EXIST_STREAM:
+            await normal_to_stream("exists-stream")
+        elif new_mode != self.MODE_NORMAL:
+            raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}")
 
         self.mode = new_mode
 
     async def get_unihash(self, method, taskhash):
-        await self._set_mode(self.MODE_GET_STREAM)
-        r = await self.send_stream("%s %s" % (method, taskhash))
+        r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash))
         if not r:
             return None
         return r
 
     async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
-        await self._set_mode(self.MODE_NORMAL)
         m = extra.copy()
         m["taskhash"] = taskhash
         m["method"] = method
         m["outhash"] = outhash
         m["unihash"] = unihash
-        return await self.send_message({"report": m})
+        return await self.invoke({"report": m})
 
     async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
-        await self._set_mode(self.MODE_NORMAL)
         m = extra.copy()
         m["taskhash"] = taskhash
         m["method"] = method
         m["unihash"] = unihash
-        return await self.send_message({"report-equiv": m})
+        return await self.invoke({"report-equiv": m})
 
     async def get_taskhash(self, method, taskhash, all_properties=False):
-        await self._set_mode(self.MODE_NORMAL)
-        return await self.send_message(
+        return await self.invoke(
             {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
         )
 
-    async def get_outhash(self, method, outhash, taskhash):
-        await self._set_mode(self.MODE_NORMAL)
-        return await self.send_message(
-            {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method}}
+    async def unihash_exists(self, unihash):
+        r = await self.send_stream(self.MODE_EXIST_STREAM, unihash)
+        return r == "true"
+
+    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
+        return await self.invoke(
+            {
+                "get-outhash": {
+                    "outhash": outhash,
+                    "taskhash": taskhash,
+                    "method": method,
+                    "with_unihash": with_unihash,
+                }
+            }
         )
 
     async def get_stats(self):
-        await self._set_mode(self.MODE_NORMAL)
-        return await self.send_message({"get-stats": None})
+        return await self.invoke({"get-stats": None})
 
     async def reset_stats(self):
-        await self._set_mode(self.MODE_NORMAL)
-        return await self.send_message({"reset-stats": None})
+        return await self.invoke({"reset-stats": None})
 
     async def backfill_wait(self):
-        await self._set_mode(self.MODE_NORMAL)
-        return (await self.send_message({"backfill-wait": None}))["tasks"]
+        return (await self.invoke({"backfill-wait": None}))["tasks"]
+
+    async def remove(self, where):
+        return await self.invoke({"remove": {"where": where}})
+
+    async def clean_unused(self, max_age):
+        return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
+
+    async def auth(self, username, token):
+        result = await self.invoke({"auth": {"username": username, "token": token}})
+        self.username = username
+        self.password = token
+        self.saved_become_user = None
+        return result
+
+    async def refresh_token(self, username=None):
+        m = {}
+        if username:
+            m["username"] = username
+        result = await self.invoke({"refresh-token": m})
+        if (
+            self.username
+            and not self.saved_become_user
+            and result["username"] == self.username
+        ):
+            self.password = result["token"]
+        return result
+
+    async def set_user_perms(self, username, permissions):
+        return await self.invoke(
+            {"set-user-perms": {"username": username, "permissions": permissions}}
+        )
+
+    async def get_user(self, username=None):
+        m = {}
+        if username:
+            m["username"] = username
+        return await self.invoke({"get-user": m})
+
+    async def get_all_users(self):
+        return (await self.invoke({"get-all-users": {}}))["users"]
+
+    async def new_user(self, username, permissions):
+        return await self.invoke(
+            {"new-user": {"username": username, "permissions": permissions}}
+        )
+
+    async def delete_user(self, username):
+        return await self.invoke({"delete-user": {"username": username}})
+
+    async def become_user(self, username):
+        result = await self.invoke({"become-user": {"username": username}})
+        if username == self.username:
+            self.saved_become_user = None
+        else:
+            self.saved_become_user = username
+        return result
+
+    async def get_db_usage(self):
+        return (await self.invoke({"get-db-usage": {}}))["usage"]
+
+    async def get_db_query_columns(self):
+        return (await self.invoke({"get-db-query-columns": {}}))["columns"]
+
+    async def gc_status(self):
+        return await self.invoke({"gc-status": {}})
+
+    async def gc_mark(self, mark, where):
+        """
+        Starts a new garbage collection operation identified by "mark". If
+        garbage collection is already in progress with "mark", the collection
+        is continued.
+
+        All unihash entries that match the "where" clause are marked to be
+        kept. In addition, any new entries added to the database after this
+        command will be automatically marked with "mark"
+        """
+        return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
 
+    async def gc_sweep(self, mark):
+        """
+        Finishes garbage collection for "mark". All unihash entries that have
+        not been marked will be deleted.
 
-class Client(object):
-    def __init__(self):
-        self.client = AsyncClient()
-        self.loop = asyncio.new_event_loop()
+        It is recommended to clean unused outhash entries after running this to
+        cleanup any dangling outhashes
+        """
+        return await self.invoke({"gc-sweep": {"mark": mark}})
 
-        for call in (
+
+class Client(bb.asyncrpc.Client):
+    def __init__(self, username=None, password=None):
+        self.username = username
+        self.password = password
+
+        super().__init__()
+        self._add_methods(
             "connect_tcp",
-            "close",
+            "connect_websocket",
             "get_unihash",
             "report_unihash",
             "report_unihash_equiv",
             "get_taskhash",
+            "unihash_exists",
+            "get_outhash",
             "get_stats",
             "reset_stats",
             "backfill_wait",
-        ):
-            downcall = getattr(self.client, call)
-            setattr(self, call, self._get_downcall_wrapper(downcall))
-
-    def _get_downcall_wrapper(self, downcall):
-        def wrapper(*args, **kwargs):
-            return self.loop.run_until_complete(downcall(*args, **kwargs))
-
-        return wrapper
-
-    def connect_unix(self, path):
-        # AF_UNIX has path length issues so chdir here to workaround
-        cwd = os.getcwd()
-        try:
-            os.chdir(os.path.dirname(path))
-            self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
-            self.loop.run_until_complete(self.client.connect())
-        finally:
-            os.chdir(cwd)
-
-    @property
-    def max_chunk(self):
-        return self.client.max_chunk
-
-    @max_chunk.setter
-    def max_chunk(self, value):
-        self.client.max_chunk = value
+            "remove",
+            "clean_unused",
+            "auth",
+            "refresh_token",
+            "set_user_perms",
+            "get_user",
+            "get_all_users",
+            "new_user",
+            "delete_user",
+            "become_user",
+            "get_db_usage",
+            "get_db_query_columns",
+            "gc_status",
+            "gc_mark",
+            "gc_sweep",
+        )
+
+    def _get_async_client(self):
+        return AsyncClient(self.username, self.password)
+
+
+class ClientPool(bb.asyncrpc.ClientPool):
+    def __init__(
+        self,
+        address,
+        max_clients,
+        *,
+        username=None,
+        password=None,
+        become=None,
+    ):
+        super().__init__(max_clients)
+        self.address = address
+        self.username = username
+        self.password = password
+        self.become = become
+
+    async def _new_client(self):
+        client = await create_async_client(
+            self.address,
+            username=self.username,
+            password=self.password,
+        )
+        if self.become:
+            await client.become_user(self.become)
+        return client
+
+    def _run_key_tasks(self, queries, call):
+        results = {key: None for key in queries.keys()}
+
+        def make_task(key, args):
+            async def task(client):
+                nonlocal results
+                unihash = await call(client, args)
+                results[key] = unihash
+
+            return task
+
+        def gen_tasks():
+            for key, args in queries.items():
+                yield make_task(key, args)
+
+        self.run_tasks(gen_tasks())
+        return results
+
+    def get_unihashes(self, queries):
+        """
+        Query multiple unihashes in parallel.
+
+        The queries argument is a dictionary with arbitrary key. The values
+        must be a tuple of (method, taskhash).
+
+        Returns a dictionary with a corresponding key for each input key, and
+        the value is the queried unihash (which might be none if the query
+        failed)
+        """
+
+        async def call(client, args):
+            method, taskhash = args
+            return await client.get_unihash(method, taskhash)
+
+        return self._run_key_tasks(queries, call)
+
+    def unihashes_exist(self, queries):
+        """
+        Query multiple unihash existence checks in parallel.
+
+        The queries argument is a dictionary with arbitrary key. The values
+        must be a unihash.
+
+        Returns a dictionary with a corresponding key for each input key, and
+        the value is True or False if the unihash is known by the server (or
+        None if there was a failure)
+        """
+
+        async def call(client, unihash):
+            return await client.unihash_exists(unihash)
+
+        return self._run_key_tasks(queries, call)
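
As a usage note beyond the patch text, here is a minimal sketch of driving the reworked synchronous Client. It only uses methods wired up in this diff (connect_tcp, report_unihash, get_unihash, unihash_exists); close() is inherited from the bb.asyncrpc.Client base class, and the host, port, credentials, method name and hashes are made-up values, assuming bitbake/lib is on sys.path and a hash equivalence server is listening.

from hashserv.client import Client

client = Client(username="admin", password="some-token")  # credentials are optional
client.connect_tcp("localhost", 8686)

method = "fake.method"   # hypothetical task method name
taskhash = "aa" * 32     # hypothetical hashes
outhash = "bb" * 32

# Report an outhash for a task; the server replies with the unihash to use.
result = client.report_unihash(taskhash, method, outhash, taskhash)
print("unihash:", result["unihash"])

# Streaming lookups: get_unihash() switches the connection into get-stream
# mode and unihash_exists() into exists-stream mode; _set_mode() handles the
# transitions transparently.
print(client.get_unihash(method, taskhash))
print(client.unihash_exists(result["unihash"]))

client.close()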
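
The ClientPool docstrings describe a dict-in/dict-out batch API. A sketch of how it is intended to be used follows; the address string, credentials and hashes are placeholders, and the address is assumed to be in a form accepted by hashserv's create_async_client (host:port here).

from hashserv.client import ClientPool

pool = ClientPool(
    "localhost:8686",       # parsed by create_async_client()
    max_clients=4,
    username="admin",       # optional; forwarded to each pooled client
    password="some-token",
)

# Keys are arbitrary; values are (method, taskhash) tuples for get_unihashes()
# and bare unihashes for unihashes_exist().
queries = {
    "task-a": ("fake.method", "aa" * 32),
    "task-b": ("fake.method", "bb" * 32),
}
unihashes = pool.get_unihashes(queries)   # {"task-a": <unihash or None>, ...}

exists = pool.unihashes_exist(
    {key: value for key, value in unihashes.items() if value is not None}
)                                         # {"task-a": True/False/None, ...}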
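
The gc_mark()/gc_sweep() docstrings describe a mark-and-sweep flow. Below is a hypothetical sketch of one complete cycle; the mark label and the where clause are made-up, and the where dict is assumed to be keyed by the server's database query columns (see get_db_query_columns()).

from hashserv.client import Client

client = Client(username="admin", password="some-token")
client.connect_tcp("localhost", 8686)

mark = "gc-2024-01"                      # arbitrary label for this GC run
keep = {"method": "fake.method"}         # made-up where clause

# Mark matching unihash entries to be kept; entries added while the mark is
# active are kept automatically, per the gc_mark() docstring.
client.gc_mark(mark, keep)
print(client.gc_status())

# Delete everything that was not marked, then drop outhash rows that are no
# longer referenced, as the gc_sweep() docstring recommends.
client.gc_sweep(mark)
client.clean_unused(30 * 24 * 60 * 60)   # max_age in seconds
client.close()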
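
Finally, a sketch of the new user-management calls. Usernames, tokens and permission strings are illustrative only (the permission names accepted by the server are not defined in this file), and the shape of the returned dictionaries is assumed from how auth() and refresh_token() read them.

from hashserv.client import Client

admin = Client(username="admin", password="admin-token")
admin.connect_tcp("localhost", 8686)

# Create a user; the reply is expected to carry at least "username" and "token".
new = admin.new_user("builder", ["@read", "@report"])  # placeholder permissions
print(new)

admin.set_user_perms("builder", ["@read"])
admin.refresh_token("builder")                         # rotate the user's token
print(admin.get_user("builder"))
print(admin.get_all_users())

# Temporarily act as the other user; becoming yourself again clears the state.
admin.become_user("builder")
admin.become_user("admin")

admin.delete_user("builder")
admin.close()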