summaryrefslogtreecommitdiffstats
path: root/bitbake/lib/bb/asyncrpc/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'bitbake/lib/bb/asyncrpc/client.py')
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py92
1 files changed, 14 insertions, 78 deletions
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index 65f3f8964d..17b72033b9 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -25,6 +25,9 @@ ADDR_TYPE_TCP = 1
25ADDR_TYPE_WS = 2 25ADDR_TYPE_WS = 2
26 26
27WEBSOCKETS_MIN_VERSION = (9, 1) 27WEBSOCKETS_MIN_VERSION = (9, 1)
28# Need websockets 10 with python 3.10+
29if sys.version_info >= (3, 10, 0):
30 WEBSOCKETS_MIN_VERSION = (10, 0)
28 31
29 32
30def parse_address(addr): 33def parse_address(addr):
@@ -109,7 +112,16 @@ class AsyncClient(object):
109 ) 112 )
110 113
111 async def connect_sock(): 114 async def connect_sock():
112 websocket = await websockets.connect(uri, ping_interval=None) 115 try:
116 websocket = await websockets.connect(
117 uri,
118 ping_interval=None,
119 open_timeout=self.timeout,
120 )
121 except asyncio.exceptions.TimeoutError:
122 raise ConnectionError("Timeout while connecting to websocket")
123 except (OSError, websockets.InvalidHandshake, websockets.InvalidURI) as exc:
124 raise ConnectionError(f"Could not connect to websocket: {exc}") from exc
113 return WebsocketConnection(websocket, self.timeout) 125 return WebsocketConnection(websocket, self.timeout)
114 126
115 self._connect_sock = connect_sock 127 self._connect_sock = connect_sock
@@ -247,85 +259,9 @@ class Client(object):
247 def close(self): 259 def close(self):
248 if self.loop: 260 if self.loop:
249 self.loop.run_until_complete(self.client.close()) 261 self.loop.run_until_complete(self.client.close())
250 if sys.version_info >= (3, 6):
251 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
252 self.loop.close()
253 self.loop = None
254
255 def __enter__(self):
256 return self
257
258 def __exit__(self, exc_type, exc_value, traceback):
259 self.close()
260 return False
261
262
263class ClientPool(object):
264 def __init__(self, max_clients):
265 self.avail_clients = []
266 self.num_clients = 0
267 self.max_clients = max_clients
268 self.loop = None
269 self.client_condition = None
270
271 @abc.abstractmethod
272 async def _new_client(self):
273 raise NotImplementedError("Must be implemented in derived class")
274
275 def close(self):
276 if self.client_condition:
277 self.client_condition = None
278
279 if self.loop:
280 self.loop.run_until_complete(self.__close_clients())
281 self.loop.run_until_complete(self.loop.shutdown_asyncgens()) 262 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
282 self.loop.close() 263 self.loop.close()
283 self.loop = None 264 self.loop = None
284
285 def run_tasks(self, tasks):
286 if not self.loop:
287 self.loop = asyncio.new_event_loop()
288
289 thread = Thread(target=self.__thread_main, args=(tasks,))
290 thread.start()
291 thread.join()
292
293 @contextlib.asynccontextmanager
294 async def get_client(self):
295 async with self.client_condition:
296 if self.avail_clients:
297 client = self.avail_clients.pop()
298 elif self.num_clients < self.max_clients:
299 self.num_clients += 1
300 client = await self._new_client()
301 else:
302 while not self.avail_clients:
303 await self.client_condition.wait()
304 client = self.avail_clients.pop()
305
306 try:
307 yield client
308 finally:
309 async with self.client_condition:
310 self.avail_clients.append(client)
311 self.client_condition.notify()
312
313 def __thread_main(self, tasks):
314 async def process_task(task):
315 async with self.get_client() as client:
316 await task(client)
317
318 asyncio.set_event_loop(self.loop)
319 if not self.client_condition:
320 self.client_condition = asyncio.Condition()
321 tasks = [process_task(t) for t in tasks]
322 self.loop.run_until_complete(asyncio.gather(*tasks))
323
324 async def __close_clients(self):
325 for c in self.avail_clients:
326 await c.close()
327 self.avail_clients = []
328 self.num_clients = 0
329 265
330 def __enter__(self): 266 def __enter__(self):
331 return self 267 return self