diff options
| author | Joshua Watt <JPEWhacker@gmail.com> | 2024-05-30 09:41:26 -0600 |
|---|---|---|
| committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-05-31 16:56:25 +0100 |
| commit | 247d08ae0765fdd73f80e7608f76e36983e2109d (patch) | |
| tree | 69c1359556e1235276f10052cf3959c4748fc536 /bitbake/lib/bb/asyncrpc | |
| parent | f618d1dfd7dd414cb458467d0e35b135d6e7cd32 (diff) | |
| download | poky-247d08ae0765fdd73f80e7608f76e36983e2109d.tar.gz | |
bitbake: asyncrpc: Remove ClientPool
Batching support on the client side has proven to be a much more
effective way of dealing with server latency than multiple client
connections and is also much nicer on the server, so drop the client
pool support from asyncrpc and the hash server
(Bitbake rev: 6f80560f1c7010d09fe5448fdde616aef8468102)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake/lib/bb/asyncrpc')
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 2 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 76 |
2 files changed, 2 insertions, 76 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index 639e1607f8..a4371643d7 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
| @@ -5,7 +5,7 @@ | |||
| 5 | # | 5 | # |
| 6 | 6 | ||
| 7 | 7 | ||
| 8 | from .client import AsyncClient, Client, ClientPool | 8 | from .client import AsyncClient, Client |
| 9 | from .serv import AsyncServer, AsyncServerConnection | 9 | from .serv import AsyncServer, AsyncServerConnection |
| 10 | from .connection import DEFAULT_MAX_CHUNK | 10 | from .connection import DEFAULT_MAX_CHUNK |
| 11 | from .exceptions import ( | 11 | from .exceptions import ( |
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index f81ad92f48..11179b0fcb 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
| @@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1) | |||
| 29 | if sys.version_info >= (3, 10, 0): | 29 | if sys.version_info >= (3, 10, 0): |
| 30 | WEBSOCKETS_MIN_VERSION = (10, 0) | 30 | WEBSOCKETS_MIN_VERSION = (10, 0) |
| 31 | 31 | ||
| 32 | |||
| 32 | def parse_address(addr): | 33 | def parse_address(addr): |
| 33 | if addr.startswith(UNIX_PREFIX): | 34 | if addr.startswith(UNIX_PREFIX): |
| 34 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) | 35 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) |
| @@ -259,78 +260,3 @@ class Client(object): | |||
| 259 | def __exit__(self, exc_type, exc_value, traceback): | 260 | def __exit__(self, exc_type, exc_value, traceback): |
| 260 | self.close() | 261 | self.close() |
| 261 | return False | 262 | return False |
| 262 | |||
| 263 | |||
| 264 | class ClientPool(object): | ||
| 265 | def __init__(self, max_clients): | ||
| 266 | self.avail_clients = [] | ||
| 267 | self.num_clients = 0 | ||
| 268 | self.max_clients = max_clients | ||
| 269 | self.loop = None | ||
| 270 | self.client_condition = None | ||
| 271 | |||
| 272 | @abc.abstractmethod | ||
| 273 | async def _new_client(self): | ||
| 274 | raise NotImplementedError("Must be implemented in derived class") | ||
| 275 | |||
| 276 | def close(self): | ||
| 277 | if self.client_condition: | ||
| 278 | self.client_condition = None | ||
| 279 | |||
| 280 | if self.loop: | ||
| 281 | self.loop.run_until_complete(self.__close_clients()) | ||
| 282 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
| 283 | self.loop.close() | ||
| 284 | self.loop = None | ||
| 285 | |||
| 286 | def run_tasks(self, tasks): | ||
| 287 | if not self.loop: | ||
| 288 | self.loop = asyncio.new_event_loop() | ||
| 289 | |||
| 290 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
| 291 | thread.start() | ||
| 292 | thread.join() | ||
| 293 | |||
| 294 | @contextlib.asynccontextmanager | ||
| 295 | async def get_client(self): | ||
| 296 | async with self.client_condition: | ||
| 297 | if self.avail_clients: | ||
| 298 | client = self.avail_clients.pop() | ||
| 299 | elif self.num_clients < self.max_clients: | ||
| 300 | self.num_clients += 1 | ||
| 301 | client = await self._new_client() | ||
| 302 | else: | ||
| 303 | while not self.avail_clients: | ||
| 304 | await self.client_condition.wait() | ||
| 305 | client = self.avail_clients.pop() | ||
| 306 | |||
| 307 | try: | ||
| 308 | yield client | ||
| 309 | finally: | ||
| 310 | async with self.client_condition: | ||
| 311 | self.avail_clients.append(client) | ||
| 312 | self.client_condition.notify() | ||
| 313 | |||
| 314 | def __thread_main(self, tasks): | ||
| 315 | async def process_task(task): | ||
| 316 | async with self.get_client() as client: | ||
| 317 | await task(client) | ||
| 318 | |||
| 319 | asyncio.set_event_loop(self.loop) | ||
| 320 | if not self.client_condition: | ||
| 321 | self.client_condition = asyncio.Condition() | ||
| 322 | tasks = [process_task(t) for t in tasks] | ||
| 323 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
| 324 | |||
| 325 | async def __close_clients(self): | ||
| 326 | for c in self.avail_clients: | ||
| 327 | await c.close() | ||
| 328 | self.avail_clients = [] | ||
| 329 | self.num_clients = 0 | ||
| 330 | |||
| 331 | def __enter__(self): | ||
| 332 | return self | ||
| 333 | |||
| 334 | def __exit__(self, exc_type, exc_value, traceback): | ||
| 335 | self.close() | ||
| 336 | return False | ||
