diff options
| -rw-r--r-- | command.py | 50 | ||||
| -rw-r--r-- | subcmds/sync.py | 171 |
2 files changed, 134 insertions, 87 deletions
| @@ -12,6 +12,7 @@ | |||
| 12 | # See the License for the specific language governing permissions and | 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. | 13 | # limitations under the License. |
| 14 | 14 | ||
| 15 | import contextlib | ||
| 15 | import multiprocessing | 16 | import multiprocessing |
| 16 | import optparse | 17 | import optparse |
| 17 | import os | 18 | import os |
| @@ -70,6 +71,14 @@ class Command: | |||
| 70 | # migrated subcommands can set it to False. | 71 | # migrated subcommands can set it to False. |
| 71 | MULTI_MANIFEST_SUPPORT = True | 72 | MULTI_MANIFEST_SUPPORT = True |
| 72 | 73 | ||
| 74 | # Shared data across parallel execution workers. | ||
| 75 | _parallel_context = None | ||
| 76 | |||
| 77 | @classmethod | ||
| 78 | def get_parallel_context(cls): | ||
| 79 | assert cls._parallel_context is not None | ||
| 80 | return cls._parallel_context | ||
| 81 | |||
| 73 | def __init__( | 82 | def __init__( |
| 74 | self, | 83 | self, |
| 75 | repodir=None, | 84 | repodir=None, |
| @@ -242,9 +251,36 @@ class Command: | |||
| 242 | """Perform the action, after option parsing is complete.""" | 251 | """Perform the action, after option parsing is complete.""" |
| 243 | raise NotImplementedError | 252 | raise NotImplementedError |
| 244 | 253 | ||
| 245 | @staticmethod | 254 | @classmethod |
| 255 | @contextlib.contextmanager | ||
| 256 | def ParallelContext(cls): | ||
| 257 | """Obtains the context, which is shared to ExecuteInParallel workers. | ||
| 258 | |||
| 259 | Callers can store data in the context dict before invocation of | ||
| 260 | ExecuteInParallel. The dict will then be shared to child workers of | ||
| 261 | ExecuteInParallel. | ||
| 262 | """ | ||
| 263 | assert cls._parallel_context is None | ||
| 264 | cls._parallel_context = {} | ||
| 265 | try: | ||
| 266 | yield | ||
| 267 | finally: | ||
| 268 | cls._parallel_context = None | ||
| 269 | |||
| 270 | @classmethod | ||
| 271 | def _SetParallelContext(cls, context): | ||
| 272 | cls._parallel_context = context | ||
| 273 | |||
| 274 | @classmethod | ||
| 246 | def ExecuteInParallel( | 275 | def ExecuteInParallel( |
| 247 | jobs, func, inputs, callback, output=None, ordered=False | 276 | cls, |
| 277 | jobs, | ||
| 278 | func, | ||
| 279 | inputs, | ||
| 280 | callback, | ||
| 281 | output=None, | ||
| 282 | ordered=False, | ||
| 283 | chunksize=WORKER_BATCH_SIZE, | ||
| 248 | ): | 284 | ): |
| 249 | """Helper for managing parallel execution boiler plate. | 285 | """Helper for managing parallel execution boiler plate. |
| 250 | 286 | ||
| @@ -269,6 +305,8 @@ class Command: | |||
| 269 | output: An output manager. May be progress.Progess or | 305 | output: An output manager. May be progress.Progess or |
| 270 | color.Coloring. | 306 | color.Coloring. |
| 271 | ordered: Whether the jobs should be processed in order. | 307 | ordered: Whether the jobs should be processed in order. |
| 308 | chunksize: The number of jobs processed in batch by parallel | ||
| 309 | workers. | ||
| 272 | 310 | ||
| 273 | Returns: | 311 | Returns: |
| 274 | The |callback| function's results are returned. | 312 | The |callback| function's results are returned. |
| @@ -278,12 +316,16 @@ class Command: | |||
| 278 | if len(inputs) == 1 or jobs == 1: | 316 | if len(inputs) == 1 or jobs == 1: |
| 279 | return callback(None, output, (func(x) for x in inputs)) | 317 | return callback(None, output, (func(x) for x in inputs)) |
| 280 | else: | 318 | else: |
| 281 | with multiprocessing.Pool(jobs) as pool: | 319 | with multiprocessing.Pool( |
| 320 | jobs, | ||
| 321 | initializer=cls._SetParallelContext, | ||
| 322 | initargs=(cls._parallel_context,), | ||
| 323 | ) as pool: | ||
| 282 | submit = pool.imap if ordered else pool.imap_unordered | 324 | submit = pool.imap if ordered else pool.imap_unordered |
| 283 | return callback( | 325 | return callback( |
| 284 | pool, | 326 | pool, |
| 285 | output, | 327 | output, |
| 286 | submit(func, inputs, chunksize=WORKER_BATCH_SIZE), | 328 | submit(func, inputs, chunksize=chunksize), |
| 287 | ) | 329 | ) |
| 288 | finally: | 330 | finally: |
| 289 | if isinstance(output, progress.Progress): | 331 | if isinstance(output, progress.Progress): |
diff --git a/subcmds/sync.py b/subcmds/sync.py index bebe18b9..00fee776 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -141,7 +141,7 @@ class _FetchOneResult(NamedTuple): | |||
| 141 | 141 | ||
| 142 | Attributes: | 142 | Attributes: |
| 143 | success (bool): True if successful. | 143 | success (bool): True if successful. |
| 144 | project (Project): The fetched project. | 144 | project_idx (int): The fetched project index. |
| 145 | start (float): The starting time.time(). | 145 | start (float): The starting time.time(). |
| 146 | finish (float): The ending time.time(). | 146 | finish (float): The ending time.time(). |
| 147 | remote_fetched (bool): True if the remote was actually queried. | 147 | remote_fetched (bool): True if the remote was actually queried. |
| @@ -149,7 +149,7 @@ class _FetchOneResult(NamedTuple): | |||
| 149 | 149 | ||
| 150 | success: bool | 150 | success: bool |
| 151 | errors: List[Exception] | 151 | errors: List[Exception] |
| 152 | project: Project | 152 | project_idx: int |
| 153 | start: float | 153 | start: float |
| 154 | finish: float | 154 | finish: float |
| 155 | remote_fetched: bool | 155 | remote_fetched: bool |
| @@ -182,14 +182,14 @@ class _CheckoutOneResult(NamedTuple): | |||
| 182 | 182 | ||
| 183 | Attributes: | 183 | Attributes: |
| 184 | success (bool): True if successful. | 184 | success (bool): True if successful. |
| 185 | project (Project): The project. | 185 | project_idx (int): The project index. |
| 186 | start (float): The starting time.time(). | 186 | start (float): The starting time.time(). |
| 187 | finish (float): The ending time.time(). | 187 | finish (float): The ending time.time(). |
| 188 | """ | 188 | """ |
| 189 | 189 | ||
| 190 | success: bool | 190 | success: bool |
| 191 | errors: List[Exception] | 191 | errors: List[Exception] |
| 192 | project: Project | 192 | project_idx: int |
| 193 | start: float | 193 | start: float |
| 194 | finish: float | 194 | finish: float |
| 195 | 195 | ||
| @@ -592,7 +592,8 @@ later is required to fix a server side protocol bug. | |||
| 592 | branch = branch[len(R_HEADS) :] | 592 | branch = branch[len(R_HEADS) :] |
| 593 | return branch | 593 | return branch |
| 594 | 594 | ||
| 595 | def _GetCurrentBranchOnly(self, opt, manifest): | 595 | @classmethod |
| 596 | def _GetCurrentBranchOnly(cls, opt, manifest): | ||
| 596 | """Returns whether current-branch or use-superproject options are | 597 | """Returns whether current-branch or use-superproject options are |
| 597 | enabled. | 598 | enabled. |
| 598 | 599 | ||
| @@ -710,7 +711,8 @@ later is required to fix a server side protocol bug. | |||
| 710 | if need_unload: | 711 | if need_unload: |
| 711 | m.outer_client.manifest.Unload() | 712 | m.outer_client.manifest.Unload() |
| 712 | 713 | ||
| 713 | def _FetchProjectList(self, opt, projects): | 714 | @classmethod |
| 715 | def _FetchProjectList(cls, opt, projects): | ||
| 714 | """Main function of the fetch worker. | 716 | """Main function of the fetch worker. |
| 715 | 717 | ||
| 716 | The projects we're given share the same underlying git object store, so | 718 | The projects we're given share the same underlying git object store, so |
| @@ -722,21 +724,23 @@ later is required to fix a server side protocol bug. | |||
| 722 | opt: Program options returned from optparse. See _Options(). | 724 | opt: Program options returned from optparse. See _Options(). |
| 723 | projects: Projects to fetch. | 725 | projects: Projects to fetch. |
| 724 | """ | 726 | """ |
| 725 | return [self._FetchOne(opt, x) for x in projects] | 727 | return [cls._FetchOne(opt, x) for x in projects] |
| 726 | 728 | ||
| 727 | def _FetchOne(self, opt, project): | 729 | @classmethod |
| 730 | def _FetchOne(cls, opt, project_idx): | ||
| 728 | """Fetch git objects for a single project. | 731 | """Fetch git objects for a single project. |
| 729 | 732 | ||
| 730 | Args: | 733 | Args: |
| 731 | opt: Program options returned from optparse. See _Options(). | 734 | opt: Program options returned from optparse. See _Options(). |
| 732 | project: Project object for the project to fetch. | 735 | project_idx: Project index for the project to fetch. |
| 733 | 736 | ||
| 734 | Returns: | 737 | Returns: |
| 735 | Whether the fetch was successful. | 738 | Whether the fetch was successful. |
| 736 | """ | 739 | """ |
| 740 | project = cls.get_parallel_context()["projects"][project_idx] | ||
| 737 | start = time.time() | 741 | start = time.time() |
| 738 | k = f"{project.name} @ {project.relpath}" | 742 | k = f"{project.name} @ {project.relpath}" |
| 739 | self._sync_dict[k] = start | 743 | cls.get_parallel_context()["sync_dict"][k] = start |
| 740 | success = False | 744 | success = False |
| 741 | remote_fetched = False | 745 | remote_fetched = False |
| 742 | errors = [] | 746 | errors = [] |
| @@ -746,7 +750,7 @@ later is required to fix a server side protocol bug. | |||
| 746 | quiet=opt.quiet, | 750 | quiet=opt.quiet, |
| 747 | verbose=opt.verbose, | 751 | verbose=opt.verbose, |
| 748 | output_redir=buf, | 752 | output_redir=buf, |
| 749 | current_branch_only=self._GetCurrentBranchOnly( | 753 | current_branch_only=cls._GetCurrentBranchOnly( |
| 750 | opt, project.manifest | 754 | opt, project.manifest |
| 751 | ), | 755 | ), |
| 752 | force_sync=opt.force_sync, | 756 | force_sync=opt.force_sync, |
| @@ -756,7 +760,7 @@ later is required to fix a server side protocol bug. | |||
| 756 | optimized_fetch=opt.optimized_fetch, | 760 | optimized_fetch=opt.optimized_fetch, |
| 757 | retry_fetches=opt.retry_fetches, | 761 | retry_fetches=opt.retry_fetches, |
| 758 | prune=opt.prune, | 762 | prune=opt.prune, |
| 759 | ssh_proxy=self.ssh_proxy, | 763 | ssh_proxy=cls.get_parallel_context()["ssh_proxy"], |
| 760 | clone_filter=project.manifest.CloneFilter, | 764 | clone_filter=project.manifest.CloneFilter, |
| 761 | partial_clone_exclude=project.manifest.PartialCloneExclude, | 765 | partial_clone_exclude=project.manifest.PartialCloneExclude, |
| 762 | clone_filter_for_depth=project.manifest.CloneFilterForDepth, | 766 | clone_filter_for_depth=project.manifest.CloneFilterForDepth, |
| @@ -788,24 +792,20 @@ later is required to fix a server side protocol bug. | |||
| 788 | type(e).__name__, | 792 | type(e).__name__, |
| 789 | e, | 793 | e, |
| 790 | ) | 794 | ) |
| 791 | del self._sync_dict[k] | ||
| 792 | errors.append(e) | 795 | errors.append(e) |
| 793 | raise | 796 | raise |
| 797 | finally: | ||
| 798 | del cls.get_parallel_context()["sync_dict"][k] | ||
| 794 | 799 | ||
| 795 | finish = time.time() | 800 | finish = time.time() |
| 796 | del self._sync_dict[k] | ||
| 797 | return _FetchOneResult( | 801 | return _FetchOneResult( |
| 798 | success, errors, project, start, finish, remote_fetched | 802 | success, errors, project_idx, start, finish, remote_fetched |
| 799 | ) | 803 | ) |
| 800 | 804 | ||
| 801 | @classmethod | ||
| 802 | def _FetchInitChild(cls, ssh_proxy): | ||
| 803 | cls.ssh_proxy = ssh_proxy | ||
| 804 | |||
| 805 | def _GetSyncProgressMessage(self): | 805 | def _GetSyncProgressMessage(self): |
| 806 | earliest_time = float("inf") | 806 | earliest_time = float("inf") |
| 807 | earliest_proj = None | 807 | earliest_proj = None |
| 808 | items = self._sync_dict.items() | 808 | items = self.get_parallel_context()["sync_dict"].items() |
| 809 | for project, t in items: | 809 | for project, t in items: |
| 810 | if t < earliest_time: | 810 | if t < earliest_time: |
| 811 | earliest_time = t | 811 | earliest_time = t |
| @@ -813,7 +813,7 @@ later is required to fix a server side protocol bug. | |||
| 813 | 813 | ||
| 814 | if not earliest_proj: | 814 | if not earliest_proj: |
| 815 | # This function is called when sync is still running but in some | 815 | # This function is called when sync is still running but in some |
| 816 | # cases (by chance), _sync_dict can contain no entries. Return some | 816 | # cases (by chance), sync_dict can contain no entries. Return some |
| 817 | # text to indicate that sync is still working. | 817 | # text to indicate that sync is still working. |
| 818 | return "..working.." | 818 | return "..working.." |
| 819 | 819 | ||
| @@ -835,7 +835,6 @@ later is required to fix a server side protocol bug. | |||
| 835 | elide=True, | 835 | elide=True, |
| 836 | ) | 836 | ) |
| 837 | 837 | ||
| 838 | self._sync_dict = multiprocessing.Manager().dict() | ||
| 839 | sync_event = _threading.Event() | 838 | sync_event = _threading.Event() |
| 840 | 839 | ||
| 841 | def _MonitorSyncLoop(): | 840 | def _MonitorSyncLoop(): |
| @@ -846,21 +845,13 @@ later is required to fix a server side protocol bug. | |||
| 846 | 845 | ||
| 847 | sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop) | 846 | sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop) |
| 848 | sync_progress_thread.daemon = True | 847 | sync_progress_thread.daemon = True |
| 849 | sync_progress_thread.start() | ||
| 850 | |||
| 851 | objdir_project_map = dict() | ||
| 852 | for project in projects: | ||
| 853 | objdir_project_map.setdefault(project.objdir, []).append(project) | ||
| 854 | projects_list = list(objdir_project_map.values()) | ||
| 855 | 848 | ||
| 856 | jobs = min(opt.jobs_network, len(projects_list)) | 849 | def _ProcessResults(pool, pm, results_sets): |
| 857 | |||
| 858 | def _ProcessResults(results_sets): | ||
| 859 | ret = True | 850 | ret = True |
| 860 | for results in results_sets: | 851 | for results in results_sets: |
| 861 | for result in results: | 852 | for result in results: |
| 862 | success = result.success | 853 | success = result.success |
| 863 | project = result.project | 854 | project = projects[result.project_idx] |
| 864 | start = result.start | 855 | start = result.start |
| 865 | finish = result.finish | 856 | finish = result.finish |
| 866 | self._fetch_times.Set(project, finish - start) | 857 | self._fetch_times.Set(project, finish - start) |
| @@ -884,45 +875,49 @@ later is required to fix a server side protocol bug. | |||
| 884 | fetched.add(project.gitdir) | 875 | fetched.add(project.gitdir) |
| 885 | pm.update() | 876 | pm.update() |
| 886 | if not ret and opt.fail_fast: | 877 | if not ret and opt.fail_fast: |
| 878 | if pool: | ||
| 879 | pool.close() | ||
| 887 | break | 880 | break |
| 888 | return ret | 881 | return ret |
| 889 | 882 | ||
| 890 | # We pass the ssh proxy settings via the class. This allows | 883 | with self.ParallelContext(): |
| 891 | # multiprocessing to pickle it up when spawning children. We can't pass | 884 | self.get_parallel_context()["projects"] = projects |
| 892 | # it as an argument to _FetchProjectList below as multiprocessing is | 885 | self.get_parallel_context()[ |
| 893 | # unable to pickle those. | 886 | "sync_dict" |
| 894 | Sync.ssh_proxy = None | 887 | ] = multiprocessing.Manager().dict() |
| 895 | 888 | ||
| 896 | # NB: Multiprocessing is heavy, so don't spin it up for one job. | 889 | objdir_project_map = dict() |
| 897 | if jobs == 1: | 890 | for index, project in enumerate(projects): |
| 898 | self._FetchInitChild(ssh_proxy) | 891 | objdir_project_map.setdefault(project.objdir, []).append(index) |
| 899 | if not _ProcessResults( | 892 | projects_list = list(objdir_project_map.values()) |
| 900 | self._FetchProjectList(opt, x) for x in projects_list | 893 | |
| 901 | ): | 894 | jobs = min(opt.jobs_network, len(projects_list)) |
| 902 | ret = False | 895 | |
| 903 | else: | 896 | # We pass the ssh proxy settings via the class. This allows |
| 897 | # multiprocessing to pickle it up when spawning children. We can't | ||
| 898 | # pass it as an argument to _FetchProjectList below as | ||
| 899 | # multiprocessing is unable to pickle those. | ||
| 900 | self.get_parallel_context()["ssh_proxy"] = ssh_proxy | ||
| 901 | |||
| 902 | sync_progress_thread.start() | ||
| 904 | if not opt.quiet: | 903 | if not opt.quiet: |
| 905 | pm.update(inc=0, msg="warming up") | 904 | pm.update(inc=0, msg="warming up") |
| 906 | with multiprocessing.Pool( | 905 | try: |
| 907 | jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,) | 906 | ret = self.ExecuteInParallel( |
| 908 | ) as pool: | 907 | jobs, |
| 909 | results = pool.imap_unordered( | ||
| 910 | functools.partial(self._FetchProjectList, opt), | 908 | functools.partial(self._FetchProjectList, opt), |
| 911 | projects_list, | 909 | projects_list, |
| 912 | chunksize=_chunksize(len(projects_list), jobs), | 910 | callback=_ProcessResults, |
| 911 | output=pm, | ||
| 912 | # Use chunksize=1 to avoid the chance that some workers are | ||
| 913 | # idle while other workers still have more than one job in | ||
| 914 | # their chunk queue. | ||
| 915 | chunksize=1, | ||
| 913 | ) | 916 | ) |
| 914 | if not _ProcessResults(results): | 917 | finally: |
| 915 | ret = False | 918 | sync_event.set() |
| 916 | pool.close() | 919 | sync_progress_thread.join() |
| 917 | |||
| 918 | # Cleanup the reference now that we're done with it, and we're going to | ||
| 919 | # release any resources it points to. If we don't, later | ||
| 920 | # multiprocessing usage (e.g. checkouts) will try to pickle and then | ||
| 921 | # crash. | ||
| 922 | del Sync.ssh_proxy | ||
| 923 | 920 | ||
| 924 | sync_event.set() | ||
| 925 | pm.end() | ||
| 926 | self._fetch_times.Save() | 921 | self._fetch_times.Save() |
| 927 | self._local_sync_state.Save() | 922 | self._local_sync_state.Save() |
| 928 | 923 | ||
| @@ -1008,14 +1003,15 @@ later is required to fix a server side protocol bug. | |||
| 1008 | 1003 | ||
| 1009 | return _FetchMainResult(all_projects) | 1004 | return _FetchMainResult(all_projects) |
| 1010 | 1005 | ||
| 1006 | @classmethod | ||
| 1011 | def _CheckoutOne( | 1007 | def _CheckoutOne( |
| 1012 | self, | 1008 | cls, |
| 1013 | detach_head, | 1009 | detach_head, |
| 1014 | force_sync, | 1010 | force_sync, |
| 1015 | force_checkout, | 1011 | force_checkout, |
| 1016 | force_rebase, | 1012 | force_rebase, |
| 1017 | verbose, | 1013 | verbose, |
| 1018 | project, | 1014 | project_idx, |
| 1019 | ): | 1015 | ): |
| 1020 | """Checkout work tree for one project | 1016 | """Checkout work tree for one project |
| 1021 | 1017 | ||
| @@ -1027,11 +1023,12 @@ later is required to fix a server side protocol bug. | |||
| 1027 | force_checkout: Force checking out of the repo content. | 1023 | force_checkout: Force checking out of the repo content. |
| 1028 | force_rebase: Force rebase. | 1024 | force_rebase: Force rebase. |
| 1029 | verbose: Whether to show verbose messages. | 1025 | verbose: Whether to show verbose messages. |
| 1030 | project: Project object for the project to checkout. | 1026 | project_idx: Project index for the project to checkout. |
| 1031 | 1027 | ||
| 1032 | Returns: | 1028 | Returns: |
| 1033 | Whether the fetch was successful. | 1029 | Whether the fetch was successful. |
| 1034 | """ | 1030 | """ |
| 1031 | project = cls.get_parallel_context()["projects"][project_idx] | ||
| 1035 | start = time.time() | 1032 | start = time.time() |
| 1036 | syncbuf = SyncBuffer( | 1033 | syncbuf = SyncBuffer( |
| 1037 | project.manifest.manifestProject.config, detach_head=detach_head | 1034 | project.manifest.manifestProject.config, detach_head=detach_head |
| @@ -1065,7 +1062,7 @@ later is required to fix a server side protocol bug. | |||
| 1065 | if not success: | 1062 | if not success: |
| 1066 | logger.error("error: Cannot checkout %s", project.name) | 1063 | logger.error("error: Cannot checkout %s", project.name) |
| 1067 | finish = time.time() | 1064 | finish = time.time() |
| 1068 | return _CheckoutOneResult(success, errors, project, start, finish) | 1065 | return _CheckoutOneResult(success, errors, project_idx, start, finish) |
| 1069 | 1066 | ||
| 1070 | def _Checkout(self, all_projects, opt, err_results, checkout_errors): | 1067 | def _Checkout(self, all_projects, opt, err_results, checkout_errors): |
| 1071 | """Checkout projects listed in all_projects | 1068 | """Checkout projects listed in all_projects |
| @@ -1083,7 +1080,9 @@ later is required to fix a server side protocol bug. | |||
| 1083 | ret = True | 1080 | ret = True |
| 1084 | for result in results: | 1081 | for result in results: |
| 1085 | success = result.success | 1082 | success = result.success |
| 1086 | project = result.project | 1083 | project = self.get_parallel_context()["projects"][ |
| 1084 | result.project_idx | ||
| 1085 | ] | ||
| 1087 | start = result.start | 1086 | start = result.start |
| 1088 | finish = result.finish | 1087 | finish = result.finish |
| 1089 | self.event_log.AddSync( | 1088 | self.event_log.AddSync( |
| @@ -1110,22 +1109,28 @@ later is required to fix a server side protocol bug. | |||
| 1110 | return ret | 1109 | return ret |
| 1111 | 1110 | ||
| 1112 | for projects in _SafeCheckoutOrder(all_projects): | 1111 | for projects in _SafeCheckoutOrder(all_projects): |
| 1113 | proc_res = self.ExecuteInParallel( | 1112 | with self.ParallelContext(): |
| 1114 | opt.jobs_checkout, | 1113 | self.get_parallel_context()["projects"] = projects |
| 1115 | functools.partial( | 1114 | proc_res = self.ExecuteInParallel( |
| 1116 | self._CheckoutOne, | 1115 | opt.jobs_checkout, |
| 1117 | opt.detach_head, | 1116 | functools.partial( |
| 1118 | opt.force_sync, | 1117 | self._CheckoutOne, |
| 1119 | opt.force_checkout, | 1118 | opt.detach_head, |
| 1120 | opt.rebase, | 1119 | opt.force_sync, |
| 1121 | opt.verbose, | 1120 | opt.force_checkout, |
| 1122 | ), | 1121 | opt.rebase, |
| 1123 | projects, | 1122 | opt.verbose, |
| 1124 | callback=_ProcessResults, | 1123 | ), |
| 1125 | output=Progress( | 1124 | range(len(projects)), |
| 1126 | "Checking out", len(all_projects), quiet=opt.quiet | 1125 | callback=_ProcessResults, |
| 1127 | ), | 1126 | output=Progress( |
| 1128 | ) | 1127 | "Checking out", len(all_projects), quiet=opt.quiet |
| 1128 | ), | ||
| 1129 | # Use chunksize=1 to avoid the chance that some workers are | ||
| 1130 | # idle while other workers still have more than one job in | ||
| 1131 | # their chunk queue. | ||
| 1132 | chunksize=1, | ||
| 1133 | ) | ||
| 1129 | 1134 | ||
| 1130 | self._local_sync_state.Save() | 1135 | self._local_sync_state.Save() |
| 1131 | return proc_res and not err_results | 1136 | return proc_res and not err_results |
