diff options
| -rw-r--r-- | subcmds/sync.py | 311 | ||||
| -rw-r--r-- | tests/test_subcmds_sync.py | 3 | 
2 files changed, 167 insertions, 147 deletions
| diff --git a/subcmds/sync.py b/subcmds/sync.py index 582bd057..f9500314 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -975,9 +975,6 @@ later is required to fix a server side protocol bug. | |||
| 975 | sync_event.set() | 975 | sync_event.set() | 
| 976 | sync_progress_thread.join() | 976 | sync_progress_thread.join() | 
| 977 | 977 | ||
| 978 | self._fetch_times.Save() | ||
| 979 | self._local_sync_state.Save() | ||
| 980 | |||
| 981 | if not self.outer_client.manifest.IsArchive: | 978 | if not self.outer_client.manifest.IsArchive: | 
| 982 | self._GCProjects(projects, opt, err_event) | 979 | self._GCProjects(projects, opt, err_event) | 
| 983 | 980 | ||
| @@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug. | |||
| 1003 | to_fetch.extend(all_projects) | 1000 | to_fetch.extend(all_projects) | 
| 1004 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) | 1001 | to_fetch.sort(key=self._fetch_times.Get, reverse=True) | 
| 1005 | 1002 | ||
| 1006 | result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors) | 1003 | try: | 
| 1007 | success = result.success | 1004 | result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors) | 
| 1008 | fetched = result.projects | ||
| 1009 | if not success: | ||
| 1010 | err_event.set() | ||
| 1011 | |||
| 1012 | if opt.network_only: | ||
| 1013 | # Bail out now; the rest touches the working tree. | ||
| 1014 | if err_event.is_set(): | ||
| 1015 | e = SyncError( | ||
| 1016 | "error: Exited sync due to fetch errors.", | ||
| 1017 | aggregate_errors=errors, | ||
| 1018 | ) | ||
| 1019 | |||
| 1020 | logger.error(e) | ||
| 1021 | raise e | ||
| 1022 | return _FetchMainResult([]) | ||
| 1023 | |||
| 1024 | # Iteratively fetch missing and/or nested unregistered submodules. | ||
| 1025 | previously_missing_set = set() | ||
| 1026 | while True: | ||
| 1027 | self._ReloadManifest(None, manifest) | ||
| 1028 | all_projects = self.GetProjects( | ||
| 1029 | args, | ||
| 1030 | missing_ok=True, | ||
| 1031 | submodules_ok=opt.fetch_submodules, | ||
| 1032 | manifest=manifest, | ||
| 1033 | all_manifests=not opt.this_manifest_only, | ||
| 1034 | ) | ||
| 1035 | missing = [] | ||
| 1036 | for project in all_projects: | ||
| 1037 | if project.gitdir not in fetched: | ||
| 1038 | missing.append(project) | ||
| 1039 | if not missing: | ||
| 1040 | break | ||
| 1041 | # Stop us from non-stopped fetching actually-missing repos: If set | ||
| 1042 | # of missing repos has not been changed from last fetch, we break. | ||
| 1043 | missing_set = {p.name for p in missing} | ||
| 1044 | if previously_missing_set == missing_set: | ||
| 1045 | break | ||
| 1046 | previously_missing_set = missing_set | ||
| 1047 | result = self._Fetch(missing, opt, err_event, ssh_proxy, errors) | ||
| 1048 | success = result.success | 1005 | success = result.success | 
| 1049 | new_fetched = result.projects | 1006 | fetched = result.projects | 
| 1050 | if not success: | 1007 | if not success: | 
| 1051 | err_event.set() | 1008 | err_event.set() | 
| 1052 | fetched.update(new_fetched) | 1009 | |
| 1010 | if opt.network_only: | ||
| 1011 | # Bail out now; the rest touches the working tree. | ||
| 1012 | if err_event.is_set(): | ||
| 1013 | e = SyncError( | ||
| 1014 | "error: Exited sync due to fetch errors.", | ||
| 1015 | aggregate_errors=errors, | ||
| 1016 | ) | ||
| 1017 | |||
| 1018 | logger.error(e) | ||
| 1019 | raise e | ||
| 1020 | return _FetchMainResult([]) | ||
| 1021 | |||
| 1022 | # Iteratively fetch missing and/or nested unregistered submodules. | ||
| 1023 | previously_missing_set = set() | ||
| 1024 | while True: | ||
| 1025 | self._ReloadManifest(None, manifest) | ||
| 1026 | all_projects = self.GetProjects( | ||
| 1027 | args, | ||
| 1028 | missing_ok=True, | ||
| 1029 | submodules_ok=opt.fetch_submodules, | ||
| 1030 | manifest=manifest, | ||
| 1031 | all_manifests=not opt.this_manifest_only, | ||
| 1032 | ) | ||
| 1033 | missing = [] | ||
| 1034 | for project in all_projects: | ||
| 1035 | if project.gitdir not in fetched: | ||
| 1036 | missing.append(project) | ||
| 1037 | if not missing: | ||
| 1038 | break | ||
| 1039 | # Stop us from non-stopped fetching actually-missing repos: If | ||
| 1040 | # set of missing repos has not been changed from last fetch, we | ||
| 1041 | # break. | ||
| 1042 | missing_set = {p.name for p in missing} | ||
| 1043 | if previously_missing_set == missing_set: | ||
| 1044 | break | ||
| 1045 | previously_missing_set = missing_set | ||
| 1046 | result = self._Fetch(missing, opt, err_event, ssh_proxy, errors) | ||
| 1047 | success = result.success | ||
| 1048 | new_fetched = result.projects | ||
| 1049 | if not success: | ||
| 1050 | err_event.set() | ||
| 1051 | fetched.update(new_fetched) | ||
| 1052 | finally: | ||
| 1053 | self._fetch_times.Save() | ||
| 1054 | self._local_sync_state.Save() | ||
| 1053 | 1055 | ||
| 1054 | return _FetchMainResult(all_projects) | 1056 | return _FetchMainResult(all_projects) | 
| 1055 | 1057 | ||
| @@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug. | |||
| 2491 | sync_event = _threading.Event() | 2493 | sync_event = _threading.Event() | 
| 2492 | sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) | 2494 | sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) | 
| 2493 | 2495 | ||
| 2494 | with multiprocessing.Manager() as manager, ssh.ProxyManager( | 2496 | try: | 
| 2495 | manager | 2497 | with multiprocessing.Manager() as manager, ssh.ProxyManager( | 
| 2496 | ) as ssh_proxy: | 2498 | manager | 
| 2497 | ssh_proxy.sock() | 2499 | ) as ssh_proxy: | 
| 2498 | with self.ParallelContext(): | 2500 | ssh_proxy.sock() | 
| 2499 | self.get_parallel_context()["ssh_proxy"] = ssh_proxy | 2501 | with self.ParallelContext(): | 
| 2500 | # TODO(gavinmak): Use multprocessing.Queue instead of dict. | 2502 | self.get_parallel_context()["ssh_proxy"] = ssh_proxy | 
| 2501 | self.get_parallel_context()[ | 2503 | # TODO(gavinmak): Use multprocessing.Queue instead of dict. | 
| 2502 | "sync_dict" | 2504 | self.get_parallel_context()[ | 
| 2503 | ] = multiprocessing.Manager().dict() | 2505 | "sync_dict" | 
| 2504 | sync_progress_thread.start() | 2506 | ] = multiprocessing.Manager().dict() | 
| 2507 | sync_progress_thread.start() | ||
| 2505 | 2508 | ||
| 2506 | try: | 2509 | try: | 
| 2507 | # Outer loop for dynamic project discovery. This continues | 2510 | # Outer loop for dynamic project discovery. This | 
| 2508 | # until no unsynced projects remain. | 2511 | # continues until no unsynced projects remain. | 
| 2509 | while True: | 2512 | while True: | 
| 2510 | projects_to_sync = [ | 2513 | projects_to_sync = [ | 
| 2511 | p | 2514 | p | 
| 2512 | for p in project_list | 2515 | for p in project_list | 
| 2513 | if p.relpath not in finished_relpaths | 2516 | if p.relpath not in finished_relpaths | 
| 2514 | ] | 2517 | ] | 
| 2515 | if not projects_to_sync: | 2518 | if not projects_to_sync: | 
| 2516 | break | 2519 | break | 
| 2517 | 2520 | ||
| 2518 | pending_relpaths = {p.relpath for p in projects_to_sync} | 2521 | pending_relpaths = { | 
| 2519 | if previously_pending_relpaths == pending_relpaths: | 2522 | p.relpath for p in projects_to_sync | 
| 2520 | stalled_projects_str = "\n".join( | 2523 | } | 
| 2521 | f" - {path}" | 2524 | if previously_pending_relpaths == pending_relpaths: | 
| 2522 | for path in sorted(list(pending_relpaths)) | 2525 | stalled_projects_str = "\n".join( | 
| 2523 | ) | 2526 | f" - {path}" | 
| 2524 | logger.error( | 2527 | for path in sorted(list(pending_relpaths)) | 
| 2525 | "The following projects failed and could not " | 2528 | ) | 
| 2526 | "be synced:\n%s", | 2529 | logger.error( | 
| 2527 | stalled_projects_str, | 2530 | "The following projects failed and could " | 
| 2528 | ) | 2531 | "not be synced:\n%s", | 
| 2529 | err_event.set() | 2532 | stalled_projects_str, | 
| 2530 | break | ||
| 2531 | previously_pending_relpaths = pending_relpaths | ||
| 2532 | |||
| 2533 | self.get_parallel_context()[ | ||
| 2534 | "projects" | ||
| 2535 | ] = projects_to_sync | ||
| 2536 | project_index_map = { | ||
| 2537 | p: i for i, p in enumerate(projects_to_sync) | ||
| 2538 | } | ||
| 2539 | |||
| 2540 | # Inner loop to process projects in a hierarchical | ||
| 2541 | # order. This iterates through levels of project | ||
| 2542 | # dependencies (e.g. 'foo' then 'foo/bar'). All projects | ||
| 2543 | # in one level can be processed in parallel, but we must | ||
| 2544 | # wait for a level to complete before starting the next. | ||
| 2545 | for level_projects in _SafeCheckoutOrder( | ||
| 2546 | projects_to_sync | ||
| 2547 | ): | ||
| 2548 | if not level_projects: | ||
| 2549 | continue | ||
| 2550 | |||
| 2551 | objdir_project_map = collections.defaultdict(list) | ||
| 2552 | for p in level_projects: | ||
| 2553 | objdir_project_map[p.objdir].append( | ||
| 2554 | project_index_map[p] | ||
| 2555 | ) | 2533 | ) | 
| 2556 | |||
| 2557 | work_items = list(objdir_project_map.values()) | ||
| 2558 | if not work_items: | ||
| 2559 | continue | ||
| 2560 | |||
| 2561 | jobs = max(1, min(opt.jobs, len(work_items))) | ||
| 2562 | callback = functools.partial( | ||
| 2563 | self._ProcessSyncInterleavedResults, | ||
| 2564 | finished_relpaths, | ||
| 2565 | err_event, | ||
| 2566 | errors, | ||
| 2567 | opt, | ||
| 2568 | ) | ||
| 2569 | if not self.ExecuteInParallel( | ||
| 2570 | jobs, | ||
| 2571 | functools.partial(self._SyncProjectList, opt), | ||
| 2572 | work_items, | ||
| 2573 | callback=callback, | ||
| 2574 | output=pm, | ||
| 2575 | chunksize=1, | ||
| 2576 | initializer=self.InitWorker, | ||
| 2577 | ): | ||
| 2578 | err_event.set() | 2534 | err_event.set() | 
| 2535 | break | ||
| 2536 | previously_pending_relpaths = pending_relpaths | ||
| 2537 | |||
| 2538 | self.get_parallel_context()[ | ||
| 2539 | "projects" | ||
| 2540 | ] = projects_to_sync | ||
| 2541 | project_index_map = { | ||
| 2542 | p: i for i, p in enumerate(projects_to_sync) | ||
| 2543 | } | ||
| 2544 | |||
| 2545 | # Inner loop to process projects in a hierarchical | ||
| 2546 | # order. This iterates through levels of project | ||
| 2547 | # dependencies (e.g. 'foo' then 'foo/bar'). All | ||
| 2548 | # projects in one level can be processed in | ||
| 2549 | # parallel, but we must wait for a level to complete | ||
| 2550 | # before starting the next. | ||
| 2551 | for level_projects in _SafeCheckoutOrder( | ||
| 2552 | projects_to_sync | ||
| 2553 | ): | ||
| 2554 | if not level_projects: | ||
| 2555 | continue | ||
| 2579 | 2556 | ||
| 2580 | if err_event.is_set() and opt.fail_fast: | 2557 | objdir_project_map = collections.defaultdict( | 
| 2581 | raise SyncFailFastError(aggregate_errors=errors) | 2558 | list | 
| 2582 | 2559 | ) | |
| 2583 | self._ReloadManifest(None, manifest) | 2560 | for p in level_projects: | 
| 2584 | project_list = self.GetProjects( | 2561 | objdir_project_map[p.objdir].append( | 
| 2585 | args, | 2562 | project_index_map[p] | 
| 2586 | missing_ok=True, | 2563 | ) | 
| 2587 | submodules_ok=opt.fetch_submodules, | 2564 | |
| 2588 | manifest=manifest, | 2565 | work_items = list(objdir_project_map.values()) | 
| 2589 | all_manifests=not opt.this_manifest_only, | 2566 | if not work_items: | 
| 2590 | ) | 2567 | continue | 
| 2591 | pm.update_total(len(project_list)) | 2568 | |
| 2592 | finally: | 2569 | jobs = max(1, min(opt.jobs, len(work_items))) | 
| 2593 | sync_event.set() | 2570 | callback = functools.partial( | 
| 2594 | sync_progress_thread.join() | 2571 | self._ProcessSyncInterleavedResults, | 
| 2572 | finished_relpaths, | ||
| 2573 | err_event, | ||
| 2574 | errors, | ||
| 2575 | opt, | ||
| 2576 | ) | ||
| 2577 | if not self.ExecuteInParallel( | ||
| 2578 | jobs, | ||
| 2579 | functools.partial( | ||
| 2580 | self._SyncProjectList, opt | ||
| 2581 | ), | ||
| 2582 | work_items, | ||
| 2583 | callback=callback, | ||
| 2584 | output=pm, | ||
| 2585 | chunksize=1, | ||
| 2586 | initializer=self.InitWorker, | ||
| 2587 | ): | ||
| 2588 | err_event.set() | ||
| 2589 | |||
| 2590 | if err_event.is_set() and opt.fail_fast: | ||
| 2591 | raise SyncFailFastError( | ||
| 2592 | aggregate_errors=errors | ||
| 2593 | ) | ||
| 2594 | |||
| 2595 | self._ReloadManifest(None, manifest) | ||
| 2596 | project_list = self.GetProjects( | ||
| 2597 | args, | ||
| 2598 | missing_ok=True, | ||
| 2599 | submodules_ok=opt.fetch_submodules, | ||
| 2600 | manifest=manifest, | ||
| 2601 | all_manifests=not opt.this_manifest_only, | ||
| 2602 | ) | ||
| 2603 | pm.update_total(len(project_list)) | ||
| 2604 | finally: | ||
| 2605 | sync_event.set() | ||
| 2606 | sync_progress_thread.join() | ||
| 2607 | finally: | ||
| 2608 | self._fetch_times.Save() | ||
| 2609 | self._local_sync_state.Save() | ||
| 2595 | 2610 | ||
| 2596 | pm.end() | 2611 | pm.end() | 
| 2597 | 2612 | ||
| @@ -2695,17 +2710,19 @@ class _FetchTimes: | |||
| 2695 | self._saved = {} | 2710 | self._saved = {} | 
| 2696 | 2711 | ||
| 2697 | def Save(self): | 2712 | def Save(self): | 
| 2698 | if self._saved is None: | 2713 | if not self._seen: | 
| 2699 | return | 2714 | return | 
| 2700 | 2715 | ||
| 2716 | self._Load() | ||
| 2717 | |||
| 2701 | for name, t in self._seen.items(): | 2718 | for name, t in self._seen.items(): | 
| 2702 | # Keep a moving average across the previous/current sync runs. | 2719 | # Keep a moving average across the previous/current sync runs. | 
| 2703 | old = self._saved.get(name, t) | 2720 | old = self._saved.get(name, t) | 
| 2704 | self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old) | 2721 | self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old) | 
| 2705 | 2722 | ||
| 2706 | try: | 2723 | try: | 
| 2707 | with open(self._path, "w") as f: | 2724 | with open(self._path, "w") as f: | 
| 2708 | json.dump(self._seen, f, indent=2) | 2725 | json.dump(self._saved, f, indent=2) | 
| 2709 | except (OSError, TypeError): | 2726 | except (OSError, TypeError): | 
| 2710 | platform_utils.remove(self._path, missing_ok=True) | 2727 | platform_utils.remove(self._path, missing_ok=True) | 
| 2711 | 2728 | ||
| diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index 6c9cc9ab..6eb8a5a7 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py | |||
| @@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase): | |||
| 681 | # Mock _GetCurrentBranchOnly for worker tests. | 681 | # Mock _GetCurrentBranchOnly for worker tests. | 
| 682 | mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start() | 682 | mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start() | 
| 683 | 683 | ||
| 684 | self.cmd._fetch_times = mock.Mock() | ||
| 685 | self.cmd._local_sync_state = mock.Mock() | ||
| 686 | |||
| 684 | def tearDown(self): | 687 | def tearDown(self): | 
| 685 | """Clean up resources.""" | 688 | """Clean up resources.""" | 
| 686 | shutil.rmtree(self.repodir) | 689 | shutil.rmtree(self.repodir) | 
