summaryrefslogtreecommitdiffstats
path: root/bitbake
diff options
context:
space:
mode:
authorJoshua Watt <JPEWhacker@gmail.com>2024-05-30 09:41:26 -0600
committerRichard Purdie <richard.purdie@linuxfoundation.org>2024-05-31 16:56:25 +0100
commit247d08ae0765fdd73f80e7608f76e36983e2109d (patch)
tree69c1359556e1235276f10052cf3959c4748fc536 /bitbake
parentf618d1dfd7dd414cb458467d0e35b135d6e7cd32 (diff)
downloadpoky-247d08ae0765fdd73f80e7608f76e36983e2109d.tar.gz
bitbake: asyncrpc: Remove ClientPool
Batching support on the client side has proven to be a much more effective way of dealing with server latency than multiple client connections and is also much nicer on the server, so drop the client pool support from asyncrpc and the hash server (Bitbake rev: 6f80560f1c7010d09fe5448fdde616aef8468102) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r--bitbake/lib/bb/asyncrpc/__init__.py2
-rw-r--r--bitbake/lib/bb/asyncrpc/client.py76
-rw-r--r--bitbake/lib/hashserv/client.py80
-rw-r--r--bitbake/lib/hashserv/tests.py82
4 files changed, 2 insertions, 238 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py
index 639e1607f8..a4371643d7 100644
--- a/bitbake/lib/bb/asyncrpc/__init__.py
+++ b/bitbake/lib/bb/asyncrpc/__init__.py
@@ -5,7 +5,7 @@
5# 5#
6 6
7 7
8from .client import AsyncClient, Client, ClientPool 8from .client import AsyncClient, Client
9from .serv import AsyncServer, AsyncServerConnection 9from .serv import AsyncServer, AsyncServerConnection
10from .connection import DEFAULT_MAX_CHUNK 10from .connection import DEFAULT_MAX_CHUNK
11from .exceptions import ( 11from .exceptions import (
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py
index f81ad92f48..11179b0fcb 100644
--- a/bitbake/lib/bb/asyncrpc/client.py
+++ b/bitbake/lib/bb/asyncrpc/client.py
@@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1)
29if sys.version_info >= (3, 10, 0): 29if sys.version_info >= (3, 10, 0):
30 WEBSOCKETS_MIN_VERSION = (10, 0) 30 WEBSOCKETS_MIN_VERSION = (10, 0)
31 31
32
32def parse_address(addr): 33def parse_address(addr):
33 if addr.startswith(UNIX_PREFIX): 34 if addr.startswith(UNIX_PREFIX):
34 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) 35 return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],))
@@ -259,78 +260,3 @@ class Client(object):
259 def __exit__(self, exc_type, exc_value, traceback): 260 def __exit__(self, exc_type, exc_value, traceback):
260 self.close() 261 self.close()
261 return False 262 return False
262
263
264class ClientPool(object):
265 def __init__(self, max_clients):
266 self.avail_clients = []
267 self.num_clients = 0
268 self.max_clients = max_clients
269 self.loop = None
270 self.client_condition = None
271
272 @abc.abstractmethod
273 async def _new_client(self):
274 raise NotImplementedError("Must be implemented in derived class")
275
276 def close(self):
277 if self.client_condition:
278 self.client_condition = None
279
280 if self.loop:
281 self.loop.run_until_complete(self.__close_clients())
282 self.loop.run_until_complete(self.loop.shutdown_asyncgens())
283 self.loop.close()
284 self.loop = None
285
286 def run_tasks(self, tasks):
287 if not self.loop:
288 self.loop = asyncio.new_event_loop()
289
290 thread = Thread(target=self.__thread_main, args=(tasks,))
291 thread.start()
292 thread.join()
293
294 @contextlib.asynccontextmanager
295 async def get_client(self):
296 async with self.client_condition:
297 if self.avail_clients:
298 client = self.avail_clients.pop()
299 elif self.num_clients < self.max_clients:
300 self.num_clients += 1
301 client = await self._new_client()
302 else:
303 while not self.avail_clients:
304 await self.client_condition.wait()
305 client = self.avail_clients.pop()
306
307 try:
308 yield client
309 finally:
310 async with self.client_condition:
311 self.avail_clients.append(client)
312 self.client_condition.notify()
313
314 def __thread_main(self, tasks):
315 async def process_task(task):
316 async with self.get_client() as client:
317 await task(client)
318
319 asyncio.set_event_loop(self.loop)
320 if not self.client_condition:
321 self.client_condition = asyncio.Condition()
322 tasks = [process_task(t) for t in tasks]
323 self.loop.run_until_complete(asyncio.gather(*tasks))
324
325 async def __close_clients(self):
326 for c in self.avail_clients:
327 await c.close()
328 self.avail_clients = []
329 self.num_clients = 0
330
331 def __enter__(self):
332 return self
333
334 def __exit__(self, exc_type, exc_value, traceback):
335 self.close()
336 return False
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py
index 775faf935a..d415617b20 100644
--- a/bitbake/lib/hashserv/client.py
+++ b/bitbake/lib/hashserv/client.py
@@ -352,83 +352,3 @@ class Client(bb.asyncrpc.Client):
352 352
353 def _get_async_client(self): 353 def _get_async_client(self):
354 return AsyncClient(self.username, self.password) 354 return AsyncClient(self.username, self.password)
355
356
357class ClientPool(bb.asyncrpc.ClientPool):
358 def __init__(
359 self,
360 address,
361 max_clients,
362 *,
363 username=None,
364 password=None,
365 become=None,
366 ):
367 super().__init__(max_clients)
368 self.address = address
369 self.username = username
370 self.password = password
371 self.become = become
372
373 async def _new_client(self):
374 client = await create_async_client(
375 self.address,
376 username=self.username,
377 password=self.password,
378 )
379 if self.become:
380 await client.become_user(self.become)
381 return client
382
383 def _run_key_tasks(self, queries, call):
384 results = {key: None for key in queries.keys()}
385
386 def make_task(key, args):
387 async def task(client):
388 nonlocal results
389 unihash = await call(client, args)
390 results[key] = unihash
391
392 return task
393
394 def gen_tasks():
395 for key, args in queries.items():
396 yield make_task(key, args)
397
398 self.run_tasks(gen_tasks())
399 return results
400
401 def get_unihashes(self, queries):
402 """
403 Query multiple unihashes in parallel.
404
405 The queries argument is a dictionary with arbitrary key. The values
406 must be a tuple of (method, taskhash).
407
408 Returns a dictionary with a corresponding key for each input key, and
409 the value is the queried unihash (which might be none if the query
410 failed)
411 """
412
413 async def call(client, args):
414 method, taskhash = args
415 return await client.get_unihash(method, taskhash)
416
417 return self._run_key_tasks(queries, call)
418
419 def unihashes_exist(self, queries):
420 """
421 Query multiple unihash existence checks in parallel.
422
423 The queries argument is a dictionary with arbitrary key. The values
424 must be a unihash.
425
426 Returns a dictionary with a corresponding key for each input key, and
427 the value is True or False if the unihash is known by the server (or
428 None if there was a failure)
429 """
430
431 async def call(client, unihash):
432 return await client.unihash_exists(unihash)
433
434 return self._run_key_tasks(queries, call)
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py
index cf74d9de7e..13ccb20ebf 100644
--- a/bitbake/lib/hashserv/tests.py
+++ b/bitbake/lib/hashserv/tests.py
@@ -8,7 +8,6 @@
8from . import create_server, create_client 8from . import create_server, create_client
9from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS 9from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS
10from bb.asyncrpc import InvokeError 10from bb.asyncrpc import InvokeError
11from .client import ClientPool
12import hashlib 11import hashlib
13import logging 12import logging
14import multiprocessing 13import multiprocessing
@@ -552,45 +551,6 @@ class HashEquivalenceCommonTests(object):
552 # shares a taskhash with Task 2 551 # shares a taskhash with Task 2
553 self.assertClientGetHash(self.client, taskhash2, unihash2) 552 self.assertClientGetHash(self.client, taskhash2, unihash2)
554 553
555
556 def test_client_pool_get_unihashes(self):
557 TEST_INPUT = (
558 # taskhash outhash unihash
559 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
560 # Duplicated taskhash with multiple output hashes and unihashes.
561 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
562 # Equivalent hash
563 ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
564 ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
565 ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
566 ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
567 ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
568 )
569 EXTRA_QUERIES = (
570 "6b6be7a84ab179b4240c4302518dc3f6",
571 )
572
573 with ClientPool(self.server_address, 10) as client_pool:
574 for taskhash, outhash, unihash in TEST_INPUT:
575 self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
576
577 query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)}
578 for idx, taskhash in enumerate(EXTRA_QUERIES):
579 query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash)
580
581 result = client_pool.get_unihashes(query)
582
583 self.assertDictEqual(result, {
584 0: "218e57509998197d570e2c98512d0105985dffc9",
585 1: "218e57509998197d570e2c98512d0105985dffc9",
586 2: "218e57509998197d570e2c98512d0105985dffc9",
587 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d",
588 4: "f46d3fbb439bd9b921095da657a4de906510d2cd",
589 5: "f46d3fbb439bd9b921095da657a4de906510d2cd",
590 6: "05d2a63c81e32f0a36542ca677e8ad852365c538",
591 7: None,
592 })
593
594 def test_get_unihash_batch(self): 554 def test_get_unihash_batch(self):
595 TEST_INPUT = ( 555 TEST_INPUT = (
596 # taskhash outhash unihash 556 # taskhash outhash unihash
@@ -628,48 +588,6 @@ class HashEquivalenceCommonTests(object):
628 None, 588 None,
629 ]) 589 ])
630 590
631 def test_client_pool_unihash_exists(self):
632 TEST_INPUT = (
633 # taskhash outhash unihash
634 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
635 # Duplicated taskhash with multiple output hashes and unihashes.
636 ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
637 # Equivalent hash
638 ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
639 ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
640 ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
641 ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
642 ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
643 )
644 EXTRA_QUERIES = (
645 "6b6be7a84ab179b4240c4302518dc3f6",
646 )
647
648 result_unihashes = set()
649
650
651 with ClientPool(self.server_address, 10) as client_pool:
652 for taskhash, outhash, unihash in TEST_INPUT:
653 result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
654 result_unihashes.add(result["unihash"])
655
656 query = {}
657 expected = {}
658
659 for _, _, unihash in TEST_INPUT:
660 idx = len(query)
661 query[idx] = unihash
662 expected[idx] = unihash in result_unihashes
663
664
665 for unihash in EXTRA_QUERIES:
666 idx = len(query)
667 query[idx] = unihash
668 expected[idx] = False
669
670 result = client_pool.unihashes_exist(query)
671 self.assertDictEqual(result, expected)
672
673 def test_unihash_exists_batch(self): 591 def test_unihash_exists_batch(self):
674 TEST_INPUT = ( 592 TEST_INPUT = (
675 # taskhash outhash unihash 593 # taskhash outhash unihash