author     Gavin Mak <gavinmak@google.com>                                      2025-10-20 11:13:09 -0700
committer  LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com>  2025-10-20 11:28:21 -0700
commit     1afe96a7e997ce7748f066b206a85ac648f7a87c (patch)
tree       69f31057506ad29f5e415819b52decf577960469
parent     2719a8e203e43b34a437b510092758870b81cae6 (diff)
download   git-repo-main.tar.gz
Interleaved sync didn't save _fetch_times and _local_sync_state to disk.
Phased sync saved them, but incorrectly applied moving average smoothing
repeatedly when fetching submodules, and discarded historical data
during partial syncs.
Move the .Save() calls to the end of the main sync loops to ensure they
run exactly once. Update _FetchTimes.Save() to merge new data with
existing history, preventing data loss.
Change-Id: I174f98a62ac86859f1eeea1daba65eb35c227852
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/519821
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
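To see both bugs the message describes, consider the moving-average update
_FetchTimes applies. A minimal standalone sketch (the 0.5 smoothing factor is
an assumption standing in for _FetchTimes._ALPHA; project names are made up):

    # Blend this run's observed fetch times into saved history without
    # discarding entries for projects that were not fetched this run.
    ALPHA = 0.5  # assumed; stands in for _FetchTimes._ALPHA

    def merge(saved, seen, alpha=ALPHA):
        merged = dict(saved)  # start from history: partial syncs keep it
        for name, t in seen.items():
            old = saved.get(name, t)
            merged[name] = alpha * t + (1 - alpha) * old
        return merged

    history = {"platform/art": 40.0, "platform/bionic": 10.0}
    this_run = {"platform/art": 20.0}  # partial sync fetched one project
    assert merge(history, this_run) == {
        "platform/art": 30.0,     # 0.5 * 20 + 0.5 * 40
        "platform/bionic": 10.0,  # untouched history survives
    }

Before this change, the smoothed result was written back into the in-memory
dict of observed times and that dict was dumped to disk, so a repeated Save()
(e.g. after a submodule fetch pass) smoothed values a second time, and history
for projects absent from the current run was dropped.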
-rw-r--r--  subcmds/sync.py             | 311
-rw-r--r--  tests/test_subcmds_sync.py  |   3
2 files changed, 167 insertions, 147 deletions
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 582bd057..f9500314 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -975,9 +975,6 @@ later is required to fix a server side protocol bug.
             sync_event.set()
             sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)
 
@@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug.
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
-            success = result.success
-            new_fetched = result.projects
-            if not success:
-                err_event.set()
-            fetched.update(new_fetched)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
+            success = result.success
+            fetched = result.projects
+            if not success:
+                err_event.set()
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
 
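The effect of the restructuring above: the initial fetch, the network-only
early return, and the submodule loop all sit inside one try block, and the two
Save() calls in the finally clause run exactly once per invocation, whether
the function returns early, finishes the loop, or raises. A runnable sketch of
the pattern (all names hypothetical):

    class State:
        """Stand-in for _fetch_times / _local_sync_state."""
        def __init__(self):
            self.saves = 0

        def save(self):
            self.saves += 1

    def fetch_main(state, network_only=False):
        try:
            if network_only:
                return  # early exit still reaches the finally block
            for _ in range(3):
                pass  # submodule fetch rounds: no per-round save
        finally:
            state.save()  # exactly once per call

    s = State()
    fetch_main(s, network_only=True)
    fetch_main(s)
    assert s.saves == 2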
@@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug.
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
-
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
-
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
-                                )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
-                                err_event.set()
-
-                        if err_event.is_set() and opt.fail_fast:
-                            raise SyncFailFastError(aggregate_errors=errors)
-
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
+
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
+
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
+                                )
+                                err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
+
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
+
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                            if err_event.is_set() and opt.fail_fast:
+                                raise SyncFailFastError(
+                                    aggregate_errors=errors
+                                )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
 
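The inner-loop comment above states the ordering contract: _SafeCheckoutOrder
yields levels of projects such that a nested checkout like 'foo/bar' starts
only after 'foo' completes, while projects within one level may run in
parallel. A rough sketch of one way to build such levels, grouping by path
depth (an illustrative assumption; the real _SafeCheckoutOrder in sync.py may
group differently):

    from collections import defaultdict

    def levels_by_depth(relpaths):
        # 'foo' (depth 0) lands in an earlier level than 'foo/bar' (depth 1),
        # so a parent directory is always checked out before its children.
        buckets = defaultdict(list)
        for path in relpaths:
            buckets[path.count("/")].append(path)
        return [sorted(buckets[d]) for d in sorted(buckets)]

    assert levels_by_depth(["foo/bar", "foo", "baz"]) == [
        ["baz", "foo"],
        ["foo/bar"],
    ]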
@@ -2695,17 +2710,19 @@ class _FetchTimes:
         self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
 
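Two details in this hunk are easy to miss: the guard now skips the write when
nothing was fetched (if not self._seen), and smoothed values are written into
self._saved rather than back into self._seen, so a second Save() call no
longer re-smooths already-smoothed numbers. A toy demonstration of the old
in-place behavior (hypothetical standalone code; 0.5 stands in for _ALPHA):

    ALPHA = 0.5  # assumed smoothing factor

    def save_old(seen, saved):
        # Old behavior: mutates `seen`, so each call smooths again.
        for name, t in seen.items():
            old = saved.get(name, t)
            seen[name] = ALPHA * t + (1 - ALPHA) * old

    seen = {"p": 20.0}
    saved = {"p": 40.0}
    save_old(seen, saved)  # p -> 30.0
    save_old(seen, saved)  # p -> 35.0: smoothed a second time
    assert seen["p"] == 35.0

With the fix, self._seen keeps the raw observations, so repeated calls to
Save() converge on the same merged result instead of drifting.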
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index 6c9cc9ab..6eb8a5a7 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
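The new setUp lines replace _fetch_times and _local_sync_state with mocks so
the now-unconditional Save() calls in the finally blocks never touch the
filesystem during unit tests. A minimal standalone illustration of the same
idea (hypothetical class, not from the test suite):

    from unittest import mock

    class FakeSync:
        def run(self):
            try:
                pass  # sync work would happen here
            finally:
                self._fetch_times.Save()
                self._local_sync_state.Save()

    cmd = FakeSync()
    cmd._fetch_times = mock.Mock()
    cmd._local_sync_state = mock.Mock()
    cmd.run()
    cmd._fetch_times.Save.assert_called_once()
    cmd._local_sync_state.Save.assert_called_once()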