diff options
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 92 |
1 files changed, 14 insertions, 78 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index 65f3f8964d..17b72033b9 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -25,6 +25,9 @@ ADDR_TYPE_TCP = 1 | |||
25 | ADDR_TYPE_WS = 2 | 25 | ADDR_TYPE_WS = 2 |
26 | 26 | ||
27 | WEBSOCKETS_MIN_VERSION = (9, 1) | 27 | WEBSOCKETS_MIN_VERSION = (9, 1) |
28 | # Need websockets 10 with python 3.10+ | ||
29 | if sys.version_info >= (3, 10, 0): | ||
30 | WEBSOCKETS_MIN_VERSION = (10, 0) | ||
28 | 31 | ||
29 | 32 | ||
30 | def parse_address(addr): | 33 | def parse_address(addr): |
@@ -109,7 +112,16 @@ class AsyncClient(object): | |||
109 | ) | 112 | ) |
110 | 113 | ||
111 | async def connect_sock(): | 114 | async def connect_sock(): |
112 | websocket = await websockets.connect(uri, ping_interval=None) | 115 | try: |
116 | websocket = await websockets.connect( | ||
117 | uri, | ||
118 | ping_interval=None, | ||
119 | open_timeout=self.timeout, | ||
120 | ) | ||
121 | except asyncio.exceptions.TimeoutError: | ||
122 | raise ConnectionError("Timeout while connecting to websocket") | ||
123 | except (OSError, websockets.InvalidHandshake, websockets.InvalidURI) as exc: | ||
124 | raise ConnectionError(f"Could not connect to websocket: {exc}") from exc | ||
113 | return WebsocketConnection(websocket, self.timeout) | 125 | return WebsocketConnection(websocket, self.timeout) |
114 | 126 | ||
115 | self._connect_sock = connect_sock | 127 | self._connect_sock = connect_sock |
@@ -247,85 +259,9 @@ class Client(object): | |||
247 | def close(self): | 259 | def close(self): |
248 | if self.loop: | 260 | if self.loop: |
249 | self.loop.run_until_complete(self.client.close()) | 261 | self.loop.run_until_complete(self.client.close()) |
250 | if sys.version_info >= (3, 6): | ||
251 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
252 | self.loop.close() | ||
253 | self.loop = None | ||
254 | |||
255 | def __enter__(self): | ||
256 | return self | ||
257 | |||
258 | def __exit__(self, exc_type, exc_value, traceback): | ||
259 | self.close() | ||
260 | return False | ||
261 | |||
262 | |||
263 | class ClientPool(object): | ||
264 | def __init__(self, max_clients): | ||
265 | self.avail_clients = [] | ||
266 | self.num_clients = 0 | ||
267 | self.max_clients = max_clients | ||
268 | self.loop = None | ||
269 | self.client_condition = None | ||
270 | |||
271 | @abc.abstractmethod | ||
272 | async def _new_client(self): | ||
273 | raise NotImplementedError("Must be implemented in derived class") | ||
274 | |||
275 | def close(self): | ||
276 | if self.client_condition: | ||
277 | self.client_condition = None | ||
278 | |||
279 | if self.loop: | ||
280 | self.loop.run_until_complete(self.__close_clients()) | ||
281 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | 262 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) |
282 | self.loop.close() | 263 | self.loop.close() |
283 | self.loop = None | 264 | self.loop = None |
284 | |||
285 | def run_tasks(self, tasks): | ||
286 | if not self.loop: | ||
287 | self.loop = asyncio.new_event_loop() | ||
288 | |||
289 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
290 | thread.start() | ||
291 | thread.join() | ||
292 | |||
293 | @contextlib.asynccontextmanager | ||
294 | async def get_client(self): | ||
295 | async with self.client_condition: | ||
296 | if self.avail_clients: | ||
297 | client = self.avail_clients.pop() | ||
298 | elif self.num_clients < self.max_clients: | ||
299 | self.num_clients += 1 | ||
300 | client = await self._new_client() | ||
301 | else: | ||
302 | while not self.avail_clients: | ||
303 | await self.client_condition.wait() | ||
304 | client = self.avail_clients.pop() | ||
305 | |||
306 | try: | ||
307 | yield client | ||
308 | finally: | ||
309 | async with self.client_condition: | ||
310 | self.avail_clients.append(client) | ||
311 | self.client_condition.notify() | ||
312 | |||
313 | def __thread_main(self, tasks): | ||
314 | async def process_task(task): | ||
315 | async with self.get_client() as client: | ||
316 | await task(client) | ||
317 | |||
318 | asyncio.set_event_loop(self.loop) | ||
319 | if not self.client_condition: | ||
320 | self.client_condition = asyncio.Condition() | ||
321 | tasks = [process_task(t) for t in tasks] | ||
322 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
323 | |||
324 | async def __close_clients(self): | ||
325 | for c in self.avail_clients: | ||
326 | await c.close() | ||
327 | self.avail_clients = [] | ||
328 | self.num_clients = 0 | ||
329 | 265 | ||
330 | def __enter__(self): | 266 | def __enter__(self): |
331 | return self | 267 | return self |