summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2024-04-12 14:28:16 -0600
committerChee Yang Lee <chee.yang.lee@intel.com>2024-04-12 14:42:54 -0700
commit150f45c53cf3cadf5c5171a78bcdce35733c744d (patch)
treec9e7ff05db3977286a2642dc1af1a61635a415fd
parent5950c63d54228f15a973d8d8c843b0dc9987ab7f (diff)
downloadpoky-halstead/hashclient.tar.gz
hashserv: client: Fix mode state errorshalstead/hashclient
Careful reading of the code can contrive cases where poorly timed ConnectionError's will result in the client mode being incorrectly reset to MODE_NORMAL when it should actual be a stream mode for the current command. Fix this by no longer attempting to restore the mode when the connection is setup. Instead, attempt to set the stream mode inside the send wrapper for the stream data, which means that it should always end up in the correct mode before continuing. Also, factor out the transition to normal mode into a invoke() override so it doesn't need to be specified over and over again. Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
-rw-r--r--bitbake/lib/hashserv/client.py39
1 files changed, 10 insertions, 29 deletions
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index b269879ecf..0b254beddd 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -27,9 +27,7 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
27 27
28 async def setup_connection(self): 28 async def setup_connection(self):
29 await super().setup_connection() 29 await super().setup_connection()
30 cur_mode = self.mode
31 self.mode = self.MODE_NORMAL 30 self.mode = self.MODE_NORMAL
32 await self._set_mode(cur_mode)
33 if self.username: 31 if self.username:
34 # Save off become user temporarily because auth() resets it 32 # Save off become user temporarily because auth() resets it
35 become = self.saved_become_user 33 become = self.saved_become_user
@@ -38,13 +36,20 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
38 if become: 36 if become:
39 await self.become_user(become) 37 await self.become_user(become)
40 38
41 async def send_stream(self, msg): 39 async def send_stream(self, mode, msg):
42 async def proc(): 40 async def proc():
41 await self._set_mode(mode)
43 await self.socket.send(msg) 42 await self.socket.send(msg)
44 return await self.socket.recv() 43 return await self.socket.recv()
45 44
46 return await self._send_wrapper(proc) 45 return await self._send_wrapper(proc)
47 46
47 async def invoke(self, *args, **kwargs):
48 # It's OK if connection errors cause a failure here, because the mode
49 # is also reset to normal on a new connection
50 await self._set_mode(self.MODE_NORMAL)
51 return await super().invoke(*args, **kwargs)
52
48 async def _set_mode(self, new_mode): 53 async def _set_mode(self, new_mode):
49 async def stream_to_normal(): 54 async def stream_to_normal():
50 await self.socket.send("END") 55 await self.socket.send("END")
@@ -84,14 +89,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
84 self.mode = new_mode 89 self.mode = new_mode
85 90
86 async def get_unihash(self, method, taskhash): 91 async def get_unihash(self, method, taskhash):
87 await self._set_mode(self.MODE_GET_STREAM) 92 r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash))
88 r = await self.send_stream("%s %s" % (method, taskhash))
89 if not r: 93 if not r:
90 return None 94 return None
91 return r 95 return r
92 96
93 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}): 97 async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
94 await self._set_mode(self.MODE_NORMAL)
95 m = extra.copy() 98 m = extra.copy()
96 m["taskhash"] = taskhash 99 m["taskhash"] = taskhash
97 m["method"] = method 100 m["method"] = method
@@ -100,7 +103,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
100 return await self.invoke({"report": m}) 103 return await self.invoke({"report": m})
101 104
102 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}): 105 async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
103 await self._set_mode(self.MODE_NORMAL)
104 m = extra.copy() 106 m = extra.copy()
105 m["taskhash"] = taskhash 107 m["taskhash"] = taskhash
106 m["method"] = method 108 m["method"] = method
@@ -108,18 +110,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
108 return await self.invoke({"report-equiv": m}) 110 return await self.invoke({"report-equiv": m})
109 111
110 async def get_taskhash(self, method, taskhash, all_properties=False): 112 async def get_taskhash(self, method, taskhash, all_properties=False):
111 await self._set_mode(self.MODE_NORMAL)
112 return await self.invoke( 113 return await self.invoke(
113 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}} 114 {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
114 ) 115 )
115 116
116 async def unihash_exists(self, unihash): 117 async def unihash_exists(self, unihash):
117 await self._set_mode(self.MODE_EXIST_STREAM) 118 r = await self.send_stream(self.MODE_EXIST_STREAM, unihash)
118 r = await self.send_stream(unihash)
119 return r == "true" 119 return r == "true"
120 120
121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 121 async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
122 await self._set_mode(self.MODE_NORMAL)
123 return await self.invoke( 122 return await self.invoke(
124 { 123 {
125 "get-outhash": { 124 "get-outhash": {
@@ -132,27 +131,21 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
132 ) 131 )
133 132
134 async def get_stats(self): 133 async def get_stats(self):
135 await self._set_mode(self.MODE_NORMAL)
136 return await self.invoke({"get-stats": None}) 134 return await self.invoke({"get-stats": None})
137 135
138 async def reset_stats(self): 136 async def reset_stats(self):
139 await self._set_mode(self.MODE_NORMAL)
140 return await self.invoke({"reset-stats": None}) 137 return await self.invoke({"reset-stats": None})
141 138
142 async def backfill_wait(self): 139 async def backfill_wait(self):
143 await self._set_mode(self.MODE_NORMAL)
144 return (await self.invoke({"backfill-wait": None}))["tasks"] 140 return (await self.invoke({"backfill-wait": None}))["tasks"]
145 141
146 async def remove(self, where): 142 async def remove(self, where):
147 await self._set_mode(self.MODE_NORMAL)
148 return await self.invoke({"remove": {"where": where}}) 143 return await self.invoke({"remove": {"where": where}})
149 144
150 async def clean_unused(self, max_age): 145 async def clean_unused(self, max_age):
151 await self._set_mode(self.MODE_NORMAL)
152 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}}) 146 return await self.invoke({"clean-unused": {"max_age_seconds": max_age}})
153 147
154 async def auth(self, username, token): 148 async def auth(self, username, token):
155 await self._set_mode(self.MODE_NORMAL)
156 result = await self.invoke({"auth": {"username": username, "token": token}}) 149 result = await self.invoke({"auth": {"username": username, "token": token}})
157 self.username = username 150 self.username = username
158 self.password = token 151 self.password = token
@@ -160,7 +153,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
160 return result 153 return result
161 154
162 async def refresh_token(self, username=None): 155 async def refresh_token(self, username=None):
163 await self._set_mode(self.MODE_NORMAL)
164 m = {} 156 m = {}
165 if username: 157 if username:
166 m["username"] = username 158 m["username"] = username
@@ -174,34 +166,28 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
174 return result 166 return result
175 167
176 async def set_user_perms(self, username, permissions): 168 async def set_user_perms(self, username, permissions):
177 await self._set_mode(self.MODE_NORMAL)
178 return await self.invoke( 169 return await self.invoke(
179 {"set-user-perms": {"username": username, "permissions": permissions}} 170 {"set-user-perms": {"username": username, "permissions": permissions}}
180 ) 171 )
181 172
182 async def get_user(self, username=None): 173 async def get_user(self, username=None):
183 await self._set_mode(self.MODE_NORMAL)
184 m = {} 174 m = {}
185 if username: 175 if username:
186 m["username"] = username 176 m["username"] = username
187 return await self.invoke({"get-user": m}) 177 return await self.invoke({"get-user": m})
188 178
189 async def get_all_users(self): 179 async def get_all_users(self):
190 await self._set_mode(self.MODE_NORMAL)
191 return (await self.invoke({"get-all-users": {}}))["users"] 180 return (await self.invoke({"get-all-users": {}}))["users"]
192 181
193 async def new_user(self, username, permissions): 182 async def new_user(self, username, permissions):
194 await self._set_mode(self.MODE_NORMAL)
195 return await self.invoke( 183 return await self.invoke(
196 {"new-user": {"username": username, "permissions": permissions}} 184 {"new-user": {"username": username, "permissions": permissions}}
197 ) 185 )
198 186
199 async def delete_user(self, username): 187 async def delete_user(self, username):
200 await self._set_mode(self.MODE_NORMAL)
201 return await self.invoke({"delete-user": {"username": username}}) 188 return await self.invoke({"delete-user": {"username": username}})
202 189
203 async def become_user(self, username): 190 async def become_user(self, username):
204 await self._set_mode(self.MODE_NORMAL)
205 result = await self.invoke({"become-user": {"username": username}}) 191 result = await self.invoke({"become-user": {"username": username}})
206 if username == self.username: 192 if username == self.username:
207 self.saved_become_user = None 193 self.saved_become_user = None
@@ -210,15 +196,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
210 return result 196 return result
211 197
212 async def get_db_usage(self): 198 async def get_db_usage(self):
213 await self._set_mode(self.MODE_NORMAL)
214 return (await self.invoke({"get-db-usage": {}}))["usage"] 199 return (await self.invoke({"get-db-usage": {}}))["usage"]
215 200
216 async def get_db_query_columns(self): 201 async def get_db_query_columns(self):
217 await self._set_mode(self.MODE_NORMAL)
218 return (await self.invoke({"get-db-query-columns": {}}))["columns"] 202 return (await self.invoke({"get-db-query-columns": {}}))["columns"]
219 203
220 async def gc_status(self): 204 async def gc_status(self):
221 await self._set_mode(self.MODE_NORMAL)
222 return await self.invoke({"gc-status": {}}) 205 return await self.invoke({"gc-status": {}})
223 206
224 async def gc_mark(self, mark, where): 207 async def gc_mark(self, mark, where):
@@ -231,7 +214,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
231 kept. In addition, any new entries added to the database after this 214 kept. In addition, any new entries added to the database after this
232 command will be automatically marked with "mark" 215 command will be automatically marked with "mark"
233 """ 216 """
234 await self._set_mode(self.MODE_NORMAL)
235 return await self.invoke({"gc-mark": {"mark": mark, "where": where}}) 217 return await self.invoke({"gc-mark": {"mark": mark, "where": where}})
236 218
237 async def gc_sweep(self, mark): 219 async def gc_sweep(self, mark):
@@ -242,7 +224,6 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
242 It is recommended to clean unused outhash entries after running this to 224 It is recommended to clean unused outhash entries after running this to
243 cleanup any dangling outhashes 225 cleanup any dangling outhashes
244 """ 226 """
245 await self._set_mode(self.MODE_NORMAL)
246 return await self.invoke({"gc-sweep": {"mark": mark}}) 227 return await self.invoke({"gc-sweep": {"mark": mark}})
247 228
248 229