diff options
Diffstat (limited to 'subcmds')
| -rw-r--r-- | subcmds/sync.py | 171 | 
1 files changed, 88 insertions, 83 deletions
| diff --git a/subcmds/sync.py b/subcmds/sync.py index bebe18b9..00fee776 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -141,7 +141,7 @@ class _FetchOneResult(NamedTuple): | |||
| 141 | 141 | ||
| 142 | Attributes: | 142 | Attributes: | 
| 143 | success (bool): True if successful. | 143 | success (bool): True if successful. | 
| 144 | project (Project): The fetched project. | 144 | project_idx (int): The fetched project index. | 
| 145 | start (float): The starting time.time(). | 145 | start (float): The starting time.time(). | 
| 146 | finish (float): The ending time.time(). | 146 | finish (float): The ending time.time(). | 
| 147 | remote_fetched (bool): True if the remote was actually queried. | 147 | remote_fetched (bool): True if the remote was actually queried. | 
| @@ -149,7 +149,7 @@ class _FetchOneResult(NamedTuple): | |||
| 149 | 149 | ||
| 150 | success: bool | 150 | success: bool | 
| 151 | errors: List[Exception] | 151 | errors: List[Exception] | 
| 152 | project: Project | 152 | project_idx: int | 
| 153 | start: float | 153 | start: float | 
| 154 | finish: float | 154 | finish: float | 
| 155 | remote_fetched: bool | 155 | remote_fetched: bool | 
| @@ -182,14 +182,14 @@ class _CheckoutOneResult(NamedTuple): | |||
| 182 | 182 | ||
| 183 | Attributes: | 183 | Attributes: | 
| 184 | success (bool): True if successful. | 184 | success (bool): True if successful. | 
| 185 | project (Project): The project. | 185 | project_idx (int): The project index. | 
| 186 | start (float): The starting time.time(). | 186 | start (float): The starting time.time(). | 
| 187 | finish (float): The ending time.time(). | 187 | finish (float): The ending time.time(). | 
| 188 | """ | 188 | """ | 
| 189 | 189 | ||
| 190 | success: bool | 190 | success: bool | 
| 191 | errors: List[Exception] | 191 | errors: List[Exception] | 
| 192 | project: Project | 192 | project_idx: int | 
| 193 | start: float | 193 | start: float | 
| 194 | finish: float | 194 | finish: float | 
| 195 | 195 | ||
| @@ -592,7 +592,8 @@ later is required to fix a server side protocol bug. | |||
| 592 | branch = branch[len(R_HEADS) :] | 592 | branch = branch[len(R_HEADS) :] | 
| 593 | return branch | 593 | return branch | 
| 594 | 594 | ||
| 595 | def _GetCurrentBranchOnly(self, opt, manifest): | 595 | @classmethod | 
| 596 | def _GetCurrentBranchOnly(cls, opt, manifest): | ||
| 596 | """Returns whether current-branch or use-superproject options are | 597 | """Returns whether current-branch or use-superproject options are | 
| 597 | enabled. | 598 | enabled. | 
| 598 | 599 | ||
| @@ -710,7 +711,8 @@ later is required to fix a server side protocol bug. | |||
| 710 | if need_unload: | 711 | if need_unload: | 
| 711 | m.outer_client.manifest.Unload() | 712 | m.outer_client.manifest.Unload() | 
| 712 | 713 | ||
| 713 | def _FetchProjectList(self, opt, projects): | 714 | @classmethod | 
| 715 | def _FetchProjectList(cls, opt, projects): | ||
| 714 | """Main function of the fetch worker. | 716 | """Main function of the fetch worker. | 
| 715 | 717 | ||
| 716 | The projects we're given share the same underlying git object store, so | 718 | The projects we're given share the same underlying git object store, so | 
| @@ -722,21 +724,23 @@ later is required to fix a server side protocol bug. | |||
| 722 | opt: Program options returned from optparse. See _Options(). | 724 | opt: Program options returned from optparse. See _Options(). | 
| 723 | projects: Projects to fetch. | 725 | projects: Projects to fetch. | 
| 724 | """ | 726 | """ | 
| 725 | return [self._FetchOne(opt, x) for x in projects] | 727 | return [cls._FetchOne(opt, x) for x in projects] | 
| 726 | 728 | ||
| 727 | def _FetchOne(self, opt, project): | 729 | @classmethod | 
| 730 | def _FetchOne(cls, opt, project_idx): | ||
| 728 | """Fetch git objects for a single project. | 731 | """Fetch git objects for a single project. | 
| 729 | 732 | ||
| 730 | Args: | 733 | Args: | 
| 731 | opt: Program options returned from optparse. See _Options(). | 734 | opt: Program options returned from optparse. See _Options(). | 
| 732 | project: Project object for the project to fetch. | 735 | project_idx: Project index for the project to fetch. | 
| 733 | 736 | ||
| 734 | Returns: | 737 | Returns: | 
| 735 | Whether the fetch was successful. | 738 | Whether the fetch was successful. | 
| 736 | """ | 739 | """ | 
| 740 | project = cls.get_parallel_context()["projects"][project_idx] | ||
| 737 | start = time.time() | 741 | start = time.time() | 
| 738 | k = f"{project.name} @ {project.relpath}" | 742 | k = f"{project.name} @ {project.relpath}" | 
| 739 | self._sync_dict[k] = start | 743 | cls.get_parallel_context()["sync_dict"][k] = start | 
| 740 | success = False | 744 | success = False | 
| 741 | remote_fetched = False | 745 | remote_fetched = False | 
| 742 | errors = [] | 746 | errors = [] | 
| @@ -746,7 +750,7 @@ later is required to fix a server side protocol bug. | |||
| 746 | quiet=opt.quiet, | 750 | quiet=opt.quiet, | 
| 747 | verbose=opt.verbose, | 751 | verbose=opt.verbose, | 
| 748 | output_redir=buf, | 752 | output_redir=buf, | 
| 749 | current_branch_only=self._GetCurrentBranchOnly( | 753 | current_branch_only=cls._GetCurrentBranchOnly( | 
| 750 | opt, project.manifest | 754 | opt, project.manifest | 
| 751 | ), | 755 | ), | 
| 752 | force_sync=opt.force_sync, | 756 | force_sync=opt.force_sync, | 
| @@ -756,7 +760,7 @@ later is required to fix a server side protocol bug. | |||
| 756 | optimized_fetch=opt.optimized_fetch, | 760 | optimized_fetch=opt.optimized_fetch, | 
| 757 | retry_fetches=opt.retry_fetches, | 761 | retry_fetches=opt.retry_fetches, | 
| 758 | prune=opt.prune, | 762 | prune=opt.prune, | 
| 759 | ssh_proxy=self.ssh_proxy, | 763 | ssh_proxy=cls.get_parallel_context()["ssh_proxy"], | 
| 760 | clone_filter=project.manifest.CloneFilter, | 764 | clone_filter=project.manifest.CloneFilter, | 
| 761 | partial_clone_exclude=project.manifest.PartialCloneExclude, | 765 | partial_clone_exclude=project.manifest.PartialCloneExclude, | 
| 762 | clone_filter_for_depth=project.manifest.CloneFilterForDepth, | 766 | clone_filter_for_depth=project.manifest.CloneFilterForDepth, | 
| @@ -788,24 +792,20 @@ later is required to fix a server side protocol bug. | |||
| 788 | type(e).__name__, | 792 | type(e).__name__, | 
| 789 | e, | 793 | e, | 
| 790 | ) | 794 | ) | 
| 791 | del self._sync_dict[k] | ||
| 792 | errors.append(e) | 795 | errors.append(e) | 
| 793 | raise | 796 | raise | 
| 797 | finally: | ||
| 798 | del cls.get_parallel_context()["sync_dict"][k] | ||
| 794 | 799 | ||
| 795 | finish = time.time() | 800 | finish = time.time() | 
| 796 | del self._sync_dict[k] | ||
| 797 | return _FetchOneResult( | 801 | return _FetchOneResult( | 
| 798 | success, errors, project, start, finish, remote_fetched | 802 | success, errors, project_idx, start, finish, remote_fetched | 
| 799 | ) | 803 | ) | 
| 800 | 804 | ||
| 801 | @classmethod | ||
| 802 | def _FetchInitChild(cls, ssh_proxy): | ||
| 803 | cls.ssh_proxy = ssh_proxy | ||
| 804 | |||
| 805 | def _GetSyncProgressMessage(self): | 805 | def _GetSyncProgressMessage(self): | 
| 806 | earliest_time = float("inf") | 806 | earliest_time = float("inf") | 
| 807 | earliest_proj = None | 807 | earliest_proj = None | 
| 808 | items = self._sync_dict.items() | 808 | items = self.get_parallel_context()["sync_dict"].items() | 
| 809 | for project, t in items: | 809 | for project, t in items: | 
| 810 | if t < earliest_time: | 810 | if t < earliest_time: | 
| 811 | earliest_time = t | 811 | earliest_time = t | 
| @@ -813,7 +813,7 @@ later is required to fix a server side protocol bug. | |||
| 813 | 813 | ||
| 814 | if not earliest_proj: | 814 | if not earliest_proj: | 
| 815 | # This function is called when sync is still running but in some | 815 | # This function is called when sync is still running but in some | 
| 816 | # cases (by chance), _sync_dict can contain no entries. Return some | 816 | # cases (by chance), sync_dict can contain no entries. Return some | 
| 817 | # text to indicate that sync is still working. | 817 | # text to indicate that sync is still working. | 
| 818 | return "..working.." | 818 | return "..working.." | 
| 819 | 819 | ||
| @@ -835,7 +835,6 @@ later is required to fix a server side protocol bug. | |||
| 835 | elide=True, | 835 | elide=True, | 
| 836 | ) | 836 | ) | 
| 837 | 837 | ||
| 838 | self._sync_dict = multiprocessing.Manager().dict() | ||
| 839 | sync_event = _threading.Event() | 838 | sync_event = _threading.Event() | 
| 840 | 839 | ||
| 841 | def _MonitorSyncLoop(): | 840 | def _MonitorSyncLoop(): | 
| @@ -846,21 +845,13 @@ later is required to fix a server side protocol bug. | |||
| 846 | 845 | ||
| 847 | sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop) | 846 | sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop) | 
| 848 | sync_progress_thread.daemon = True | 847 | sync_progress_thread.daemon = True | 
| 849 | sync_progress_thread.start() | ||
| 850 | |||
| 851 | objdir_project_map = dict() | ||
| 852 | for project in projects: | ||
| 853 | objdir_project_map.setdefault(project.objdir, []).append(project) | ||
| 854 | projects_list = list(objdir_project_map.values()) | ||
| 855 | 848 | ||
| 856 | jobs = min(opt.jobs_network, len(projects_list)) | 849 | def _ProcessResults(pool, pm, results_sets): | 
| 857 | |||
| 858 | def _ProcessResults(results_sets): | ||
| 859 | ret = True | 850 | ret = True | 
| 860 | for results in results_sets: | 851 | for results in results_sets: | 
| 861 | for result in results: | 852 | for result in results: | 
| 862 | success = result.success | 853 | success = result.success | 
| 863 | project = result.project | 854 | project = projects[result.project_idx] | 
| 864 | start = result.start | 855 | start = result.start | 
| 865 | finish = result.finish | 856 | finish = result.finish | 
| 866 | self._fetch_times.Set(project, finish - start) | 857 | self._fetch_times.Set(project, finish - start) | 
| @@ -884,45 +875,49 @@ later is required to fix a server side protocol bug. | |||
| 884 | fetched.add(project.gitdir) | 875 | fetched.add(project.gitdir) | 
| 885 | pm.update() | 876 | pm.update() | 
| 886 | if not ret and opt.fail_fast: | 877 | if not ret and opt.fail_fast: | 
| 878 | if pool: | ||
| 879 | pool.close() | ||
| 887 | break | 880 | break | 
| 888 | return ret | 881 | return ret | 
| 889 | 882 | ||
| 890 | # We pass the ssh proxy settings via the class. This allows | 883 | with self.ParallelContext(): | 
| 891 | # multiprocessing to pickle it up when spawning children. We can't pass | 884 | self.get_parallel_context()["projects"] = projects | 
| 892 | # it as an argument to _FetchProjectList below as multiprocessing is | 885 | self.get_parallel_context()[ | 
| 893 | # unable to pickle those. | 886 | "sync_dict" | 
| 894 | Sync.ssh_proxy = None | 887 | ] = multiprocessing.Manager().dict() | 
| 895 | 888 | ||
| 896 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 889 | objdir_project_map = dict() | 
| 897 | if jobs == 1: | 890 | for index, project in enumerate(projects): | 
| 898 | self._FetchInitChild(ssh_proxy) | 891 | objdir_project_map.setdefault(project.objdir, []).append(index) | 
| 899 | if not _ProcessResults( | 892 | projects_list = list(objdir_project_map.values()) | 
| 900 | self._FetchProjectList(opt, x) for x in projects_list | 893 | |
| 901 | ): | 894 | jobs = min(opt.jobs_network, len(projects_list)) | 
| 902 | ret = False | 895 | |
| 903 | else: | 896 | # We pass the ssh proxy settings via the class. This allows | 
| 897 | # multiprocessing to pickle it up when spawning children. We can't | ||
| 898 | # pass it as an argument to _FetchProjectList below as | ||
| 899 | # multiprocessing is unable to pickle those. | ||
| 900 | self.get_parallel_context()["ssh_proxy"] = ssh_proxy | ||
| 901 | |||
| 902 | sync_progress_thread.start() | ||
| 904 | if not opt.quiet: | 903 | if not opt.quiet: | 
| 905 | pm.update(inc=0, msg="warming up") | 904 | pm.update(inc=0, msg="warming up") | 
| 906 | with multiprocessing.Pool( | 905 | try: | 
| 907 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,) | 906 | ret = self.ExecuteInParallel( | 
| 908 | ) as pool: | 907 | jobs, | 
| 909 | results = pool.imap_unordered( | ||
| 910 | functools.partial(self._FetchProjectList, opt), | 908 | functools.partial(self._FetchProjectList, opt), | 
| 911 | projects_list, | 909 | projects_list, | 
| 912 | chunksize=_chunksize(len(projects_list), jobs), | 910 | callback=_ProcessResults, | 
| 911 | output=pm, | ||
| 912 | # Use chunksize=1 to avoid the chance that some workers are | ||
| 913 | # idle while other workers still have more than one job in | ||
| 914 | # their chunk queue. | ||
| 915 | chunksize=1, | ||
| 913 | ) | 916 | ) | 
| 914 | if not _ProcessResults(results): | 917 | finally: | 
| 915 | ret = False | 918 | sync_event.set() | 
| 916 | pool.close() | 919 | sync_progress_thread.join() | 
| 917 | |||
| 918 | # Cleanup the reference now that we're done with it, and we're going to | ||
| 919 | # release any resources it points to. If we don't, later | ||
| 920 | # multiprocessing usage (e.g. checkouts) will try to pickle and then | ||
| 921 | # crash. | ||
| 922 | del Sync.ssh_proxy | ||
| 923 | 920 | ||
| 924 | sync_event.set() | ||
| 925 | pm.end() | ||
| 926 | self._fetch_times.Save() | 921 | self._fetch_times.Save() | 
| 927 | self._local_sync_state.Save() | 922 | self._local_sync_state.Save() | 
| 928 | 923 | ||
| @@ -1008,14 +1003,15 @@ later is required to fix a server side protocol bug. | |||
| 1008 | 1003 | ||
| 1009 | return _FetchMainResult(all_projects) | 1004 | return _FetchMainResult(all_projects) | 
| 1010 | 1005 | ||
| 1006 | @classmethod | ||
| 1011 | def _CheckoutOne( | 1007 | def _CheckoutOne( | 
| 1012 | self, | 1008 | cls, | 
| 1013 | detach_head, | 1009 | detach_head, | 
| 1014 | force_sync, | 1010 | force_sync, | 
| 1015 | force_checkout, | 1011 | force_checkout, | 
| 1016 | force_rebase, | 1012 | force_rebase, | 
| 1017 | verbose, | 1013 | verbose, | 
| 1018 | project, | 1014 | project_idx, | 
| 1019 | ): | 1015 | ): | 
| 1020 | """Checkout work tree for one project | 1016 | """Checkout work tree for one project | 
| 1021 | 1017 | ||
| @@ -1027,11 +1023,12 @@ later is required to fix a server side protocol bug. | |||
| 1027 | force_checkout: Force checking out of the repo content. | 1023 | force_checkout: Force checking out of the repo content. | 
| 1028 | force_rebase: Force rebase. | 1024 | force_rebase: Force rebase. | 
| 1029 | verbose: Whether to show verbose messages. | 1025 | verbose: Whether to show verbose messages. | 
| 1030 | project: Project object for the project to checkout. | 1026 | project_idx: Project index for the project to checkout. | 
| 1031 | 1027 | ||
| 1032 | Returns: | 1028 | Returns: | 
| 1033 | Whether the fetch was successful. | 1029 | Whether the fetch was successful. | 
| 1034 | """ | 1030 | """ | 
| 1031 | project = cls.get_parallel_context()["projects"][project_idx] | ||
| 1035 | start = time.time() | 1032 | start = time.time() | 
| 1036 | syncbuf = SyncBuffer( | 1033 | syncbuf = SyncBuffer( | 
| 1037 | project.manifest.manifestProject.config, detach_head=detach_head | 1034 | project.manifest.manifestProject.config, detach_head=detach_head | 
| @@ -1065,7 +1062,7 @@ later is required to fix a server side protocol bug. | |||
| 1065 | if not success: | 1062 | if not success: | 
| 1066 | logger.error("error: Cannot checkout %s", project.name) | 1063 | logger.error("error: Cannot checkout %s", project.name) | 
| 1067 | finish = time.time() | 1064 | finish = time.time() | 
| 1068 | return _CheckoutOneResult(success, errors, project, start, finish) | 1065 | return _CheckoutOneResult(success, errors, project_idx, start, finish) | 
| 1069 | 1066 | ||
| 1070 | def _Checkout(self, all_projects, opt, err_results, checkout_errors): | 1067 | def _Checkout(self, all_projects, opt, err_results, checkout_errors): | 
| 1071 | """Checkout projects listed in all_projects | 1068 | """Checkout projects listed in all_projects | 
| @@ -1083,7 +1080,9 @@ later is required to fix a server side protocol bug. | |||
| 1083 | ret = True | 1080 | ret = True | 
| 1084 | for result in results: | 1081 | for result in results: | 
| 1085 | success = result.success | 1082 | success = result.success | 
| 1086 | project = result.project | 1083 | project = self.get_parallel_context()["projects"][ | 
| 1084 | result.project_idx | ||
| 1085 | ] | ||
| 1087 | start = result.start | 1086 | start = result.start | 
| 1088 | finish = result.finish | 1087 | finish = result.finish | 
| 1089 | self.event_log.AddSync( | 1088 | self.event_log.AddSync( | 
| @@ -1110,22 +1109,28 @@ later is required to fix a server side protocol bug. | |||
| 1110 | return ret | 1109 | return ret | 
| 1111 | 1110 | ||
| 1112 | for projects in _SafeCheckoutOrder(all_projects): | 1111 | for projects in _SafeCheckoutOrder(all_projects): | 
| 1113 | proc_res = self.ExecuteInParallel( | 1112 | with self.ParallelContext(): | 
| 1114 | opt.jobs_checkout, | 1113 | self.get_parallel_context()["projects"] = projects | 
| 1115 | functools.partial( | 1114 | proc_res = self.ExecuteInParallel( | 
| 1116 | self._CheckoutOne, | 1115 | opt.jobs_checkout, | 
| 1117 | opt.detach_head, | 1116 | functools.partial( | 
| 1118 | opt.force_sync, | 1117 | self._CheckoutOne, | 
| 1119 | opt.force_checkout, | 1118 | opt.detach_head, | 
| 1120 | opt.rebase, | 1119 | opt.force_sync, | 
| 1121 | opt.verbose, | 1120 | opt.force_checkout, | 
| 1122 | ), | 1121 | opt.rebase, | 
| 1123 | projects, | 1122 | opt.verbose, | 
| 1124 | callback=_ProcessResults, | 1123 | ), | 
| 1125 | output=Progress( | 1124 | range(len(projects)), | 
| 1126 | "Checking out", len(all_projects), quiet=opt.quiet | 1125 | callback=_ProcessResults, | 
| 1127 | ), | 1126 | output=Progress( | 
| 1128 | ) | 1127 | "Checking out", len(all_projects), quiet=opt.quiet | 
| 1128 | ), | ||
| 1129 | # Use chunksize=1 to avoid the chance that some workers are | ||
| 1130 | # idle while other workers still have more than one job in | ||
| 1131 | # their chunk queue. | ||
| 1132 | chunksize=1, | ||
| 1133 | ) | ||
| 1129 | 1134 | ||
| 1130 | self._local_sync_state.Save() | 1135 | self._local_sync_state.Save() | 
| 1131 | return proc_res and not err_results | 1136 | return proc_res and not err_results | 
