From 6421d3330c10868faf01084d4fdd1f3f9874f3bd Mon Sep 17 00:00:00 2001 From: Jose Quaresma Date: Sat, 16 Apr 2022 23:28:45 +0100 Subject: sstate: Use the python3 ThreadPoolExecutor instead of the OE ThreadedPool For the FetchConnectionCache use a queue where each thread can get an unsed connection_cache that is properly initialized before we fireup the ThreadPoolExecutor. (From OE-Core rev: eb6a6820928472ef194b963b606454e731f9486f) Signed-off-by: Jose Quaresma Signed-off-by: Richard Purdie --- meta/classes/sstate.bbclass | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) (limited to 'meta/classes/sstate.bbclass') diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass index 3513269bca..0aa901fe89 100644 --- a/meta/classes/sstate.bbclass +++ b/meta/classes/sstate.bbclass @@ -977,15 +977,19 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, localdata.delVar('BB_NO_NETWORK') from bb.fetch2 import FetchConnectionCache - def checkstatus_init(thread_worker): - thread_worker.connection_cache = FetchConnectionCache() + def checkstatus_init(): + while not connection_cache_pool.full(): + connection_cache_pool.put(FetchConnectionCache()) - def checkstatus_end(thread_worker): - thread_worker.connection_cache.close_connections() + def checkstatus_end(): + while not connection_cache_pool.empty(): + connection_cache = connection_cache_pool.get() + connection_cache.close_connections() - def checkstatus(thread_worker, arg): + def checkstatus(arg): (tid, sstatefile) = arg + connection_cache = connection_cache_pool.get() localdata2 = bb.data.createCopy(localdata) srcuri = "file://" + sstatefile localdata2.setVar('SRC_URI', srcuri) @@ -995,7 +999,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, try: fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2, - connection_cache=thread_worker.connection_cache) + connection_cache=connection_cache) fetcher.checkstatus() bb.debug(2, "SState: Successful fetch test for %s" % srcuri) found.add(tid) @@ -1005,6 +1009,8 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, except Exception as e: bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc())) + connection_cache_pool.put(connection_cache) + if progress: bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d) @@ -1025,13 +1031,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True, fetcherenv = bb.fetch2.get_fetcher_environment(d) with bb.utils.environment(**fetcherenv): bb.event.enable_threadlock() - pool = oe.utils.ThreadedPool(nproc, len(tasklist), - worker_init=checkstatus_init, worker_end=checkstatus_end, - name="sstate_checkhashes-") - for t in tasklist: - pool.add_task(checkstatus, t) - pool.start() - pool.wait_completion() + import concurrent.futures + from queue import Queue + connection_cache_pool = Queue(nproc) + checkstatus_init() + with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor: + executor.map(checkstatus, tasklist.copy()) + checkstatus_end() bb.event.disable_threadlock() if progress: -- cgit v1.2.3-54-g00ecf