diff options
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 2 | ||||
| -rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 77 |
2 files changed, 78 insertions, 1 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index a4371643d7..639e1607f8 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
| @@ -5,7 +5,7 @@ | |||
| 5 | # | 5 | # |
| 6 | 6 | ||
| 7 | 7 | ||
| 8 | from .client import AsyncClient, Client | 8 | from .client import AsyncClient, Client, ClientPool |
| 9 | from .serv import AsyncServer, AsyncServerConnection | 9 | from .serv import AsyncServer, AsyncServerConnection |
| 10 | from .connection import DEFAULT_MAX_CHUNK | 10 | from .connection import DEFAULT_MAX_CHUNK |
| 11 | from .exceptions import ( | 11 | from .exceptions import ( |
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 0d7cd85780..a6228bb0ba 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
| @@ -10,6 +10,8 @@ import json | |||
| 10 | import os | 10 | import os |
| 11 | import socket | 11 | import socket |
| 12 | import sys | 12 | import sys |
| 13 | import contextlib | ||
| 14 | from threading import Thread | ||
| 13 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK | 15 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK |
| 14 | from .exceptions import ConnectionClosedError, InvokeError | 16 | from .exceptions import ConnectionClosedError, InvokeError |
| 15 | 17 | ||
| @@ -180,3 +182,78 @@ class Client(object): | |||
| 180 | def __exit__(self, exc_type, exc_value, traceback): | 182 | def __exit__(self, exc_type, exc_value, traceback): |
| 181 | self.close() | 183 | self.close() |
| 182 | return False | 184 | return False |
| 185 | |||
| 186 | |||
| 187 | class ClientPool(object): | ||
| 188 | def __init__(self, max_clients): | ||
| 189 | self.avail_clients = [] | ||
| 190 | self.num_clients = 0 | ||
| 191 | self.max_clients = max_clients | ||
| 192 | self.loop = None | ||
| 193 | self.client_condition = None | ||
| 194 | |||
| 195 | @abc.abstractmethod | ||
| 196 | async def _new_client(self): | ||
| 197 | raise NotImplementedError("Must be implemented in derived class") | ||
| 198 | |||
| 199 | def close(self): | ||
| 200 | if self.client_condition: | ||
| 201 | self.client_condition = None | ||
| 202 | |||
| 203 | if self.loop: | ||
| 204 | self.loop.run_until_complete(self.__close_clients()) | ||
| 205 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
| 206 | self.loop.close() | ||
| 207 | self.loop = None | ||
| 208 | |||
| 209 | def run_tasks(self, tasks): | ||
| 210 | if not self.loop: | ||
| 211 | self.loop = asyncio.new_event_loop() | ||
| 212 | |||
| 213 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
| 214 | thread.start() | ||
| 215 | thread.join() | ||
| 216 | |||
| 217 | @contextlib.asynccontextmanager | ||
| 218 | async def get_client(self): | ||
| 219 | async with self.client_condition: | ||
| 220 | if self.avail_clients: | ||
| 221 | client = self.avail_clients.pop() | ||
| 222 | elif self.num_clients < self.max_clients: | ||
| 223 | self.num_clients += 1 | ||
| 224 | client = await self._new_client() | ||
| 225 | else: | ||
| 226 | while not self.avail_clients: | ||
| 227 | await self.client_condition.wait() | ||
| 228 | client = self.avail_clients.pop() | ||
| 229 | |||
| 230 | try: | ||
| 231 | yield client | ||
| 232 | finally: | ||
| 233 | async with self.client_condition: | ||
| 234 | self.avail_clients.append(client) | ||
| 235 | self.client_condition.notify() | ||
| 236 | |||
| 237 | def __thread_main(self, tasks): | ||
| 238 | async def process_task(task): | ||
| 239 | async with self.get_client() as client: | ||
| 240 | await task(client) | ||
| 241 | |||
| 242 | asyncio.set_event_loop(self.loop) | ||
| 243 | if not self.client_condition: | ||
| 244 | self.client_condition = asyncio.Condition() | ||
| 245 | tasks = [process_task(t) for t in tasks] | ||
| 246 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
| 247 | |||
| 248 | async def __close_clients(self): | ||
| 249 | for c in self.avail_clients: | ||
| 250 | await c.close() | ||
| 251 | self.avail_clients = [] | ||
| 252 | self.num_clients = 0 | ||
| 253 | |||
| 254 | def __enter__(self): | ||
| 255 | return self | ||
| 256 | |||
| 257 | def __exit__(self, exc_type, exc_value, traceback): | ||
| 258 | self.close() | ||
| 259 | return False | ||
