summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/asyncrpc/client.py
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2024-05-30 09:41:26 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2024-05-31 16:56:25 +0100
commit247d08ae0765fdd73f80e7608f76e36983e2109d (patch)
tree69c1359556e1235276f10052cf3959c4748fc536 /bitbake/lib/bb/asyncrpc/client.py
parentf618d1dfd7dd414cb458467d0e35b135d6e7cd32 (diff)
downloadpoky-247d08ae0765fdd73f80e7608f76e36983e2109d.tar.gz
bitbake: asyncrpc: Remove ClientPool
Batching support on the client side has proven to be a much more effective way of dealing with server latency than multiple client connections and is also much nicer on the server, so drop the client pool support from asyncrpc and the hash server (Bitbake rev: 6f80560f1c7010d09fe5448fdde616aef8468102) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py76
1 files changed, 1 insertions, 75 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index f81ad92f48..11179b0fcb 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1)
29if sys.version_info >= (3, 10, 0): 29if sys.version_info >= (3, 10, 0):
30 WEBSOCKETS_MIN_VERSION = (10, 0) 30 WEBSOCKETS_MIN_VERSION = (10, 0)
31 31
32
32def parse_address(addr): 33def parse_address(addr):
33 if addr.startswith(UNIX_PREFIX): 34 if addr.startswith(UNIX_PREFIX):
34 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) 35 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
@@ -259,78 +260,3 @@ class Client(object):
259 def __exit__(self, exc_type, exc_value, traceback): 260 def __exit__(self, exc_type, exc_value, traceback):
260 self.close() 261 self.close()
261 return False 262 return False
262
263
264class ClientPool(object):
265 def __init__(self, max_clients):
266 self.avail_clients = []
267 self.num_clients = 0
268 self.max_clients = max_clients
269 self.loop = None
270 self.client_condition = None
271
272 @abc.abstractmethod
273 async def _new_client(self):
274 raise NotImplementedError("Must be implemented in derived class")
275
276 def close(self):
277 if self.client_condition:
278 self.client_condition = None
279
280 if self.loop:
281 self.loop.run_until_complete(self.__close_clients())
282 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
283 self.loop.close()
284 self.loop = None
285
286 def run_tasks(self, tasks):
287 if not self.loop:
288 self.loop = asyncio.new_event_loop()
289
290 thread = Thread(target=self.__thread_main, args=(tasks,))
291 thread.start()
292 thread.join()
293
294 @contextlib.asynccontextmanager
295 async def get_client(self):
296 async with self.client_condition:
297 if self.avail_clients:
298 client = self.avail_clients.pop()
299 elif self.num_clients < self.max_clients:
300 self.num_clients += 1
301 client = await self._new_client()
302 else:
303 while not self.avail_clients:
304 await self.client_condition.wait()
305 client = self.avail_clients.pop()
306
307 try:
308 yield client
309 finally:
310 async with self.client_condition:
311 self.avail_clients.append(client)
312 self.client_condition.notify()
313
314 def __thread_main(self, tasks):
315 async def process_task(task):
316 async with self.get_client() as client:
317 await task(client)
318
319 asyncio.set_event_loop(self.loop)
320 if not self.client_condition:
321 self.client_condition = asyncio.Condition()
322 tasks = [process_task(t) for t in tasks]
323 self.loop.run_until_complete(asyncio.gather(*tasks))
324
325 async def __close_clients(self):
326 for c in self.avail_clients:
327 await c.close()
328 self.avail_clients = []
329 self.num_clients = 0
330
331 def __enter__(self):
332 return self
333
334 def __exit__(self, exc_type, exc_value, traceback):
335 self.close()
336 return False