diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-04-12 14:28:16 -0600 |
---|---|---|
committer | Chee Yang Lee <chee.yang.lee@intel.com> | 2024-04-12 14:42:54 -0700 |
commit | 150f45c53cf3cadf5c5171a78bcdce35733c744d (patch) | |
tree | c9e7ff05db3977286a2642dc1af1a61635a415fd | |
parent | 5950c63d54228f15a973d8d8c843b0dc9987ab7f (diff) | |
download | poky-halstead/hashclient.tar.gz |
hashserv: client: Fix mode state errorshalstead/hashclient
Careful reading of the code can contrive cases where poorly timed
ConnectionError's will result in the client mode being incorrectly reset
to MODE_NORMAL when it should actual be a stream mode for the current
command. Fix this by no longer attempting to restore the mode when the
connection is setup. Instead, attempt to set the stream mode inside the
send wrapper for the stream data, which means that it should always end
up in the correct mode before continuing.
Also, factor out the transition to normal mode into a invoke() override
so it doesn't need to be specified over and over again.
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
-rw-r--r-- | bitbake/lib/hashserv/client.py | 39 |
1 files changed, 10 insertions, 29 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index b269879ecf..0b254beddd 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -27,9 +27,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
27 | 27 | ||
28 | async def setup_connection(self): | 28 | async def setup_connection(self): |
29 | await super().setup_connection() | 29 | await super().setup_connection() |
30 | cur_mode = self.mode | ||
31 | self.mode = self.MODE_NORMAL | 30 | self.mode = self.MODE_NORMAL |
32 | await self._set_mode(cur_mode) | ||
33 | if self.username: | 31 | if self.username: |
34 | # Save off become user temporarily because auth() resets it | 32 | # Save off become user temporarily because auth() resets it |
35 | become = self.saved_become_user | 33 | become = self.saved_become_user |
@@ -38,13 +36,20 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
38 | if become: | 36 | if become: |
39 | await self.become_user(become) | 37 | await self.become_user(become) |
40 | 38 | ||
41 | async def send_stream(self, msg): | 39 | async def send_stream(self, mode, msg): |
42 | async def proc(): | 40 | async def proc(): |
41 | await self._set_mode(mode) | ||
43 | await self.socket.send(msg) | 42 | await self.socket.send(msg) |
44 | return await self.socket.recv() | 43 | return await self.socket.recv() |
45 | 44 | ||
46 | return await self._send_wrapper(proc) | 45 | return await self._send_wrapper(proc) |
47 | 46 | ||
47 | async def invoke(self, *args, **kwargs): | ||
48 | # It's OK if connection errors cause a failure here, because the mode | ||
49 | # is also reset to normal on a new connection | ||
50 | await self._set_mode(self.MODE_NORMAL) | ||
51 | return await super().invoke(*args, **kwargs) | ||
52 | |||
48 | async def _set_mode(self, new_mode): | 53 | async def _set_mode(self, new_mode): |
49 | async def stream_to_normal(): | 54 | async def stream_to_normal(): |
50 | await self.socket.send("END") | 55 | await self.socket.send("END") |
@@ -84,14 +89,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
84 | self.mode = new_mode | 89 | self.mode = new_mode |
85 | 90 | ||
86 | async def get_unihash(self, method, taskhash): | 91 | async def get_unihash(self, method, taskhash): |
87 | await self._set_mode(self.MODE_GET_STREAM) | 92 | r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash)) |
88 | r = await self.send_stream("%s %s" % (method, taskhash)) | ||
89 | if not r: | 93 | if not r: |
90 | return None | 94 | return None |
91 | return r | 95 | return r |
92 | 96 | ||
93 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): | 97 | async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): |
94 | await self._set_mode(self.MODE_NORMAL) | ||
95 | m = extra.copy() | 98 | m = extra.copy() |
96 | m["taskhash"] = taskhash | 99 | m["taskhash"] = taskhash |
97 | m["method"] = method | 100 | m["method"] = method |
@@ -100,7 +103,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
100 | return await self.invoke({"report": m}) | 103 | return await self.invoke({"report": m}) |
101 | 104 | ||
102 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): | 105 | async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): |
103 | await self._set_mode(self.MODE_NORMAL) | ||
104 | m = extra.copy() | 106 | m = extra.copy() |
105 | m["taskhash"] = taskhash | 107 | m["taskhash"] = taskhash |
106 | m["method"] = method | 108 | m["method"] = method |
@@ -108,18 +110,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
108 | return await self.invoke({"report-equiv": m}) | 110 | return await self.invoke({"report-equiv": m}) |
109 | 111 | ||
110 | async def get_taskhash(self, method, taskhash, all_properties=False): | 112 | async def get_taskhash(self, method, taskhash, all_properties=False): |
111 | await self._set_mode(self.MODE_NORMAL) | ||
112 | return await self.invoke( | 113 | return await self.invoke( |
113 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} | 114 | {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} |
114 | ) | 115 | ) |
115 | 116 | ||
116 | async def unihash_exists(self, unihash): | 117 | async def unihash_exists(self, unihash): |
117 | await self._set_mode(self.MODE_EXIST_STREAM) | 118 | r = await self.send_stream(self.MODE_EXIST_STREAM, unihash) |
118 | r = await self.send_stream(unihash) | ||
119 | return r == "true" | 119 | return r == "true" |
120 | 120 | ||
121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): | 121 | async def get_outhash(self, method, outhash, taskhash, with_unihash=True): |
122 | await self._set_mode(self.MODE_NORMAL) | ||
123 | return await self.invoke( | 122 | return await self.invoke( |
124 | { | 123 | { |
125 | "get-outhash": { | 124 | "get-outhash": { |
@@ -132,27 +131,21 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
132 | ) | 131 | ) |
133 | 132 | ||
134 | async def get_stats(self): | 133 | async def get_stats(self): |
135 | await self._set_mode(self.MODE_NORMAL) | ||
136 | return await self.invoke({"get-stats": None}) | 134 | return await self.invoke({"get-stats": None}) |
137 | 135 | ||
138 | async def reset_stats(self): | 136 | async def reset_stats(self): |
139 | await self._set_mode(self.MODE_NORMAL) | ||
140 | return await self.invoke({"reset-stats": None}) | 137 | return await self.invoke({"reset-stats": None}) |
141 | 138 | ||
142 | async def backfill_wait(self): | 139 | async def backfill_wait(self): |
143 | await self._set_mode(self.MODE_NORMAL) | ||
144 | return (await self.invoke({"backfill-wait": None}))["tasks"] | 140 | return (await self.invoke({"backfill-wait": None}))["tasks"] |
145 | 141 | ||
146 | async def remove(self, where): | 142 | async def remove(self, where): |
147 | await self._set_mode(self.MODE_NORMAL) | ||
148 | return await self.invoke({"remove": {"where": where}}) | 143 | return await self.invoke({"remove": {"where": where}}) |
149 | 144 | ||
150 | async def clean_unused(self, max_age): | 145 | async def clean_unused(self, max_age): |
151 | await self._set_mode(self.MODE_NORMAL) | ||
152 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) | 146 | return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) |
153 | 147 | ||
154 | async def auth(self, username, token): | 148 | async def auth(self, username, token): |
155 | await self._set_mode(self.MODE_NORMAL) | ||
156 | result = await self.invoke({"auth": {"username": username, "token": token}}) | 149 | result = await self.invoke({"auth": {"username": username, "token": token}}) |
157 | self.username = username | 150 | self.username = username |
158 | self.password = token | 151 | self.password = token |
@@ -160,7 +153,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
160 | return result | 153 | return result |
161 | 154 | ||
162 | async def refresh_token(self, username=None): | 155 | async def refresh_token(self, username=None): |
163 | await self._set_mode(self.MODE_NORMAL) | ||
164 | m = {} | 156 | m = {} |
165 | if username: | 157 | if username: |
166 | m["username"] = username | 158 | m["username"] = username |
@@ -174,34 +166,28 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
174 | return result | 166 | return result |
175 | 167 | ||
176 | async def set_user_perms(self, username, permissions): | 168 | async def set_user_perms(self, username, permissions): |
177 | await self._set_mode(self.MODE_NORMAL) | ||
178 | return await self.invoke( | 169 | return await self.invoke( |
179 | {"set-user-perms": {"username": username, "permissions": permissions}} | 170 | {"set-user-perms": {"username": username, "permissions": permissions}} |
180 | ) | 171 | ) |
181 | 172 | ||
182 | async def get_user(self, username=None): | 173 | async def get_user(self, username=None): |
183 | await self._set_mode(self.MODE_NORMAL) | ||
184 | m = {} | 174 | m = {} |
185 | if username: | 175 | if username: |
186 | m["username"] = username | 176 | m["username"] = username |
187 | return await self.invoke({"get-user": m}) | 177 | return await self.invoke({"get-user": m}) |
188 | 178 | ||
189 | async def get_all_users(self): | 179 | async def get_all_users(self): |
190 | await self._set_mode(self.MODE_NORMAL) | ||
191 | return (await self.invoke({"get-all-users": {}}))["users"] | 180 | return (await self.invoke({"get-all-users": {}}))["users"] |
192 | 181 | ||
193 | async def new_user(self, username, permissions): | 182 | async def new_user(self, username, permissions): |
194 | await self._set_mode(self.MODE_NORMAL) | ||
195 | return await self.invoke( | 183 | return await self.invoke( |
196 | {"new-user": {"username": username, "permissions": permissions}} | 184 | {"new-user": {"username": username, "permissions": permissions}} |
197 | ) | 185 | ) |
198 | 186 | ||
199 | async def delete_user(self, username): | 187 | async def delete_user(self, username): |
200 | await self._set_mode(self.MODE_NORMAL) | ||
201 | return await self.invoke({"delete-user": {"username": username}}) | 188 | return await self.invoke({"delete-user": {"username": username}}) |
202 | 189 | ||
203 | async def become_user(self, username): | 190 | async def become_user(self, username): |
204 | await self._set_mode(self.MODE_NORMAL) | ||
205 | result = await self.invoke({"become-user": {"username": username}}) | 191 | result = await self.invoke({"become-user": {"username": username}}) |
206 | if username == self.username: | 192 | if username == self.username: |
207 | self.saved_become_user = None | 193 | self.saved_become_user = None |
@@ -210,15 +196,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
210 | return result | 196 | return result |
211 | 197 | ||
212 | async def get_db_usage(self): | 198 | async def get_db_usage(self): |
213 | await self._set_mode(self.MODE_NORMAL) | ||
214 | return (await self.invoke({"get-db-usage": {}}))["usage"] | 199 | return (await self.invoke({"get-db-usage": {}}))["usage"] |
215 | 200 | ||
216 | async def get_db_query_columns(self): | 201 | async def get_db_query_columns(self): |
217 | await self._set_mode(self.MODE_NORMAL) | ||
218 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] | 202 | return (await self.invoke({"get-db-query-columns": {}}))["columns"] |
219 | 203 | ||
220 | async def gc_status(self): | 204 | async def gc_status(self): |
221 | await self._set_mode(self.MODE_NORMAL) | ||
222 | return await self.invoke({"gc-status": {}}) | 205 | return await self.invoke({"gc-status": {}}) |
223 | 206 | ||
224 | async def gc_mark(self, mark, where): | 207 | async def gc_mark(self, mark, where): |
@@ -231,7 +214,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
231 | kept. In addition, any new entries added to the database after this | 214 | kept. In addition, any new entries added to the database after this |
232 | command will be automatically marked with "mark" | 215 | command will be automatically marked with "mark" |
233 | """ | 216 | """ |
234 | await self._set_mode(self.MODE_NORMAL) | ||
235 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) | 217 | return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) |
236 | 218 | ||
237 | async def gc_sweep(self, mark): | 219 | async def gc_sweep(self, mark): |
@@ -242,7 +224,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient): | |||
242 | It is recommended to clean unused outhash entries after running this to | 224 | It is recommended to clean unused outhash entries after running this to |
243 | cleanup any dangling outhashes | 225 | cleanup any dangling outhashes |
244 | """ | 226 | """ |
245 | await self._set_mode(self.MODE_NORMAL) | ||
246 | return await self.invoke({"gc-sweep": {"mark": mark}}) | 227 | return await self.invoke({"gc-sweep": {"mark": mark}}) |
247 | 228 | ||
248 | 229 | ||