diff options
author | Joshua Watt <JPEWhacker@gmail.com> | 2024-05-30 09:41:26 -0600 |
---|---|---|
committer | Richard Purdie <richard.purdie@linuxfoundation.org> | 2024-05-31 16:56:25 +0100 |
commit | 247d08ae0765fdd73f80e7608f76e36983e2109d (patch) | |
tree | 69c1359556e1235276f10052cf3959c4748fc536 /bitbake | |
parent | f618d1dfd7dd414cb458467d0e35b135d6e7cd32 (diff) | |
download | poky-247d08ae0765fdd73f80e7608f76e36983e2109d.tar.gz |
bitbake: asyncrpc: Remove ClientPool
Batching support on the client side has proven to be a much more
effective way of dealing with server latency than multiple client
connections and is also much nicer on the server, so drop the client
pool support from asyncrpc and the hash server
(Bitbake rev: 6f80560f1c7010d09fe5448fdde616aef8468102)
Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Diffstat (limited to 'bitbake')
-rw-r--r-- | bitbake/lib/bb/asyncrpc/__init__.py | 2 | ||||
-rw-r--r-- | bitbake/lib/bb/asyncrpc/client.py | 76 | ||||
-rw-r--r-- | bitbake/lib/hashserv/client.py | 80 | ||||
-rw-r--r-- | bitbake/lib/hashserv/tests.py | 82 |
4 files changed, 2 insertions, 238 deletions
diff --git a/bitbake/lib/bb/asyncrpc/__init__.py b/bitbake/lib/bb/asyncrpc/__init__.py index 639e1607f8..a4371643d7 100644 --- a/bitbake/lib/bb/asyncrpc/__init__.py +++ b/bitbake/lib/bb/asyncrpc/__init__.py | |||
@@ -5,7 +5,7 @@ | |||
5 | # | 5 | # |
6 | 6 | ||
7 | 7 | ||
8 | from .client import AsyncClient, Client, ClientPool | 8 | from .client import AsyncClient, Client |
9 | from .serv import AsyncServer, AsyncServerConnection | 9 | from .serv import AsyncServer, AsyncServerConnection |
10 | from .connection import DEFAULT_MAX_CHUNK | 10 | from .connection import DEFAULT_MAX_CHUNK |
11 | from .exceptions import ( | 11 | from .exceptions import ( |
diff --git a/bitbake/lib/bb/asyncrpc/client.py b/bitbake/lib/bb/asyncrpc/client.py index f81ad92f48..11179b0fcb 100644 --- a/bitbake/lib/bb/asyncrpc/client.py +++ b/bitbake/lib/bb/asyncrpc/client.py | |||
@@ -29,6 +29,7 @@ WEBSOCKETS_MIN_VERSION = (9, 1) | |||
29 | if sys.version_info >= (3, 10, 0): | 29 | if sys.version_info >= (3, 10, 0): |
30 | WEBSOCKETS_MIN_VERSION = (10, 0) | 30 | WEBSOCKETS_MIN_VERSION = (10, 0) |
31 | 31 | ||
32 | |||
32 | def parse_address(addr): | 33 | def parse_address(addr): |
33 | if addr.startswith(UNIX_PREFIX): | 34 | if addr.startswith(UNIX_PREFIX): |
34 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) | 35 | return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX) :],)) |
@@ -259,78 +260,3 @@ class Client(object): | |||
259 | def __exit__(self, exc_type, exc_value, traceback): | 260 | def __exit__(self, exc_type, exc_value, traceback): |
260 | self.close() | 261 | self.close() |
261 | return False | 262 | return False |
262 | |||
263 | |||
264 | class ClientPool(object): | ||
265 | def __init__(self, max_clients): | ||
266 | self.avail_clients = [] | ||
267 | self.num_clients = 0 | ||
268 | self.max_clients = max_clients | ||
269 | self.loop = None | ||
270 | self.client_condition = None | ||
271 | |||
272 | @abc.abstractmethod | ||
273 | async def _new_client(self): | ||
274 | raise NotImplementedError("Must be implemented in derived class") | ||
275 | |||
276 | def close(self): | ||
277 | if self.client_condition: | ||
278 | self.client_condition = None | ||
279 | |||
280 | if self.loop: | ||
281 | self.loop.run_until_complete(self.__close_clients()) | ||
282 | self.loop.run_until_complete(self.loop.shutdown_asyncgens()) | ||
283 | self.loop.close() | ||
284 | self.loop = None | ||
285 | |||
286 | def run_tasks(self, tasks): | ||
287 | if not self.loop: | ||
288 | self.loop = asyncio.new_event_loop() | ||
289 | |||
290 | thread = Thread(target=self.__thread_main, args=(tasks,)) | ||
291 | thread.start() | ||
292 | thread.join() | ||
293 | |||
294 | @contextlib.asynccontextmanager | ||
295 | async def get_client(self): | ||
296 | async with self.client_condition: | ||
297 | if self.avail_clients: | ||
298 | client = self.avail_clients.pop() | ||
299 | elif self.num_clients < self.max_clients: | ||
300 | self.num_clients += 1 | ||
301 | client = await self._new_client() | ||
302 | else: | ||
303 | while not self.avail_clients: | ||
304 | await self.client_condition.wait() | ||
305 | client = self.avail_clients.pop() | ||
306 | |||
307 | try: | ||
308 | yield client | ||
309 | finally: | ||
310 | async with self.client_condition: | ||
311 | self.avail_clients.append(client) | ||
312 | self.client_condition.notify() | ||
313 | |||
314 | def __thread_main(self, tasks): | ||
315 | async def process_task(task): | ||
316 | async with self.get_client() as client: | ||
317 | await task(client) | ||
318 | |||
319 | asyncio.set_event_loop(self.loop) | ||
320 | if not self.client_condition: | ||
321 | self.client_condition = asyncio.Condition() | ||
322 | tasks = [process_task(t) for t in tasks] | ||
323 | self.loop.run_until_complete(asyncio.gather(*tasks)) | ||
324 | |||
325 | async def __close_clients(self): | ||
326 | for c in self.avail_clients: | ||
327 | await c.close() | ||
328 | self.avail_clients = [] | ||
329 | self.num_clients = 0 | ||
330 | |||
331 | def __enter__(self): | ||
332 | return self | ||
333 | |||
334 | def __exit__(self, exc_type, exc_value, traceback): | ||
335 | self.close() | ||
336 | return False | ||
diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index 775faf935a..d415617b20 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py | |||
@@ -352,83 +352,3 @@ class Client(bb.asyncrpc.Client): | |||
352 | 352 | ||
353 | def _get_async_client(self): | 353 | def _get_async_client(self): |
354 | return AsyncClient(self.username, self.password) | 354 | return AsyncClient(self.username, self.password) |
355 | |||
356 | |||
357 | class ClientPool(bb.asyncrpc.ClientPool): | ||
358 | def __init__( | ||
359 | self, | ||
360 | address, | ||
361 | max_clients, | ||
362 | *, | ||
363 | username=None, | ||
364 | password=None, | ||
365 | become=None, | ||
366 | ): | ||
367 | super().__init__(max_clients) | ||
368 | self.address = address | ||
369 | self.username = username | ||
370 | self.password = password | ||
371 | self.become = become | ||
372 | |||
373 | async def _new_client(self): | ||
374 | client = await create_async_client( | ||
375 | self.address, | ||
376 | username=self.username, | ||
377 | password=self.password, | ||
378 | ) | ||
379 | if self.become: | ||
380 | await client.become_user(self.become) | ||
381 | return client | ||
382 | |||
383 | def _run_key_tasks(self, queries, call): | ||
384 | results = {key: None for key in queries.keys()} | ||
385 | |||
386 | def make_task(key, args): | ||
387 | async def task(client): | ||
388 | nonlocal results | ||
389 | unihash = await call(client, args) | ||
390 | results[key] = unihash | ||
391 | |||
392 | return task | ||
393 | |||
394 | def gen_tasks(): | ||
395 | for key, args in queries.items(): | ||
396 | yield make_task(key, args) | ||
397 | |||
398 | self.run_tasks(gen_tasks()) | ||
399 | return results | ||
400 | |||
401 | def get_unihashes(self, queries): | ||
402 | """ | ||
403 | Query multiple unihashes in parallel. | ||
404 | |||
405 | The queries argument is a dictionary with arbitrary key. The values | ||
406 | must be a tuple of (method, taskhash). | ||
407 | |||
408 | Returns a dictionary with a corresponding key for each input key, and | ||
409 | the value is the queried unihash (which might be none if the query | ||
410 | failed) | ||
411 | """ | ||
412 | |||
413 | async def call(client, args): | ||
414 | method, taskhash = args | ||
415 | return await client.get_unihash(method, taskhash) | ||
416 | |||
417 | return self._run_key_tasks(queries, call) | ||
418 | |||
419 | def unihashes_exist(self, queries): | ||
420 | """ | ||
421 | Query multiple unihash existence checks in parallel. | ||
422 | |||
423 | The queries argument is a dictionary with arbitrary key. The values | ||
424 | must be a unihash. | ||
425 | |||
426 | Returns a dictionary with a corresponding key for each input key, and | ||
427 | the value is True or False if the unihash is known by the server (or | ||
428 | None if there was a failure) | ||
429 | """ | ||
430 | |||
431 | async def call(client, unihash): | ||
432 | return await client.unihash_exists(unihash) | ||
433 | |||
434 | return self._run_key_tasks(queries, call) | ||
diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index cf74d9de7e..13ccb20ebf 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py | |||
@@ -8,7 +8,6 @@ | |||
8 | from . import create_server, create_client | 8 | from . import create_server, create_client |
9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS | 9 | from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS |
10 | from bb.asyncrpc import InvokeError | 10 | from bb.asyncrpc import InvokeError |
11 | from .client import ClientPool | ||
12 | import hashlib | 11 | import hashlib |
13 | import logging | 12 | import logging |
14 | import multiprocessing | 13 | import multiprocessing |
@@ -552,45 +551,6 @@ class HashEquivalenceCommonTests(object): | |||
552 | # shares a taskhash with Task 2 | 551 | # shares a taskhash with Task 2 |
553 | self.assertClientGetHash(self.client, taskhash2, unihash2) | 552 | self.assertClientGetHash(self.client, taskhash2, unihash2) |
554 | 553 | ||
555 | |||
556 | def test_client_pool_get_unihashes(self): | ||
557 | TEST_INPUT = ( | ||
558 | # taskhash outhash unihash | ||
559 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
560 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
561 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
562 | # Equivalent hash | ||
563 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
564 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
565 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
566 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
567 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
568 | ) | ||
569 | EXTRA_QUERIES = ( | ||
570 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
571 | ) | ||
572 | |||
573 | with ClientPool(self.server_address, 10) as client_pool: | ||
574 | for taskhash, outhash, unihash in TEST_INPUT: | ||
575 | self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
576 | |||
577 | query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} | ||
578 | for idx, taskhash in enumerate(EXTRA_QUERIES): | ||
579 | query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) | ||
580 | |||
581 | result = client_pool.get_unihashes(query) | ||
582 | |||
583 | self.assertDictEqual(result, { | ||
584 | 0: "218e57509998197d570e2c98512d0105985dffc9", | ||
585 | 1: "218e57509998197d570e2c98512d0105985dffc9", | ||
586 | 2: "218e57509998197d570e2c98512d0105985dffc9", | ||
587 | 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", | ||
588 | 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
589 | 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", | ||
590 | 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", | ||
591 | 7: None, | ||
592 | }) | ||
593 | |||
594 | def test_get_unihash_batch(self): | 554 | def test_get_unihash_batch(self): |
595 | TEST_INPUT = ( | 555 | TEST_INPUT = ( |
596 | # taskhash outhash unihash | 556 | # taskhash outhash unihash |
@@ -628,48 +588,6 @@ class HashEquivalenceCommonTests(object): | |||
628 | None, | 588 | None, |
629 | ]) | 589 | ]) |
630 | 590 | ||
631 | def test_client_pool_unihash_exists(self): | ||
632 | TEST_INPUT = ( | ||
633 | # taskhash outhash unihash | ||
634 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), | ||
635 | # Duplicated taskhash with multiple output hashes and unihashes. | ||
636 | ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), | ||
637 | # Equivalent hash | ||
638 | ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), | ||
639 | ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), | ||
640 | ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), | ||
641 | ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), | ||
642 | ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), | ||
643 | ) | ||
644 | EXTRA_QUERIES = ( | ||
645 | "6b6be7a84ab179b4240c4302518dc3f6", | ||
646 | ) | ||
647 | |||
648 | result_unihashes = set() | ||
649 | |||
650 | |||
651 | with ClientPool(self.server_address, 10) as client_pool: | ||
652 | for taskhash, outhash, unihash in TEST_INPUT: | ||
653 | result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) | ||
654 | result_unihashes.add(result["unihash"]) | ||
655 | |||
656 | query = {} | ||
657 | expected = {} | ||
658 | |||
659 | for _, _, unihash in TEST_INPUT: | ||
660 | idx = len(query) | ||
661 | query[idx] = unihash | ||
662 | expected[idx] = unihash in result_unihashes | ||
663 | |||
664 | |||
665 | for unihash in EXTRA_QUERIES: | ||
666 | idx = len(query) | ||
667 | query[idx] = unihash | ||
668 | expected[idx] = False | ||
669 | |||
670 | result = client_pool.unihashes_exist(query) | ||
671 | self.assertDictEqual(result, expected) | ||
672 | |||
673 | def test_unihash_exists_batch(self): | 591 | def test_unihash_exists_batch(self): |
674 | TEST_INPUT = ( | 592 | TEST_INPUT = ( |
675 | # taskhash outhash unihash | 593 | # taskhash outhash unihash |