From 1afe96a7e997ce7748f066b206a85ac648f7a87c Mon Sep 17 00:00:00 2001 From: Gavin Mak Date: Mon, 20 Oct 2025 11:13:09 -0700 Subject: sync: fix saving of fetch times and local state Interleaved sync didn't save _fetch_times and _local_sync_state to disk. Phased sync saved them, but incorrectly applied moving average smoothing repeatedly when fetching submodules, and discarded historical data during partial syncs. Move .Save() calls to the end of main sync loops to ensure they run once. Update _FetchTimes.Save() to merge new data with existing history, preventing data loss. Change-Id: I174f98a62ac86859f1eeea1daba65eb35c227852 Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/519821 Commit-Queue: Gavin Mak Reviewed-by: Scott Lee Tested-by: Gavin Mak --- subcmds/sync.py | 311 ++++++++++++++++++++++++--------------------- tests/test_subcmds_sync.py | 3 + 2 files changed, 167 insertions(+), 147 deletions(-) diff --git a/subcmds/sync.py b/subcmds/sync.py index 582bd057..f9500314 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py @@ -975,9 +975,6 @@ later is required to fix a server side protocol bug. sync_event.set() sync_progress_thread.join() - self._fetch_times.Save() - self._local_sync_state.Save() - if not self.outer_client.manifest.IsArchive: self._GCProjects(projects, opt, err_event) @@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug. to_fetch.extend(all_projects) to_fetch.sort(key=self._fetch_times.Get, reverse=True) - result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors) - success = result.success - fetched = result.projects - if not success: - err_event.set() - - if opt.network_only: - # Bail out now; the rest touches the working tree. - if err_event.is_set(): - e = SyncError( - "error: Exited sync due to fetch errors.", - aggregate_errors=errors, - ) - - logger.error(e) - raise e - return _FetchMainResult([]) - - # Iteratively fetch missing and/or nested unregistered submodules. - previously_missing_set = set() - while True: - self._ReloadManifest(None, manifest) - all_projects = self.GetProjects( - args, - missing_ok=True, - submodules_ok=opt.fetch_submodules, - manifest=manifest, - all_manifests=not opt.this_manifest_only, - ) - missing = [] - for project in all_projects: - if project.gitdir not in fetched: - missing.append(project) - if not missing: - break - # Stop us from non-stopped fetching actually-missing repos: If set - # of missing repos has not been changed from last fetch, we break. - missing_set = {p.name for p in missing} - if previously_missing_set == missing_set: - break - previously_missing_set = missing_set - result = self._Fetch(missing, opt, err_event, ssh_proxy, errors) + try: + result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors) success = result.success - new_fetched = result.projects + fetched = result.projects if not success: err_event.set() - fetched.update(new_fetched) + + if opt.network_only: + # Bail out now; the rest touches the working tree. + if err_event.is_set(): + e = SyncError( + "error: Exited sync due to fetch errors.", + aggregate_errors=errors, + ) + + logger.error(e) + raise e + return _FetchMainResult([]) + + # Iteratively fetch missing and/or nested unregistered submodules. + previously_missing_set = set() + while True: + self._ReloadManifest(None, manifest) + all_projects = self.GetProjects( + args, + missing_ok=True, + submodules_ok=opt.fetch_submodules, + manifest=manifest, + all_manifests=not opt.this_manifest_only, + ) + missing = [] + for project in all_projects: + if project.gitdir not in fetched: + missing.append(project) + if not missing: + break + # Stop us from non-stopped fetching actually-missing repos: If + # set of missing repos has not been changed from last fetch, we + # break. + missing_set = {p.name for p in missing} + if previously_missing_set == missing_set: + break + previously_missing_set = missing_set + result = self._Fetch(missing, opt, err_event, ssh_proxy, errors) + success = result.success + new_fetched = result.projects + if not success: + err_event.set() + fetched.update(new_fetched) + finally: + self._fetch_times.Save() + self._local_sync_state.Save() return _FetchMainResult(all_projects) @@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug. sync_event = _threading.Event() sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) - with multiprocessing.Manager() as manager, ssh.ProxyManager( - manager - ) as ssh_proxy: - ssh_proxy.sock() - with self.ParallelContext(): - self.get_parallel_context()["ssh_proxy"] = ssh_proxy - # TODO(gavinmak): Use multprocessing.Queue instead of dict. - self.get_parallel_context()[ - "sync_dict" - ] = multiprocessing.Manager().dict() - sync_progress_thread.start() + try: + with multiprocessing.Manager() as manager, ssh.ProxyManager( + manager + ) as ssh_proxy: + ssh_proxy.sock() + with self.ParallelContext(): + self.get_parallel_context()["ssh_proxy"] = ssh_proxy + # TODO(gavinmak): Use multprocessing.Queue instead of dict. + self.get_parallel_context()[ + "sync_dict" + ] = multiprocessing.Manager().dict() + sync_progress_thread.start() - try: - # Outer loop for dynamic project discovery. This continues - # until no unsynced projects remain. - while True: - projects_to_sync = [ - p - for p in project_list - if p.relpath not in finished_relpaths - ] - if not projects_to_sync: - break - - pending_relpaths = {p.relpath for p in projects_to_sync} - if previously_pending_relpaths == pending_relpaths: - stalled_projects_str = "\n".join( - f" - {path}" - for path in sorted(list(pending_relpaths)) - ) - logger.error( - "The following projects failed and could not " - "be synced:\n%s", - stalled_projects_str, - ) - err_event.set() - break - previously_pending_relpaths = pending_relpaths - - self.get_parallel_context()[ - "projects" - ] = projects_to_sync - project_index_map = { - p: i for i, p in enumerate(projects_to_sync) - } - - # Inner loop to process projects in a hierarchical - # order. This iterates through levels of project - # dependencies (e.g. 'foo' then 'foo/bar'). All projects - # in one level can be processed in parallel, but we must - # wait for a level to complete before starting the next. - for level_projects in _SafeCheckoutOrder( - projects_to_sync - ): - if not level_projects: - continue - - objdir_project_map = collections.defaultdict(list) - for p in level_projects: - objdir_project_map[p.objdir].append( - project_index_map[p] + try: + # Outer loop for dynamic project discovery. This + # continues until no unsynced projects remain. + while True: + projects_to_sync = [ + p + for p in project_list + if p.relpath not in finished_relpaths + ] + if not projects_to_sync: + break + + pending_relpaths = { + p.relpath for p in projects_to_sync + } + if previously_pending_relpaths == pending_relpaths: + stalled_projects_str = "\n".join( + f" - {path}" + for path in sorted(list(pending_relpaths)) + ) + logger.error( + "The following projects failed and could " + "not be synced:\n%s", + stalled_projects_str, ) - - work_items = list(objdir_project_map.values()) - if not work_items: - continue - - jobs = max(1, min(opt.jobs, len(work_items))) - callback = functools.partial( - self._ProcessSyncInterleavedResults, - finished_relpaths, - err_event, - errors, - opt, - ) - if not self.ExecuteInParallel( - jobs, - functools.partial(self._SyncProjectList, opt), - work_items, - callback=callback, - output=pm, - chunksize=1, - initializer=self.InitWorker, - ): err_event.set() + break + previously_pending_relpaths = pending_relpaths + + self.get_parallel_context()[ + "projects" + ] = projects_to_sync + project_index_map = { + p: i for i, p in enumerate(projects_to_sync) + } + + # Inner loop to process projects in a hierarchical + # order. This iterates through levels of project + # dependencies (e.g. 'foo' then 'foo/bar'). All + # projects in one level can be processed in + # parallel, but we must wait for a level to complete + # before starting the next. + for level_projects in _SafeCheckoutOrder( + projects_to_sync + ): + if not level_projects: + continue - if err_event.is_set() and opt.fail_fast: - raise SyncFailFastError(aggregate_errors=errors) - - self._ReloadManifest(None, manifest) - project_list = self.GetProjects( - args, - missing_ok=True, - submodules_ok=opt.fetch_submodules, - manifest=manifest, - all_manifests=not opt.this_manifest_only, - ) - pm.update_total(len(project_list)) - finally: - sync_event.set() - sync_progress_thread.join() + objdir_project_map = collections.defaultdict( + list + ) + for p in level_projects: + objdir_project_map[p.objdir].append( + project_index_map[p] + ) + + work_items = list(objdir_project_map.values()) + if not work_items: + continue + + jobs = max(1, min(opt.jobs, len(work_items))) + callback = functools.partial( + self._ProcessSyncInterleavedResults, + finished_relpaths, + err_event, + errors, + opt, + ) + if not self.ExecuteInParallel( + jobs, + functools.partial( + self._SyncProjectList, opt + ), + work_items, + callback=callback, + output=pm, + chunksize=1, + initializer=self.InitWorker, + ): + err_event.set() + + if err_event.is_set() and opt.fail_fast: + raise SyncFailFastError( + aggregate_errors=errors + ) + + self._ReloadManifest(None, manifest) + project_list = self.GetProjects( + args, + missing_ok=True, + submodules_ok=opt.fetch_submodules, + manifest=manifest, + all_manifests=not opt.this_manifest_only, + ) + pm.update_total(len(project_list)) + finally: + sync_event.set() + sync_progress_thread.join() + finally: + self._fetch_times.Save() + self._local_sync_state.Save() pm.end() @@ -2695,17 +2710,19 @@ class _FetchTimes: self._saved = {} def Save(self): - if self._saved is None: + if not self._seen: return + self._Load() + for name, t in self._seen.items(): # Keep a moving average across the previous/current sync runs. old = self._saved.get(name, t) - self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old) + self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old) try: with open(self._path, "w") as f: - json.dump(self._seen, f, indent=2) + json.dump(self._saved, f, indent=2) except (OSError, TypeError): platform_utils.remove(self._path, missing_ok=True) diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index 6c9cc9ab..6eb8a5a7 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py @@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase): # Mock _GetCurrentBranchOnly for worker tests. mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start() + self.cmd._fetch_times = mock.Mock() + self.cmd._local_sync_state = mock.Mock() + def tearDown(self): """Clean up resources.""" shutil.rmtree(self.repodir) -- cgit v1.2.3-54-g00ecf