diff options
Diffstat (limited to 'bitbake/lib/hashserv/client.py')
-rw-r--r-- | bitbake/lib/hashserv/client.py | 439 |
1 files changed, 276 insertions, 163 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index e05c1eb568..0b254beddd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -3,231 +3,344 @@ | |||
3 | # SPDX-License-Identifier: GPL-2.0-only | 3 | # SPDX-License-Identifier: GPL-2.0-only |
4 | # | 4 | # |
5 | 5 | ||
6 | import asyncio | ||
7 | import json | ||
8 | import logging | 6 | import logging |
9 | import socket | 7 | import socket |
10 | import os | 8 | import bb.asyncrpc |
11 | from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client | 9 | import json |
10 | from . import create_async_client | ||
12 | 11 | ||
13 | 12 | ||
14 | logger = logging.getLogger("hashserv.client") | 13 | logger = logging.getLogger("hashserv.client") |
15 | 14 | ||
16 | 15 | ||
17 | class HashConnectionError(Exception): | 16 | class AsyncClient(bb.asyncrpc.AsyncClient): |
18 | pass | ||
19 | |||
20 | |||
21 | class AsyncClient(object): | ||
22 | MODE_NORMAL = 0 | 17 | MODE_NORMAL = 0 |
23 | MODE_GET_STREAM = 1 | 18 | MODE_GET_STREAM = 1 |
19 | MODE_EXIST_STREAM = 2 | ||
24 | 20 | ||
25 | def __init__(self): | 21 | def __init__(self, username=None, password=None): |
26 | self.reader = None | 22 | super().__init__("OEHASHEQUIV", "1.1", logger) |
27 | self.writer = None | ||
28 | self.mode = self.MODE_NORMAL | 23 | self.mode = self.MODE_NORMAL |
29 | self.max_chunk = DEFAULT_MAX_CHUNK | 24 | self.username = username |
30 | 25 | self.password = password | |
31 | async def connect_tcp(self, address, port): | 26 | self.saved_become_user = None |
32 | async def connect_sock(): | ||
33 | return await asyncio.open_connection(address, port) | ||
34 | |||
35 | self._connect_sock = connect_sock | ||
36 | |||
37 | async def connect_unix(self, path): | ||
38 | async def connect_sock(): | ||
39 | return await asyncio.open_unix_connection(path) | ||
40 | |||
41 | self._connect_sock = connect_sock | ||
42 | |||
43 | async def connect(self): | ||
44 | if self.reader is None or self.writer is None: | ||
45 | (self.reader, self.writer) = await self._connect_sock() | ||
46 | |||
47 | self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8")) | ||
48 | await self.writer.drain() | ||
49 | |||
50 | cur_mode = self.mode | ||
51 | self.mode = self.MODE_NORMAL | ||
52 | await self._set_mode(cur_mode) | ||
53 | |||
54 | async def close(self): | ||
55 | self.reader = None | ||
56 | |||
57 | if self.writer is not None: | ||
58 | self.writer.close() | ||
59 | self.writer = None | ||
60 | |||
61 | async def _send_wrapper(self, proc): | ||
62 | count = 0 | ||
63 | while True: | ||
64 | try: | ||
65 | await self.connect() | ||
66 | return await proc() | ||
67 | except ( | ||
68 | OSError, | ||
69 | HashConnectionError, | ||
70 | json.JSONDecodeError, | ||
71 | UnicodeDecodeError, | ||
72 | ) as e: | ||
73 | logger.warning("Error talking to server: %s" % e) | ||
74 | if count >= 3: | ||
75 | if not isinstance(e, HashConnectionError): | ||
76 | raise HashConnectionError(str(e)) | ||
77 | raise e | ||
78 | await self.close() | ||
79 | count += 1 | ||
80 | |||
81 | async def send_message(self, msg): | ||
82 | async def get_line(): | ||
83 | line = await self.reader.readline() | ||
84 | if not line: | ||
85 | raise HashConnectionError("Connection closed") | ||
86 | |||
87 | line = line.decode("utf-8") | ||
88 | |||
89 | if not line.endswith("\n"): | ||
90 | raise HashConnectionError("Bad message %r" % message) | ||
91 | |||
92 | return line | ||
93 | 27 | ||
28 | async def setup_connection(self): | ||
29 | await super().setup_connection() | ||
30 | self.mode = self.MODE_NORMAL | ||
31 | if self.username: | ||
32 | # Save off become user temporarily because auth() resets it | ||
33 | become = self.saved_become_user | ||
34 | await self.auth(self.username, self.password) | ||
35 | |||
36 | if become: | ||
37 | await self.become_user(become) | ||
38 | |||
39 | async def send_stream(self, mode, msg): | ||
94 | async def proc(): | 40 | async def proc(): |
95 | for c in chunkify(json.dumps(msg), self.max_chunk): | 41 | await self._set_mode(mode) |
96 | self.writer.write(c.encode("utf-8")) | 42 | await self.socket.send(msg) |
97 | await self.writer.drain() | 43 | return await self.socket.recv() |
98 | 44 | ||
99 | l = await get_line() | 45 | return await self._send_wrapper(proc) |
100 | 46 | ||
101 | m = json.loads(l) | 47 | async def invoke(self, *args, **kwargs): |
102 | if m and "chunk-stream" in m: | 48 | # It's OK if connection errors cause a failure here, because the mode |
103 | lines = [] | 49 | # is also reset to normal on a new connection |
104 | while True: | 50 | await self._set_mode(self.MODE_NORMAL) |
105 | l = (await get_line()).rstrip("\n") | 51 | return await super().invoke(*args, **kwargs) |
106 | if not l: | ||
107 | break | ||
108 | lines.append(l) | ||
109 | 52 | ||
110 | m = json.loads("".join(lines)) | 53 | async def _set_mode(self, new_mode): |
54 | async def stream_to_normal(): | ||
55 | await self.socket.send("END") | ||
56 | return await self.socket.recv() | ||
111 | 57 | ||
112 | return m | 58 | async def normal_to_stream(command): |
59 | r = await self.invoke({command: None}) | ||
60 | if r != "ok": | ||
61 | raise ConnectionError( | ||
62 | f"Unable to transition to stream mode: Bad response from server {r!r}" | ||
63 | ) | ||
113 | 64 | ||
114 | return await self._send_wrapper(proc) | 65 | self.logger.debug("Mode is now %s", command) |
115 | 66 | ||
116 | async def send_stream(self, msg): | 67 | if new_mode == self.mode: |
117 | async def proc(): | 68 | return |
118 | self.writer.write(("%s\n" % msg).encode("utf-8")) | ||
119 | await self.writer.drain() | ||
120 | l = await self.reader.readline() | ||
121 | if not l: | ||
122 | raise HashConnectionError("Connection closed") | ||
123 | return l.decode("utf-8").rstrip() | ||
124 | 69 | ||
125 | return await self._send_wrapper(proc) | 70 | self.logger.debug("Transitioning mode %s -> %s", self.mode, new_mode) |
126 | 71 | ||
127 | async def _set_mode(self, new_mode): | 72 | # Always transition to normal mode before switching to any other mode |
128 | if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: | 73 | if self.mode != self.MODE_NORMAL: |
129 | r = await self.send_stream("END") | 74 | r = await self._send_wrapper(stream_to_normal) |
130 | if r != "ok": | 75 | if r != "ok": |
131 | raise HashConnectionError("Bad response from server %r" % r) | 76 | self.check_invoke_error(r) |
132 | elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: | 77 | raise ConnectionError( |
133 | r = await self.send_message({"get-stream": None}) | 78 | f"Unable to transition to normal mode: Bad response from server {r!r}" |
134 | if r != "ok": | 79 | ) |
135 | raise HashConnectionError("Bad response from server %r" % r) | 80 | self.logger.debug("Mode is now normal") |
136 | elif new_mode != self.mode: | 81 | |
137 | raise Exception( | 82 | if new_mode == self.MODE_GET_STREAM: |
138 | "Undefined mode transition %r -> %r" % (self.mode, new_mode) | 83 | await normal_to_stream("get-stream") |
139 | ) | 84 | elif new_mode == self.MODE_EXIST_STREAM: |
85 | await normal_to_stream("exists-stream") | ||
86 | elif new_mode != self.MODE_NORMAL: | ||
87 | raise Exception("Undefined mode transition {self.mode!r} -> {new_mode!r}") | ||
140 | 88 | ||
141 | self.mode = new_mode | 89 | self.mode = new_mode |
142 | 90 | ||
143 | async def get_unihash(self, method, taskhash): | 91 | async def get_unihash(self, method, taskhash): |
144 | await self._set_mode(self.MODE_GET_STREAM) | 92 | r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) |
145 | r = await self.send_stream("%s %s" % (method, taskhash)) | ||
146 | if not r: | 93 | if not r: |
147 | return None | 94 | return None |
148 | return r | 95 | return r |
149 | 96 | ||
150 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): | 97 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): |
151 | await self._set_mode(self.MODE_NORMAL) | ||
152 | m = extra.copy() | 98 | m = extra.copy() |
153 | m["taskhash"] = taskhash | 99 | m["taskhash"] = taskhash |
154 | m["method"] = method | 100 | m["method"] = method |
155 | m["outhash"] = outhash | 101 | m["outhash"] = outhash |
156 | m["unihash"] = unihash | 102 | m["unihash"] = unihash |
157 | return await self.send_message({"report": m}) | 103 | return await self.invoke({"report": m}) |
158 | 104 | ||
159 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 105 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
160 | await self._set_mode(self.MODE_NORMAL) | ||
161 | m = extra.copy() | 106 | m = extra.copy() |
162 | m["taskhash"] = taskhash | 107 | m["taskhash"] = taskhash |
163 | m["method"] = method | 108 | m["method"] = method |
164 | m["unihash"] = unihash | 109 | m["unihash"] = unihash |
165 | return await self.send_message({"report-equiv": m}) | 110 | return await self.invoke({"report-equiv": m}) |
166 | 111 | ||
167 | async def get_taskhash(self, method, taskhash, all_properties=False): | 112 | async def get_taskhash(self, method, taskhash, all_properties=False): |
168 | await self._set_mode(self.MODE_NORMAL) | 113 | return await self.invoke( |
169 | return await self.send_message( | ||
170 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 114 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
171 | ) | 115 | ) |
172 | 116 | ||
173 | async def get_outhash(self, method, outhash, taskhash): | 117 | async def unihash_exists(self, unihash): |
174 | await self._set_mode(self.MODE_NORMAL) | 118 | r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) |
175 | return await self.send_message( | 119 | return r == "true" |
176 | {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method}} | 120 | |
121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | ||
122 | return await self.invoke( | ||
123 | { | ||
124 | "get-outhash": { | ||
125 | "outhash": outhash, | ||
126 | "taskhash": taskhash, | ||
127 | "method": method, | ||
128 | "with_unihash": with_unihash, | ||
129 | } | ||
130 | } | ||
177 | ) | 131 | ) |
178 | 132 | ||
179 | async def get_stats(self): | 133 | async def get_stats(self): |
180 | await self._set_mode(self.MODE_NORMAL) | 134 | return await self.invoke({"get-stats": None}) |
181 | return await self.send_message({"get-stats": None}) | ||
182 | 135 | ||
183 | async def reset_stats(self): | 136 | async def reset_stats(self): |
184 | await self._set_mode(self.MODE_NORMAL) | 137 | return await self.invoke({"reset-stats": None}) |
185 | return await self.send_message({"reset-stats": None}) | ||
186 | 138 | ||
187 | async def backfill_wait(self): | 139 | async def backfill_wait(self): |
188 | await self._set_mode(self.MODE_NORMAL) | 140 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
189 | return (await self.send_message({"backfill-wait": None}))["tasks"] | 141 | |
142 | async def remove(self, where): | ||
143 | return await self.invoke({"remove": {"where": where}}) | ||
144 | |||
145 | async def clean_unused(self, max_age): | ||
146 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) | ||
147 | |||
148 | async def auth(self, username, token): | ||
149 | result = await self.invoke({"auth": {"username": username, "token": token}}) | ||
150 | self.username = username | ||
151 | self.password = token | ||
152 | self.saved_become_user = None | ||
153 | return result | ||
154 | |||
155 | async def refresh_token(self, username=None): | ||
156 | m = {} | ||
157 | if username: | ||
158 | m["username"] = username | ||
159 | result = await self.invoke({"refresh-token": m}) | ||
160 | if ( | ||
161 | self.username | ||
162 | and not self.saved_become_user | ||
163 | and result["username"] == self.username | ||
164 | ): | ||
165 | self.password = result["token"] | ||
166 | return result | ||
167 | |||
168 | async def set_user_perms(self, username, permissions): | ||
169 | return await self.invoke( | ||
170 | {"set-user-perms": {"username": username, "permissions": permissions}} | ||
171 | ) | ||
172 | |||
173 | async def get_user(self, username=None): | ||
174 | m = {} | ||
175 | if username: | ||
176 | m["username"] = username | ||
177 | return await self.invoke({"get-user": m}) | ||
178 | |||
179 | async def get_all_users(self): | ||
180 | return (await self.invoke({"get-all-users": {}}))["users"] | ||
181 | |||
182 | async def new_user(self, username, permissions): | ||
183 | return await self.invoke( | ||
184 | {"new-user": {"username": username, "permissions": permissions}} | ||
185 | ) | ||
186 | |||
187 | async def delete_user(self, username): | ||
188 | return await self.invoke({"delete-user": {"username": username}}) | ||
189 | |||
190 | async def become_user(self, username): | ||
191 | result = await self.invoke({"become-user": {"username": username}}) | ||
192 | if username == self.username: | ||
193 | self.saved_become_user = None | ||
194 | else: | ||
195 | self.saved_become_user = username | ||
196 | return result | ||
197 | |||
198 | async def get_db_usage(self): | ||
199 | return (await self.invoke({"get-db-usage": {}}))["usage"] | ||
200 | |||
201 | async def get_db_query_columns(self): | ||
202 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] | ||
203 | |||
204 | async def gc_status(self): | ||
205 | return await self.invoke({"gc-status": {}}) | ||
206 | |||
207 | async def gc_mark(self, mark, where): | ||
208 | """ | ||
209 | Starts a new garbage collection operation identified by "mark". If | ||
210 | garbage collection is already in progress with "mark", the collection | ||
211 | is continued. | ||
212 | |||
213 | All unihash entries that match the "where" clause are marked to be | ||
214 | kept. In addition, any new entries added to the database after this | ||
215 | command will be automatically marked with "mark" | ||
216 | """ | ||
217 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | ||
190 | 218 | ||
219 | async def gc_sweep(self, mark): | ||
220 | """ | ||
221 | Finishes garbage collection for "mark". All unihash entries that have | ||
222 | not been marked will be deleted. | ||
191 | 223 | ||
192 | class Client(object): | 224 | It is recommended to clean unused outhash entries after running this to |
193 | def __init__(self): | 225 | cleanup any dangling outhashes |
194 | self.client = AsyncClient() | 226 | """ |
195 | self.loop = asyncio.new_event_loop() | 227 | return await self.invoke({"gc-sweep": {"mark": mark}}) |
196 | 228 | ||
197 | for call in ( | 229 | |
230 | class Client(bb.asyncrpc.Client): | ||
231 | def __init__(self, username=None, password=None): | ||
232 | self.username = username | ||
233 | self.password = password | ||
234 | |||
235 | super().__init__() | ||
236 | self._add_methods( | ||
198 | "connect_tcp", | 237 | "connect_tcp", |
199 | "close", | 238 | "connect_websocket", |
200 | "get_unihash", | 239 | "get_unihash", |
201 | "report_unihash", | 240 | "report_unihash", |
202 | "report_unihash_equiv", | 241 | "report_unihash_equiv", |
203 | "get_taskhash", | 242 | "get_taskhash", |
243 | "unihash_exists", | ||
244 | "get_outhash", | ||
204 | "get_stats", | 245 | "get_stats", |
205 | "reset_stats", | 246 | "reset_stats", |
206 | "backfill_wait", | 247 | "backfill_wait", |
207 | ): | 248 | "remove", |
208 | downcall = getattr(self.client, call) | 249 | "clean_unused", |
209 | setattr(self, call, self._get_downcall_wrapper(downcall)) | 250 | "auth", |
210 | 251 | "refresh_token", | |
211 | def _get_downcall_wrapper(self, downcall): | 252 | "set_user_perms", |
212 | def wrapper(*args, **kwargs): | 253 | "get_user", |
213 | return self.loop.run_until_complete(downcall(*args, **kwargs)) | 254 | "get_all_users", |
214 | 255 | "new_user", | |
215 | return wrapper | 256 | "delete_user", |
216 | 257 | "become_user", | |
217 | def connect_unix(self, path): | 258 | "get_db_usage", |
218 | # AF_UNIX has path length issues so chdir here to workaround | 259 | "get_db_query_columns", |
219 | cwd = os.getcwd() | 260 | "gc_status", |
220 | try: | 261 | "gc_mark", |
221 | os.chdir(os.path.dirname(path)) | 262 | "gc_sweep", |
222 | self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path))) | 263 | ) |
223 | self.loop.run_until_complete(self.client.connect()) | 264 | |
224 | finally: | 265 | def _get_async_client(self): |
225 | os.chdir(cwd) | 266 | return AsyncClient(self.username, self.password) |
226 | 267 | ||
227 | @property | 268 | |
228 | def max_chunk(self): | 269 | class ClientPool(bb.asyncrpc.ClientPool): |
229 | return self.client.max_chunk | 270 | def __init__( |
230 | 271 | self, | |
231 | @max_chunk.setter | 272 | address, |
232 | def max_chunk(self, value): | 273 | max_clients, |
233 | self.client.max_chunk = value | 274 | *, |
275 | username=None, | ||
276 | password=None, | ||
277 | become=None, | ||
278 | ): | ||
279 | super().__init__(max_clients) | ||
280 | self.address = address | ||
281 | self.username = username | ||
282 | self.password = password | ||
283 | self.become = become | ||
284 | |||
285 | async def _new_client(self): | ||
286 | client = await create_async_client( | ||
287 | self.address, | ||
288 | username=self.username, | ||
289 | password=self.password, | ||
290 | ) | ||
291 | if self.become: | ||
292 | await client.become_user(self.become) | ||
293 | return client | ||
294 | |||
295 | def _run_key_tasks(self, queries, call): | ||
296 | results = {key: None for key in queries.keys()} | ||
297 | |||
298 | def make_task(key, args): | ||
299 | async def task(client): | ||
300 | nonlocal results | ||
301 | unihash = await call(client, args) | ||
302 | results[key] = unihash | ||
303 | |||
304 | return task | ||
305 | |||
306 | def gen_tasks(): | ||
307 | for key, args in queries.items(): | ||
308 | yield make_task(key, args) | ||
309 | |||
310 | self.run_tasks(gen_tasks()) | ||
311 | return results | ||
312 | |||
313 | def get_unihashes(self, queries): | ||
314 | """ | ||
315 | Query multiple unihashes in parallel. | ||
316 | |||
317 | The queries argument is a dictionary with arbitrary key. The values | ||
318 | must be a tuple of (method, taskhash). | ||
319 | |||
320 | Returns a dictionary with a corresponding key for each input key, and | ||
321 | the value is the queried unihash (which might be none if the query | ||
322 | failed) | ||
323 | """ | ||
324 | |||
325 | async def call(client, args): | ||
326 | method, taskhash = args | ||
327 | return await client.get_unihash(method, taskhash) | ||
328 | |||
329 | return self._run_key_tasks(queries, call) | ||
330 | |||
331 | def unihashes_exist(self, queries): | ||
332 | """ | ||
333 | Query multiple unihash existence checks in parallel. | ||
334 | |||
335 | The queries argument is a dictionary with arbitrary key. The values | ||
336 | must be a unihash. | ||
337 | |||
338 | Returns a dictionary with a corresponding key for each input key, and | ||
339 | the value is True or False if the unihash is known by the server (or | ||
340 | None if there was a failure) | ||
341 | """ | ||
342 | |||
343 | async def call(client, unihash): | ||
344 | return await client.unihash_exists(unihash) | ||
345 | |||
346 | return self._run_key_tasks(queries, call) | ||