summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2024-02-18 15:59:49 -0700
committerRichard Purdie <richard.purdie@linuxfoundation.org>2024-02-19 11:58:12 +0000
commit2406bd10550997ecd033baad93fcb59b223c5aa8 (patch)
tree839a0f5ec364f3879894c4f3f73ab18a7afdf1fd
parent3bd2c69e70853584beaaa5a4fd62589fa051d911 (diff)
downloadpoky-2406bd10550997ecd033baad93fcb59b223c5aa8.tar.gz
bitbake: asyncrpc: Add Client Pool object
Adds an abstract base class that can be used to implement a pool of client connections. The class implements a thread that runs an async event loop, and allows derived classes to schedule work on the loop and wait for the work to be finished. (Bitbake rev: f113456417f9ac0a4b44b291a6e22ea8219c3a5f) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
-rw-r--r--bitbake/lib/bb/asyncrpc/__init__.py2
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py77
2 files changed, 78 insertions, 1 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py
index a4371643d7..639e1607f8 100644
--- a/bitbake/lib/bb/asyncrpc/__init__.py
+++ b/bitbake/lib/bb/asyncrpc/__init__.py
@@ -5,7 +5,7 @@
5# 5#
6 6
7 7
8from .client import AsyncClient, Client 8from .client import AsyncClient, Client, ClientPool
9from .serv import AsyncServer, AsyncServerConnection 9from .serv import AsyncServer, AsyncServerConnection
10from .connection import DEFAULT_MAX_CHUNK 10from .connection import DEFAULT_MAX_CHUNK
11from .exceptions import ( 11from .exceptions import (
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index 0d7cd85780..a6228bb0ba 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -10,6 +10,8 @@ import json
10import os 10import os
11import socket 11import socket
12import sys 12import sys
13import contextlib
14from threading import Thread
13from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK 15from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK
14from .exceptions import ConnectionClosedError, InvokeError 16from .exceptions import ConnectionClosedError, InvokeError
15 17
@@ -180,3 +182,78 @@ class Client(object):
180 def __exit__(self, exc_type, exc_value, traceback): 182 def __exit__(self, exc_type, exc_value, traceback):
181 self.close() 183 self.close()
182 return False 184 return False
185
186
187class ClientPool(object):
188 def __init__(self, max_clients):
189 self.avail_clients = []
190 self.num_clients = 0
191 self.max_clients = max_clients
192 self.loop = None
193 self.client_condition = None
194
195 @abc.abstractmethod
196 async def _new_client(self):
197 raise NotImplementedError("Must be implemented in derived class")
198
199 def close(self):
200 if self.client_condition:
201 self.client_condition = None
202
203 if self.loop:
204 self.loop.run_until_complete(self.__close_clients())
205 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
206 self.loop.close()
207 self.loop = None
208
209 def run_tasks(self, tasks):
210 if not self.loop:
211 self.loop = asyncio.new_event_loop()
212
213 thread = Thread(target=self.__thread_main, args=(tasks,))
214 thread.start()
215 thread.join()
216
217 @contextlib.asynccontextmanager
218 async def get_client(self):
219 async with self.client_condition:
220 if self.avail_clients:
221 client = self.avail_clients.pop()
222 elif self.num_clients < self.max_clients:
223 self.num_clients += 1
224 client = await self._new_client()
225 else:
226 while not self.avail_clients:
227 await self.client_condition.wait()
228 client = self.avail_clients.pop()
229
230 try:
231 yield client
232 finally:
233 async with self.client_condition:
234 self.avail_clients.append(client)
235 self.client_condition.notify()
236
237 def __thread_main(self, tasks):
238 async def process_task(task):
239 async with self.get_client() as client:
240 await task(client)
241
242 asyncio.set_event_loop(self.loop)
243 if not self.client_condition:
244 self.client_condition = asyncio.Condition()
245 tasks = [process_task(t) for t in tasks]
246 self.loop.run_until_complete(asyncio.gather(*tasks))
247
248 async def __close_clients(self):
249 for c in self.avail_clients:
250 await c.close()
251 self.avail_clients = []
252 self.num_clients = 0
253
254 def __enter__(self):
255 return self
256
257 def __exit__(self, exc_type, exc_value, traceback):
258 self.close()
259 return False