From 37b4d7e4931cb032659273c19520d74083ffb0e9 Mon Sep 17 00:00:00 2001 From: Joshua Watt Date: Sun, 18 Feb 2024 15:59:50 -0700 Subject: bitbake: hashserv: Add Client Pool Implements a Client Pool derived from the AsyncRPC client pool that allows querying for multiple equivalent hashes in parallel (Bitbake rev: ba4c764d8061c7b88cd4985ca493d6ea6e317106) Signed-off-by: Joshua Watt Signed-off-by: Richard Purdie --- bitbake/lib/hashserv/client.py | 80 ++++++++++++++++++++++++++++++++++++++++ bitbake/lib/hashserv/tests.py | 83 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) diff --git a/bitbake/lib/hashserv/client.py b/bitbake/lib/hashserv/client.py index daf1e12842..b269879ecf 100644 --- a/bitbake/lib/hashserv/client.py +++ b/bitbake/lib/hashserv/client.py @@ -283,3 +283,83 @@ class Client(bb.asyncrpc.Client): def _get_async_client(self): return AsyncClient(self.username, self.password) + + +class ClientPool(bb.asyncrpc.ClientPool): + def __init__( + self, + address, + max_clients, + *, + username=None, + password=None, + become=None, + ): + super().__init__(max_clients) + self.address = address + self.username = username + self.password = password + self.become = become + + async def _new_client(self): + client = await create_async_client( + self.address, + username=self.username, + password=self.password, + ) + if self.become: + await client.become_user(self.become) + return client + + def _run_key_tasks(self, queries, call): + results = {key: None for key in queries.keys()} + + def make_task(key, args): + async def task(client): + nonlocal results + unihash = await call(client, args) + results[key] = unihash + + return task + + def gen_tasks(): + for key, args in queries.items(): + yield make_task(key, args) + + self.run_tasks(gen_tasks()) + return results + + def get_unihashes(self, queries): + """ + Query multiple unihashes in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a tuple of (method, taskhash). + + Returns a dictionary with a corresponding key for each input key, and + the value is the queried unihash (which might be none if the query + failed) + """ + + async def call(client, args): + method, taskhash = args + return await client.get_unihash(method, taskhash) + + return self._run_key_tasks(queries, call) + + def unihashes_exist(self, queries): + """ + Query multiple unihash existence checks in parallel. + + The queries argument is a dictionary with arbitrary key. The values + must be a unihash. + + Returns a dictionary with a corresponding key for each input key, and + the value is True or False if the unihash is known by the server (or + None if there was a failure) + """ + + async def call(client, unihash): + return await client.unihash_exists(unihash) + + return self._run_key_tasks(queries, call) diff --git a/bitbake/lib/hashserv/tests.py b/bitbake/lib/hashserv/tests.py index fbbe81512a..0809453cf8 100644 --- a/bitbake/lib/hashserv/tests.py +++ b/bitbake/lib/hashserv/tests.py @@ -8,6 +8,7 @@ from . import create_server, create_client from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS from bb.asyncrpc import InvokeError +from .client import ClientPool import hashlib import logging import multiprocessing @@ -554,6 +555,88 @@ class HashEquivalenceCommonTests(object): # shares a taskhash with Task 2 self.assertClientGetHash(self.client, taskhash2, unihash2) + + def test_client_pool_get_unihashes(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + + query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)} + for idx, taskhash in enumerate(EXTRA_QUERIES): + query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash) + + result = client_pool.get_unihashes(query) + + self.assertDictEqual(result, { + 0: "218e57509998197d570e2c98512d0105985dffc9", + 1: "218e57509998197d570e2c98512d0105985dffc9", + 2: "218e57509998197d570e2c98512d0105985dffc9", + 3: "3b5d3d83f07f259e9086fcb422c855286e18a57d", + 4: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 5: "f46d3fbb439bd9b921095da657a4de906510d2cd", + 6: "05d2a63c81e32f0a36542ca677e8ad852365c538", + 7: None, + }) + + def test_client_pool_unihash_exists(self): + TEST_INPUT = ( + # taskhash outhash unihash + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'), + # Duplicated taskhash with multiple output hashes and unihashes. + ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'), + # Equivalent hash + ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"), + ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"), + ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'), + ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'), + ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'), + ) + EXTRA_QUERIES = ( + "6b6be7a84ab179b4240c4302518dc3f6", + ) + + result_unihashes = set() + + + with ClientPool(self.server_address, 10) as client_pool: + for taskhash, outhash, unihash in TEST_INPUT: + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + result_unihashes.add(result["unihash"]) + + query = {} + expected = {} + + for _, _, unihash in TEST_INPUT: + idx = len(query) + query[idx] = unihash + expected[idx] = unihash in result_unihashes + + + for unihash in EXTRA_QUERIES: + idx = len(query) + query[idx] = unihash + expected[idx] = False + + result = client_pool.unihashes_exist(query) + self.assertDictEqual(result, expected) + + def test_auth_read_perms(self): admin_client = self.start_auth_server() -- cgit v1.2.3-54-g00ecf