diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-02-18 15:59:49 -0700 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-02-19 11:58:12 +0000 |
commit | 2406bd10550997ecd033baad93fcb59b223c5aa8 (patch) | |
tree | 839a0f5ec364f3879894c4f3f73ab18a7afdf1fd | |
parent | 3bd2c69e70853584beaaa5a4fd62589fa051d911 (diff) | |
download | poky-2406bd10550997ecd033baad93fcb59b223c5aa8.tar.gz |
bitbake: asyncrpc: Add Client Pool object
Adds an abstract base class that can be used to implement a pool of
client connections. The class implements a thread that runs an async
event loop, and allows derived classes to schedule work on the loop and
wait for the work to be finished.
(Bitbake rev: f113456417f9ac0a4b44b291a6e22ea8219c3a5f)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 2 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 77 |
2 files changed, 78 insertions, 1 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index a4371643d7..639e1607f8 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
@@ -5,7 +5,7 @@ | |||
5 | # | 5 | # |
6 | 6 | ||
7 | 7 | ||
8 | from .client import AsyncClient, Client | 8 | from .client import AsyncClient, Client, ClientPool |
9 | from .serv import AsyncServer, AsyncServerConnection | 9 | from .serv import AsyncServer, AsyncServerConnection |
10 | from .connection import DEFAULT_MAX_CHUNK | 10 | from .connection import DEFAULT_MAX_CHUNK |
11 | from .exceptions import ( | 11 | from .exceptions import ( |
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 0d7cd85780..a6228bb0ba 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -10,6 +10,8 @@ import json | |||
10 | import os | 10 | import os |
11 | import socket | 11 | import socket |
12 | import sys | 12 | import sys |
13 | import contextlib | ||
14 | from threading import Thread | ||
13 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK | 15 | from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK |
14 | from .exceptions import ConnectionClosedError, InvokeError | 16 | from .exceptions import ConnectionClosedError, InvokeError |
15 | 17 | ||
@@ -180,3 +182,78 @@ class Client(object): | |||
180 | def __exit__(self, exc_type, exc_value, traceback): | 182 | def __exit__(self, exc_type, exc_value, traceback): |
181 | self.close() | 183 | self.close() |
182 | return False | 184 | return False |
185 | |||
186 | |||
187 | class ClientPool(object): | ||
188 | def __init__(self, max_clients): | ||
189 | self.avail_clients = [] | ||
190 | self.num_clients = 0 | ||
191 | self.max_clients = max_clients | ||
192 | self.loop = None | ||
193 | self.client_condition = None | ||
194 | |||
195 | @abc.abstractmethod | ||
196 | async def _new_client(self): | ||
197 | raise NotImplementedError("Must be implemented in derived class") | ||
198 | |||
199 | def close(self): | ||
200 | if self.client_condition: | ||
201 | self.client_condition = None | ||
202 | |||
203 | if self.loop: | ||
204 | self.loop.run_until_complete(self.__close_clients()) | ||
205 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
206 | self.loop.close() | ||
207 | self.loop = None | ||
208 | |||
209 | def run_tasks(self, tasks): | ||
210 | if not self.loop: | ||
211 | self.loop = asyncio.new_event_loop() | ||
212 | |||
213 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
214 | thread.start() | ||
215 | thread.join() | ||
216 | |||
217 | @contextlib.asynccontextmanager | ||
218 | async def get_client(self): | ||
219 | async with self.client_condition: | ||
220 | if self.avail_clients: | ||
221 | client = self.avail_clients.pop() | ||
222 | elif self.num_clients < self.max_clients: | ||
223 | self.num_clients += 1 | ||
224 | client = await self._new_client() | ||
225 | else: | ||
226 | while not self.avail_clients: | ||
227 | await self.client_condition.wait() | ||
228 | client = self.avail_clients.pop() | ||
229 | |||
230 | try: | ||
231 | yield client | ||
232 | finally: | ||
233 | async with self.client_condition: | ||
234 | self.avail_clients.append(client) | ||
235 | self.client_condition.notify() | ||
236 | |||
237 | def __thread_main(self, tasks): | ||
238 | async def process_task(task): | ||
239 | async with self.get_client() as client: | ||
240 | await task(client) | ||
241 | |||
242 | asyncio.set_event_loop(self.loop) | ||
243 | if not self.client_condition: | ||
244 | self.client_condition = asyncio.Condition() | ||
245 | tasks = [process_task(t) for t in tasks] | ||
246 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
247 | |||
248 | async def __close_clients(self): | ||
249 | for c in self.avail_clients: | ||
250 | await c.close() | ||
251 | self.avail_clients = [] | ||
252 | self.num_clients = 0 | ||
253 | |||
254 | def __enter__(self): | ||
255 | return self | ||
256 | |||
257 | def __exit__(self, exc_type, exc_value, traceback): | ||
258 | self.close() | ||
259 | return False | ||