diff options
| author | Gavin Mak <gavinmak@google.com> | 2025-06-13 17:53:38 -0700 |
|---|---|---|
| committer | LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2025-06-18 10:26:27 -0700 |
| commit | 7b6ffed4ae3102b7c90592eeff8e28855cc25c11 (patch) | |
| tree | 13be6b1470be5f3a2fad04797813936c034bb024 | |
| parent | b4b323a8bd02d52d060f7f6fa15ba045df5af5b2 (diff) | |
| download | git-repo-7b6ffed4ae3102b7c90592eeff8e28855cc25c11.tar.gz | |
sync: Implement --interleaved sync worker
For each assigned project, the worker sequentially calls
Sync_NetworkHalf and Sync_LocalHalf, respecting --local-only and
--network-only flags. To prevent scrambled progress bars, all stderr
output from the checkout phase is captured (shown with --verbose).
Result objects now carry status and timing information from the worker
for state updates.
Bug: 421935613
Change-Id: I398602e08a375e974a8914e5fa48ffae673dda9b
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483301
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
| -rw-r--r-- | progress.py | 15 | ||||
| -rw-r--r-- | subcmds/sync.py | 401 | ||||
| -rw-r--r-- | tests/test_subcmds_sync.py | 181 |
3 files changed, 483 insertions, 114 deletions
diff --git a/progress.py b/progress.py index fe246c74..a386f426 100644 --- a/progress.py +++ b/progress.py | |||
| @@ -195,6 +195,21 @@ class Progress: | |||
| 195 | ) | 195 | ) |
| 196 | ) | 196 | ) |
| 197 | 197 | ||
| 198 | def display_message(self, msg): | ||
| 199 | """Clears the current progress line and prints a message above it. | ||
| 200 | |||
| 201 | The progress bar is then redrawn on the next line. | ||
| 202 | """ | ||
| 203 | if not _TTY or IsTraceToStderr() or self._quiet: | ||
| 204 | return | ||
| 205 | |||
| 206 | # Erase the current line, print the message with a newline, | ||
| 207 | # and then immediately redraw the progress bar on the new line. | ||
| 208 | sys.stderr.write("\r" + CSI_ERASE_LINE) | ||
| 209 | sys.stderr.write(msg + "\n") | ||
| 210 | sys.stderr.flush() | ||
| 211 | self.update(inc=0) | ||
| 212 | |||
| 198 | def end(self): | 213 | def end(self): |
| 199 | self._update_event.set() | 214 | self._update_event.set() |
| 200 | if not _TTY or IsTraceToStderr() or self._quiet: | 215 | if not _TTY or IsTraceToStderr() or self._quiet: |
diff --git a/subcmds/sync.py b/subcmds/sync.py index 711baca2..f0c398a3 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -13,6 +13,7 @@ | |||
| 13 | # limitations under the License. | 13 | # limitations under the License. |
| 14 | 14 | ||
| 15 | import collections | 15 | import collections |
| 16 | import contextlib | ||
| 16 | import functools | 17 | import functools |
| 17 | import http.cookiejar as cookielib | 18 | import http.cookiejar as cookielib |
| 18 | import io | 19 | import io |
| @@ -198,33 +199,41 @@ class _SyncResult(NamedTuple): | |||
| 198 | """Individual project sync result for interleaved mode. | 199 | """Individual project sync result for interleaved mode. |
| 199 | 200 | ||
| 200 | Attributes: | 201 | Attributes: |
| 202 | project_index (int): The index of the project in the shared list. | ||
| 201 | relpath (str): The project's relative path from the repo client top. | 203 | relpath (str): The project's relative path from the repo client top. |
| 204 | remote_fetched (bool): True if the remote was actually queried. | ||
| 202 | fetch_success (bool): True if the fetch operation was successful. | 205 | fetch_success (bool): True if the fetch operation was successful. |
| 203 | checkout_success (bool): True if the checkout operation was | ||
| 204 | successful. | ||
| 205 | fetch_error (Optional[Exception]): The Exception from a failed fetch, | 206 | fetch_error (Optional[Exception]): The Exception from a failed fetch, |
| 206 | or None. | 207 | or None. |
| 207 | checkout_error (Optional[Exception]): The Exception from a failed | ||
| 208 | checkout, or None. | ||
| 209 | fetch_start (Optional[float]): The time.time() when fetch started. | 208 | fetch_start (Optional[float]): The time.time() when fetch started. |
| 210 | fetch_finish (Optional[float]): The time.time() when fetch finished. | 209 | fetch_finish (Optional[float]): The time.time() when fetch finished. |
| 210 | checkout_success (bool): True if the checkout operation was | ||
| 211 | successful. | ||
| 212 | checkout_error (Optional[Exception]): The Exception from a failed | ||
| 213 | checkout, or None. | ||
| 211 | checkout_start (Optional[float]): The time.time() when checkout | 214 | checkout_start (Optional[float]): The time.time() when checkout |
| 212 | started. | 215 | started. |
| 213 | checkout_finish (Optional[float]): The time.time() when checkout | 216 | checkout_finish (Optional[float]): The time.time() when checkout |
| 214 | finished. | 217 | finished. |
| 218 | stderr_text (str): The combined output from both fetch and checkout. | ||
| 215 | """ | 219 | """ |
| 216 | 220 | ||
| 221 | project_index: int | ||
| 217 | relpath: str | 222 | relpath: str |
| 223 | |||
| 224 | remote_fetched: bool | ||
| 218 | fetch_success: bool | 225 | fetch_success: bool |
| 219 | checkout_success: bool | ||
| 220 | fetch_error: Optional[Exception] | 226 | fetch_error: Optional[Exception] |
| 221 | checkout_error: Optional[Exception] | ||
| 222 | |||
| 223 | fetch_start: Optional[float] | 227 | fetch_start: Optional[float] |
| 224 | fetch_finish: Optional[float] | 228 | fetch_finish: Optional[float] |
| 229 | |||
| 230 | checkout_success: bool | ||
| 231 | checkout_error: Optional[Exception] | ||
| 225 | checkout_start: Optional[float] | 232 | checkout_start: Optional[float] |
| 226 | checkout_finish: Optional[float] | 233 | checkout_finish: Optional[float] |
| 227 | 234 | ||
| 235 | stderr_text: str | ||
| 236 | |||
| 228 | 237 | ||
| 229 | class _InterleavedSyncResult(NamedTuple): | 238 | class _InterleavedSyncResult(NamedTuple): |
| 230 | """Result of an interleaved sync. | 239 | """Result of an interleaved sync. |
| @@ -996,6 +1005,7 @@ later is required to fix a server side protocol bug. | |||
| 996 | err_event.set() | 1005 | err_event.set() |
| 997 | 1006 | ||
| 998 | # Call self update, unless requested not to | 1007 | # Call self update, unless requested not to |
| 1008 | # TODO(b/42193561): Extract repo update logic to ExecuteHelper. | ||
| 999 | if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0": | 1009 | if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0": |
| 1000 | _PostRepoFetch(rp, opt.repo_verify) | 1010 | _PostRepoFetch(rp, opt.repo_verify) |
| 1001 | if opt.network_only: | 1011 | if opt.network_only: |
| @@ -1176,6 +1186,16 @@ later is required to fix a server side protocol bug. | |||
| 1176 | self._local_sync_state.Save() | 1186 | self._local_sync_state.Save() |
| 1177 | return proc_res and not err_results | 1187 | return proc_res and not err_results |
| 1178 | 1188 | ||
| 1189 | def _PrintManifestNotices(self, opt): | ||
| 1190 | """Print all manifest notices, but only once.""" | ||
| 1191 | printed_notices = set() | ||
| 1192 | # Print all manifest notices, but only once. | ||
| 1193 | # Sort by path_prefix to ensure consistent ordering. | ||
| 1194 | for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix): | ||
| 1195 | if m.notice and m.notice not in printed_notices: | ||
| 1196 | print(m.notice) | ||
| 1197 | printed_notices.add(m.notice) | ||
| 1198 | |||
| 1179 | @staticmethod | 1199 | @staticmethod |
| 1180 | def _GetPreciousObjectsState(project: Project, opt): | 1200 | def _GetPreciousObjectsState(project: Project, opt): |
| 1181 | """Get the preciousObjects state for the project. | 1201 | """Get the preciousObjects state for the project. |
| @@ -2032,14 +2052,7 @@ later is required to fix a server side protocol bug. | |||
| 2032 | if err_checkout: | 2052 | if err_checkout: |
| 2033 | err_event.set() | 2053 | err_event.set() |
| 2034 | 2054 | ||
| 2035 | printed_notices = set() | 2055 | self._PrintManifestNotices(opt) |
| 2036 | # If there's a notice that's supposed to print at the end of the sync, | ||
| 2037 | # print it now... But avoid printing duplicate messages, and preserve | ||
| 2038 | # order. | ||
| 2039 | for m in sorted(self.ManifestList(opt), key=lambda x: x.path_prefix): | ||
| 2040 | if m.notice and m.notice not in printed_notices: | ||
| 2041 | print(m.notice) | ||
| 2042 | printed_notices.add(m.notice) | ||
| 2043 | 2056 | ||
| 2044 | # If we saw an error, exit with code 1 so that other scripts can check. | 2057 | # If we saw an error, exit with code 1 so that other scripts can check. |
| 2045 | if err_event.is_set(): | 2058 | if err_event.is_set(): |
| @@ -2068,6 +2081,139 @@ later is required to fix a server side protocol bug. | |||
| 2068 | raise SyncError(aggregate_errors=errors) | 2081 | raise SyncError(aggregate_errors=errors) |
| 2069 | 2082 | ||
| 2070 | @classmethod | 2083 | @classmethod |
| 2084 | def _SyncOneProject(cls, opt, project_index, project) -> _SyncResult: | ||
| 2085 | """Syncs a single project for interleaved sync.""" | ||
| 2086 | fetch_success = False | ||
| 2087 | remote_fetched = False | ||
| 2088 | fetch_error = None | ||
| 2089 | fetch_start = None | ||
| 2090 | fetch_finish = None | ||
| 2091 | network_output = "" | ||
| 2092 | |||
| 2093 | if opt.local_only: | ||
| 2094 | fetch_success = True | ||
| 2095 | else: | ||
| 2096 | fetch_start = time.time() | ||
| 2097 | network_output_capture = io.StringIO() | ||
| 2098 | try: | ||
| 2099 | ssh_proxy = cls.get_parallel_context().get("ssh_proxy") | ||
| 2100 | sync_result = project.Sync_NetworkHalf( | ||
| 2101 | quiet=opt.quiet, | ||
| 2102 | verbose=opt.verbose, | ||
| 2103 | output_redir=network_output_capture, | ||
| 2104 | current_branch_only=cls._GetCurrentBranchOnly( | ||
| 2105 | opt, project.manifest | ||
| 2106 | ), | ||
| 2107 | force_sync=opt.force_sync, | ||
| 2108 | clone_bundle=opt.clone_bundle, | ||
| 2109 | tags=opt.tags, | ||
| 2110 | archive=project.manifest.IsArchive, | ||
| 2111 | optimized_fetch=opt.optimized_fetch, | ||
| 2112 | retry_fetches=opt.retry_fetches, | ||
| 2113 | prune=opt.prune, | ||
| 2114 | ssh_proxy=ssh_proxy, | ||
| 2115 | clone_filter=project.manifest.CloneFilter, | ||
| 2116 | partial_clone_exclude=project.manifest.PartialCloneExclude, | ||
| 2117 | clone_filter_for_depth=project.manifest.CloneFilterForDepth, | ||
| 2118 | ) | ||
| 2119 | fetch_success = sync_result.success | ||
| 2120 | remote_fetched = sync_result.remote_fetched | ||
| 2121 | fetch_error = sync_result.error | ||
| 2122 | except KeyboardInterrupt: | ||
| 2123 | logger.error( | ||
| 2124 | "Keyboard interrupt while processing %s", project.name | ||
| 2125 | ) | ||
| 2126 | except GitError as e: | ||
| 2127 | fetch_error = e | ||
| 2128 | logger.error("error.GitError: Cannot fetch %s", e) | ||
| 2129 | except Exception as e: | ||
| 2130 | fetch_error = e | ||
| 2131 | logger.error( | ||
| 2132 | "error: Cannot fetch %s (%s: %s)", | ||
| 2133 | project.name, | ||
| 2134 | type(e).__name__, | ||
| 2135 | e, | ||
| 2136 | ) | ||
| 2137 | finally: | ||
| 2138 | fetch_finish = time.time() | ||
| 2139 | network_output = network_output_capture.getvalue() | ||
| 2140 | |||
| 2141 | checkout_success = False | ||
| 2142 | checkout_error = None | ||
| 2143 | checkout_start = None | ||
| 2144 | checkout_finish = None | ||
| 2145 | checkout_stderr = "" | ||
| 2146 | |||
| 2147 | if fetch_success and not opt.network_only: | ||
| 2148 | checkout_start = time.time() | ||
| 2149 | stderr_capture = io.StringIO() | ||
| 2150 | try: | ||
| 2151 | with contextlib.redirect_stderr(stderr_capture): | ||
| 2152 | syncbuf = SyncBuffer( | ||
| 2153 | project.manifest.manifestProject.config, | ||
| 2154 | detach_head=opt.detach_head, | ||
| 2155 | ) | ||
| 2156 | local_half_errors = [] | ||
| 2157 | project.Sync_LocalHalf( | ||
| 2158 | syncbuf, | ||
| 2159 | force_sync=opt.force_sync, | ||
| 2160 | force_checkout=opt.force_checkout, | ||
| 2161 | force_rebase=opt.rebase, | ||
| 2162 | errors=local_half_errors, | ||
| 2163 | verbose=opt.verbose, | ||
| 2164 | ) | ||
| 2165 | checkout_success = syncbuf.Finish() | ||
| 2166 | if local_half_errors: | ||
| 2167 | checkout_error = SyncError( | ||
| 2168 | aggregate_errors=local_half_errors | ||
| 2169 | ) | ||
| 2170 | except KeyboardInterrupt: | ||
| 2171 | logger.error( | ||
| 2172 | "Keyboard interrupt while processing %s", project.name | ||
| 2173 | ) | ||
| 2174 | except GitError as e: | ||
| 2175 | checkout_error = e | ||
| 2176 | logger.error( | ||
| 2177 | "error.GitError: Cannot checkout %s: %s", project.name, e | ||
| 2178 | ) | ||
| 2179 | except Exception as e: | ||
| 2180 | checkout_error = e | ||
| 2181 | logger.error( | ||
| 2182 | "error: Cannot checkout %s: %s: %s", | ||
| 2183 | project.name, | ||
| 2184 | type(e).__name__, | ||
| 2185 | e, | ||
| 2186 | ) | ||
| 2187 | finally: | ||
| 2188 | checkout_finish = time.time() | ||
| 2189 | checkout_stderr = stderr_capture.getvalue() | ||
| 2190 | elif fetch_success: | ||
| 2191 | checkout_success = True | ||
| 2192 | |||
| 2193 | # Consolidate all captured output. | ||
| 2194 | captured_parts = [] | ||
| 2195 | if network_output: | ||
| 2196 | captured_parts.append(network_output) | ||
| 2197 | if checkout_stderr: | ||
| 2198 | captured_parts.append(checkout_stderr) | ||
| 2199 | stderr_text = "\n".join(captured_parts) | ||
| 2200 | |||
| 2201 | return _SyncResult( | ||
| 2202 | project_index=project_index, | ||
| 2203 | relpath=project.relpath, | ||
| 2204 | fetch_success=fetch_success, | ||
| 2205 | remote_fetched=remote_fetched, | ||
| 2206 | checkout_success=checkout_success, | ||
| 2207 | fetch_error=fetch_error, | ||
| 2208 | checkout_error=checkout_error, | ||
| 2209 | stderr_text=stderr_text.strip(), | ||
| 2210 | fetch_start=fetch_start, | ||
| 2211 | fetch_finish=fetch_finish, | ||
| 2212 | checkout_start=checkout_start, | ||
| 2213 | checkout_finish=checkout_finish, | ||
| 2214 | ) | ||
| 2215 | |||
| 2216 | @classmethod | ||
| 2071 | def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult: | 2217 | def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult: |
| 2072 | """Worker for interleaved sync. | 2218 | """Worker for interleaved sync. |
| 2073 | 2219 | ||
| @@ -2092,27 +2238,12 @@ later is required to fix a server side protocol bug. | |||
| 2092 | # Use the first project as the representative for the progress bar. | 2238 | # Use the first project as the representative for the progress bar. |
| 2093 | first_project = projects[project_indices[0]] | 2239 | first_project = projects[project_indices[0]] |
| 2094 | key = f"{first_project.name} @ {first_project.relpath}" | 2240 | key = f"{first_project.name} @ {first_project.relpath}" |
| 2095 | start_time = time.time() | 2241 | sync_dict[key] = time.time() |
| 2096 | sync_dict[key] = start_time | ||
| 2097 | 2242 | ||
| 2098 | try: | 2243 | try: |
| 2099 | for idx in project_indices: | 2244 | for idx in project_indices: |
| 2100 | project = projects[idx] | 2245 | project = projects[idx] |
| 2101 | # For now, simulate a successful sync. | 2246 | results.append(cls._SyncOneProject(opt, idx, project)) |
| 2102 | # TODO(b/421935613): Perform the actual git fetch and checkout. | ||
| 2103 | results.append( | ||
| 2104 | _SyncResult( | ||
| 2105 | relpath=project.relpath, | ||
| 2106 | fetch_success=True, | ||
| 2107 | checkout_success=True, | ||
| 2108 | fetch_error=None, | ||
| 2109 | checkout_error=None, | ||
| 2110 | fetch_start=None, | ||
| 2111 | fetch_finish=None, | ||
| 2112 | checkout_start=None, | ||
| 2113 | checkout_finish=None, | ||
| 2114 | ) | ||
| 2115 | ) | ||
| 2116 | finally: | 2247 | finally: |
| 2117 | del sync_dict[key] | 2248 | del sync_dict[key] |
| 2118 | 2249 | ||
| @@ -2130,9 +2261,39 @@ later is required to fix a server side protocol bug. | |||
| 2130 | ): | 2261 | ): |
| 2131 | """Callback to process results from interleaved sync workers.""" | 2262 | """Callback to process results from interleaved sync workers.""" |
| 2132 | ret = True | 2263 | ret = True |
| 2264 | projects = self.get_parallel_context()["projects"] | ||
| 2133 | for result_group in results_sets: | 2265 | for result_group in results_sets: |
| 2134 | for result in result_group.results: | 2266 | for result in result_group.results: |
| 2135 | pm.update() | 2267 | pm.update() |
| 2268 | project = projects[result.project_index] | ||
| 2269 | |||
| 2270 | if opt.verbose and result.stderr_text: | ||
| 2271 | pm.display_message(result.stderr_text) | ||
| 2272 | |||
| 2273 | if result.fetch_start: | ||
| 2274 | self._fetch_times.Set( | ||
| 2275 | project, | ||
| 2276 | result.fetch_finish - result.fetch_start, | ||
| 2277 | ) | ||
| 2278 | self._local_sync_state.SetFetchTime(project) | ||
| 2279 | self.event_log.AddSync( | ||
| 2280 | project, | ||
| 2281 | event_log.TASK_SYNC_NETWORK, | ||
| 2282 | result.fetch_start, | ||
| 2283 | result.fetch_finish, | ||
| 2284 | result.fetch_success, | ||
| 2285 | ) | ||
| 2286 | if result.checkout_start: | ||
| 2287 | if result.checkout_success: | ||
| 2288 | self._local_sync_state.SetCheckoutTime(project) | ||
| 2289 | self.event_log.AddSync( | ||
| 2290 | project, | ||
| 2291 | event_log.TASK_SYNC_LOCAL, | ||
| 2292 | result.checkout_start, | ||
| 2293 | result.checkout_finish, | ||
| 2294 | result.checkout_success, | ||
| 2295 | ) | ||
| 2296 | |||
| 2136 | if result.fetch_success and result.checkout_success: | 2297 | if result.fetch_success and result.checkout_success: |
| 2137 | synced_relpaths.add(result.relpath) | 2298 | synced_relpaths.add(result.relpath) |
| 2138 | else: | 2299 | else: |
| @@ -2188,96 +2349,110 @@ later is required to fix a server side protocol bug. | |||
| 2188 | sync_event = _threading.Event() | 2349 | sync_event = _threading.Event() |
| 2189 | sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) | 2350 | sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) |
| 2190 | 2351 | ||
| 2191 | with self.ParallelContext(): | 2352 | with multiprocessing.Manager() as manager, ssh.ProxyManager( |
| 2192 | # TODO(gavinmak): Use multprocessing.Queue instead of dict. | 2353 | manager |
| 2193 | self.get_parallel_context()[ | 2354 | ) as ssh_proxy: |
| 2194 | "sync_dict" | 2355 | ssh_proxy.sock() |
| 2195 | ] = multiprocessing.Manager().dict() | 2356 | with self.ParallelContext(): |
| 2196 | sync_progress_thread.start() | 2357 | self.get_parallel_context()["ssh_proxy"] = ssh_proxy |
| 2358 | # TODO(gavinmak): Use multprocessing.Queue instead of dict. | ||
| 2359 | self.get_parallel_context()[ | ||
| 2360 | "sync_dict" | ||
| 2361 | ] = multiprocessing.Manager().dict() | ||
| 2362 | sync_progress_thread.start() | ||
| 2197 | 2363 | ||
| 2198 | try: | 2364 | try: |
| 2199 | # Outer loop for dynamic project discovery (e.g., submodules). | 2365 | # Outer loop for dynamic project discovery. This continues |
| 2200 | # It continues until no unsynced projects remain. | 2366 | # until no unsynced projects remain. |
| 2201 | while True: | 2367 | while True: |
| 2202 | projects_to_sync = [ | 2368 | projects_to_sync = [ |
| 2203 | p | 2369 | p |
| 2204 | for p in project_list | 2370 | for p in project_list |
| 2205 | if p.relpath not in synced_relpaths | 2371 | if p.relpath not in synced_relpaths |
| 2206 | ] | 2372 | ] |
| 2207 | if not projects_to_sync: | 2373 | if not projects_to_sync: |
| 2208 | break | 2374 | break |
| 2209 | 2375 | ||
| 2210 | pending_relpaths = {p.relpath for p in projects_to_sync} | 2376 | pending_relpaths = {p.relpath for p in projects_to_sync} |
| 2211 | if previously_pending_relpaths == pending_relpaths: | 2377 | if previously_pending_relpaths == pending_relpaths: |
| 2212 | logger.error( | 2378 | logger.error( |
| 2213 | "Stall detected in interleaved sync, not all " | 2379 | "Stall detected in interleaved sync, not all " |
| 2214 | "projects could be synced." | 2380 | "projects could be synced." |
| 2215 | ) | ||
| 2216 | err_event.set() | ||
| 2217 | break | ||
| 2218 | previously_pending_relpaths = pending_relpaths | ||
| 2219 | |||
| 2220 | # Update the projects list for workers in the current pass. | ||
| 2221 | self.get_parallel_context()["projects"] = projects_to_sync | ||
| 2222 | project_index_map = { | ||
| 2223 | p: i for i, p in enumerate(projects_to_sync) | ||
| 2224 | } | ||
| 2225 | |||
| 2226 | # Inner loop to process projects in a hierarchical order. | ||
| 2227 | # This iterates through levels of project dependencies (e.g. | ||
| 2228 | # 'foo' then 'foo/bar'). All projects in one level can be | ||
| 2229 | # processed in parallel, but we must wait for a level to | ||
| 2230 | # complete before starting the next. | ||
| 2231 | for level_projects in _SafeCheckoutOrder(projects_to_sync): | ||
| 2232 | if not level_projects: | ||
| 2233 | continue | ||
| 2234 | |||
| 2235 | objdir_project_map = collections.defaultdict(list) | ||
| 2236 | for p in level_projects: | ||
| 2237 | objdir_project_map[p.objdir].append( | ||
| 2238 | project_index_map[p] | ||
| 2239 | ) | 2381 | ) |
| 2240 | |||
| 2241 | work_items = list(objdir_project_map.values()) | ||
| 2242 | if not work_items: | ||
| 2243 | continue | ||
| 2244 | |||
| 2245 | jobs = max(1, min(opt.jobs, len(work_items))) | ||
| 2246 | callback = functools.partial( | ||
| 2247 | self._ProcessSyncInterleavedResults, | ||
| 2248 | synced_relpaths, | ||
| 2249 | err_event, | ||
| 2250 | errors, | ||
| 2251 | opt, | ||
| 2252 | ) | ||
| 2253 | if not self.ExecuteInParallel( | ||
| 2254 | jobs, | ||
| 2255 | functools.partial(self._SyncProjectList, opt), | ||
| 2256 | work_items, | ||
| 2257 | callback=callback, | ||
| 2258 | output=pm, | ||
| 2259 | chunksize=1, | ||
| 2260 | ): | ||
| 2261 | err_event.set() | 2382 | err_event.set() |
| 2383 | break | ||
| 2384 | previously_pending_relpaths = pending_relpaths | ||
| 2385 | |||
| 2386 | self.get_parallel_context()[ | ||
| 2387 | "projects" | ||
| 2388 | ] = projects_to_sync | ||
| 2389 | project_index_map = { | ||
| 2390 | p: i for i, p in enumerate(projects_to_sync) | ||
| 2391 | } | ||
| 2392 | |||
| 2393 | # Inner loop to process projects in a hierarchical | ||
| 2394 | # order. This iterates through levels of project | ||
| 2395 | # dependencies (e.g. 'foo' then 'foo/bar'). All projects | ||
| 2396 | # in one level can be processed in parallel, but we must | ||
| 2397 | # wait for a level to complete before starting the next. | ||
| 2398 | for level_projects in _SafeCheckoutOrder( | ||
| 2399 | projects_to_sync | ||
| 2400 | ): | ||
| 2401 | if not level_projects: | ||
| 2402 | continue | ||
| 2262 | 2403 | ||
| 2263 | if err_event.is_set() and opt.fail_fast: | 2404 | objdir_project_map = collections.defaultdict(list) |
| 2264 | raise SyncFailFastError(aggregate_errors=errors) | 2405 | for p in level_projects: |
| 2406 | objdir_project_map[p.objdir].append( | ||
| 2407 | project_index_map[p] | ||
| 2408 | ) | ||
| 2265 | 2409 | ||
| 2266 | self._ReloadManifest(None, manifest) | 2410 | work_items = list(objdir_project_map.values()) |
| 2267 | project_list = self.GetProjects( | 2411 | if not work_items: |
| 2268 | args, | 2412 | continue |
| 2269 | missing_ok=True, | 2413 | |
| 2270 | submodules_ok=opt.fetch_submodules, | 2414 | jobs = max(1, min(opt.jobs, len(work_items))) |
| 2271 | manifest=manifest, | 2415 | callback = functools.partial( |
| 2272 | all_manifests=not opt.this_manifest_only, | 2416 | self._ProcessSyncInterleavedResults, |
| 2273 | ) | 2417 | synced_relpaths, |
| 2274 | finally: | 2418 | err_event, |
| 2275 | sync_event.set() | 2419 | errors, |
| 2276 | sync_progress_thread.join() | 2420 | opt, |
| 2421 | ) | ||
| 2422 | if not self.ExecuteInParallel( | ||
| 2423 | jobs, | ||
| 2424 | functools.partial(self._SyncProjectList, opt), | ||
| 2425 | work_items, | ||
| 2426 | callback=callback, | ||
| 2427 | output=pm, | ||
| 2428 | chunksize=1, | ||
| 2429 | ): | ||
| 2430 | err_event.set() | ||
| 2431 | |||
| 2432 | if err_event.is_set() and opt.fail_fast: | ||
| 2433 | raise SyncFailFastError(aggregate_errors=errors) | ||
| 2434 | |||
| 2435 | self._ReloadManifest(None, manifest) | ||
| 2436 | project_list = self.GetProjects( | ||
| 2437 | args, | ||
| 2438 | missing_ok=True, | ||
| 2439 | submodules_ok=opt.fetch_submodules, | ||
| 2440 | manifest=manifest, | ||
| 2441 | all_manifests=not opt.this_manifest_only, | ||
| 2442 | ) | ||
| 2443 | finally: | ||
| 2444 | sync_event.set() | ||
| 2445 | sync_progress_thread.join() | ||
| 2277 | 2446 | ||
| 2278 | pm.end() | 2447 | pm.end() |
| 2279 | 2448 | ||
| 2449 | # TODO(b/421935613): Add the manifest loop block from PhasedSync. | ||
| 2450 | if not self.outer_client.manifest.IsArchive: | ||
| 2451 | self._GCProjects(project_list, opt, err_event) | ||
| 2452 | |||
| 2453 | self._PrintManifestNotices(opt) | ||
| 2280 | if err_event.is_set(): | 2454 | if err_event.is_set(): |
| 2455 | # TODO(b/421935613): Log errors better like SyncPhased. | ||
| 2281 | logger.error( | 2456 | logger.error( |
| 2282 | "error: Unable to fully sync the tree in interleaved mode." | 2457 | "error: Unable to fully sync the tree in interleaved mode." |
| 2283 | ) | 2458 | ) |
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index 60f283af..e7213ed9 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py | |||
| @@ -310,6 +310,16 @@ class FakeProject: | |||
| 310 | self.name = name or relpath | 310 | self.name = name or relpath |
| 311 | self.objdir = objdir or relpath | 311 | self.objdir = objdir or relpath |
| 312 | 312 | ||
| 313 | self.use_git_worktrees = False | ||
| 314 | self.UseAlternates = False | ||
| 315 | self.manifest = mock.MagicMock() | ||
| 316 | self.manifest.GetProjectsWithName.return_value = [self] | ||
| 317 | self.config = mock.MagicMock() | ||
| 318 | self.EnableRepositoryExtension = mock.MagicMock() | ||
| 319 | |||
| 320 | def RelPath(self, local=None): | ||
| 321 | return self.relpath | ||
| 322 | |||
| 313 | def __str__(self): | 323 | def __str__(self): |
| 314 | return f"project: {self.relpath}" | 324 | return f"project: {self.relpath}" |
| 315 | 325 | ||
| @@ -531,7 +541,11 @@ class InterleavedSyncTest(unittest.TestCase): | |||
| 531 | self.manifest.CloneBundle = False | 541 | self.manifest.CloneBundle = False |
| 532 | self.manifest.default.sync_j = 1 | 542 | self.manifest.default.sync_j = 1 |
| 533 | 543 | ||
| 534 | self.cmd = sync.Sync(manifest=self.manifest) | 544 | self.outer_client = mock.MagicMock() |
| 545 | self.outer_client.manifest.IsArchive = False | ||
| 546 | self.cmd = sync.Sync( | ||
| 547 | manifest=self.manifest, outer_client=self.outer_client | ||
| 548 | ) | ||
| 535 | self.cmd.outer_manifest = self.manifest | 549 | self.cmd.outer_manifest = self.manifest |
| 536 | 550 | ||
| 537 | # Mock projects. | 551 | # Mock projects. |
| @@ -549,6 +563,21 @@ class InterleavedSyncTest(unittest.TestCase): | |||
| 549 | mock.patch.object(sync, "_PostRepoUpgrade").start() | 563 | mock.patch.object(sync, "_PostRepoUpgrade").start() |
| 550 | mock.patch.object(sync, "_PostRepoFetch").start() | 564 | mock.patch.object(sync, "_PostRepoFetch").start() |
| 551 | 565 | ||
| 566 | # Mock parallel context for worker tests. | ||
| 567 | self.parallel_context_patcher = mock.patch( | ||
| 568 | "subcmds.sync.Sync.get_parallel_context" | ||
| 569 | ) | ||
| 570 | self.mock_get_parallel_context = self.parallel_context_patcher.start() | ||
| 571 | self.sync_dict = {} | ||
| 572 | self.mock_context = { | ||
| 573 | "projects": [], | ||
| 574 | "sync_dict": self.sync_dict, | ||
| 575 | } | ||
| 576 | self.mock_get_parallel_context.return_value = self.mock_context | ||
| 577 | |||
| 578 | # Mock _GetCurrentBranchOnly for worker tests. | ||
| 579 | mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start() | ||
| 580 | |||
| 552 | def tearDown(self): | 581 | def tearDown(self): |
| 553 | """Clean up resources.""" | 582 | """Clean up resources.""" |
| 554 | shutil.rmtree(self.repodir) | 583 | shutil.rmtree(self.repodir) |
| @@ -635,3 +664,153 @@ class InterleavedSyncTest(unittest.TestCase): | |||
| 635 | work_items_sets = {frozenset(item) for item in work_items} | 664 | work_items_sets = {frozenset(item) for item in work_items} |
| 636 | expected_sets = {frozenset([0, 2]), frozenset([1])} | 665 | expected_sets = {frozenset([0, 2]), frozenset([1])} |
| 637 | self.assertEqual(work_items_sets, expected_sets) | 666 | self.assertEqual(work_items_sets, expected_sets) |
| 667 | |||
| 668 | def _get_opts(self, args=None): | ||
| 669 | """Helper to get default options for worker tests.""" | ||
| 670 | if args is None: | ||
| 671 | args = ["--interleaved"] | ||
| 672 | opt, _ = self.cmd.OptionParser.parse_args(args) | ||
| 673 | # Set defaults for options used by the worker. | ||
| 674 | opt.quiet = True | ||
| 675 | opt.verbose = False | ||
| 676 | opt.force_sync = False | ||
| 677 | opt.clone_bundle = False | ||
| 678 | opt.tags = False | ||
| 679 | opt.optimized_fetch = False | ||
| 680 | opt.retry_fetches = 0 | ||
| 681 | opt.prune = False | ||
| 682 | opt.detach_head = False | ||
| 683 | opt.force_checkout = False | ||
| 684 | opt.rebase = False | ||
| 685 | return opt | ||
| 686 | |||
| 687 | def test_worker_successful_sync(self): | ||
| 688 | """Test _SyncProjectList with a successful fetch and checkout.""" | ||
| 689 | opt = self._get_opts() | ||
| 690 | project = self.projA | ||
| 691 | project.Sync_NetworkHalf = mock.Mock( | ||
| 692 | return_value=SyncNetworkHalfResult(error=None, remote_fetched=True) | ||
| 693 | ) | ||
| 694 | project.Sync_LocalHalf = mock.Mock() | ||
| 695 | project.manifest.manifestProject.config = mock.MagicMock() | ||
| 696 | self.mock_context["projects"] = [project] | ||
| 697 | |||
| 698 | with mock.patch("subcmds.sync.SyncBuffer") as mock_sync_buffer: | ||
| 699 | mock_sync_buf_instance = mock.MagicMock() | ||
| 700 | mock_sync_buf_instance.Finish.return_value = True | ||
| 701 | mock_sync_buffer.return_value = mock_sync_buf_instance | ||
| 702 | |||
| 703 | result_obj = self.cmd._SyncProjectList(opt, [0]) | ||
| 704 | |||
| 705 | self.assertEqual(len(result_obj.results), 1) | ||
| 706 | result = result_obj.results[0] | ||
| 707 | self.assertTrue(result.fetch_success) | ||
| 708 | self.assertTrue(result.checkout_success) | ||
| 709 | self.assertIsNone(result.fetch_error) | ||
| 710 | self.assertIsNone(result.checkout_error) | ||
| 711 | project.Sync_NetworkHalf.assert_called_once() | ||
| 712 | project.Sync_LocalHalf.assert_called_once() | ||
| 713 | |||
| 714 | def test_worker_fetch_fails(self): | ||
| 715 | """Test _SyncProjectList with a failed fetch.""" | ||
| 716 | opt = self._get_opts() | ||
| 717 | project = self.projA | ||
| 718 | fetch_error = GitError("Fetch failed") | ||
| 719 | project.Sync_NetworkHalf = mock.Mock( | ||
| 720 | return_value=SyncNetworkHalfResult( | ||
| 721 | error=fetch_error, remote_fetched=False | ||
| 722 | ) | ||
| 723 | ) | ||
| 724 | project.Sync_LocalHalf = mock.Mock() | ||
| 725 | self.mock_context["projects"] = [project] | ||
| 726 | |||
| 727 | result_obj = self.cmd._SyncProjectList(opt, [0]) | ||
| 728 | result = result_obj.results[0] | ||
| 729 | |||
| 730 | self.assertFalse(result.fetch_success) | ||
| 731 | self.assertFalse(result.checkout_success) | ||
| 732 | self.assertEqual(result.fetch_error, fetch_error) | ||
| 733 | self.assertIsNone(result.checkout_error) | ||
| 734 | project.Sync_NetworkHalf.assert_called_once() | ||
| 735 | project.Sync_LocalHalf.assert_not_called() | ||
| 736 | |||
| 737 | def test_worker_fetch_fails_exception(self): | ||
| 738 | """Test _SyncProjectList with an exception during fetch.""" | ||
| 739 | opt = self._get_opts() | ||
| 740 | project = self.projA | ||
| 741 | fetch_error = GitError("Fetch failed") | ||
| 742 | project.Sync_NetworkHalf = mock.Mock(side_effect=fetch_error) | ||
| 743 | project.Sync_LocalHalf = mock.Mock() | ||
| 744 | self.mock_context["projects"] = [project] | ||
| 745 | |||
| 746 | result_obj = self.cmd._SyncProjectList(opt, [0]) | ||
| 747 | result = result_obj.results[0] | ||
| 748 | |||
| 749 | self.assertFalse(result.fetch_success) | ||
| 750 | self.assertFalse(result.checkout_success) | ||
| 751 | self.assertEqual(result.fetch_error, fetch_error) | ||
| 752 | project.Sync_NetworkHalf.assert_called_once() | ||
| 753 | project.Sync_LocalHalf.assert_not_called() | ||
| 754 | |||
| 755 | def test_worker_checkout_fails(self): | ||
| 756 | """Test _SyncProjectList with an exception during checkout.""" | ||
| 757 | opt = self._get_opts() | ||
| 758 | project = self.projA | ||
| 759 | project.Sync_NetworkHalf = mock.Mock( | ||
| 760 | return_value=SyncNetworkHalfResult(error=None, remote_fetched=True) | ||
| 761 | ) | ||
| 762 | checkout_error = GitError("Checkout failed") | ||
| 763 | project.Sync_LocalHalf = mock.Mock(side_effect=checkout_error) | ||
| 764 | project.manifest.manifestProject.config = mock.MagicMock() | ||
| 765 | self.mock_context["projects"] = [project] | ||
| 766 | |||
| 767 | with mock.patch("subcmds.sync.SyncBuffer"): | ||
| 768 | result_obj = self.cmd._SyncProjectList(opt, [0]) | ||
| 769 | result = result_obj.results[0] | ||
| 770 | |||
| 771 | self.assertTrue(result.fetch_success) | ||
| 772 | self.assertFalse(result.checkout_success) | ||
| 773 | self.assertIsNone(result.fetch_error) | ||
| 774 | self.assertEqual(result.checkout_error, checkout_error) | ||
| 775 | project.Sync_NetworkHalf.assert_called_once() | ||
| 776 | project.Sync_LocalHalf.assert_called_once() | ||
| 777 | |||
| 778 | def test_worker_local_only(self): | ||
| 779 | """Test _SyncProjectList with --local-only.""" | ||
| 780 | opt = self._get_opts(["--interleaved", "--local-only"]) | ||
| 781 | project = self.projA | ||
| 782 | project.Sync_NetworkHalf = mock.Mock() | ||
| 783 | project.Sync_LocalHalf = mock.Mock() | ||
| 784 | project.manifest.manifestProject.config = mock.MagicMock() | ||
| 785 | self.mock_context["projects"] = [project] | ||
| 786 | |||
| 787 | with mock.patch("subcmds.sync.SyncBuffer") as mock_sync_buffer: | ||
| 788 | mock_sync_buf_instance = mock.MagicMock() | ||
| 789 | mock_sync_buf_instance.Finish.return_value = True | ||
| 790 | mock_sync_buffer.return_value = mock_sync_buf_instance | ||
| 791 | |||
| 792 | result_obj = self.cmd._SyncProjectList(opt, [0]) | ||
| 793 | result = result_obj.results[0] | ||
| 794 | |||
| 795 | self.assertTrue(result.fetch_success) | ||
| 796 | self.assertTrue(result.checkout_success) | ||
| 797 | project.Sync_NetworkHalf.assert_not_called() | ||
| 798 | project.Sync_LocalHalf.assert_called_once() | ||
| 799 | |||
| 800 | def test_worker_network_only(self): | ||
| 801 | """Test _SyncProjectList with --network-only.""" | ||
| 802 | opt = self._get_opts(["--interleaved", "--network-only"]) | ||
| 803 | project = self.projA | ||
| 804 | project.Sync_NetworkHalf = mock.Mock( | ||
| 805 | return_value=SyncNetworkHalfResult(error=None, remote_fetched=True) | ||
| 806 | ) | ||
| 807 | project.Sync_LocalHalf = mock.Mock() | ||
| 808 | self.mock_context["projects"] = [project] | ||
| 809 | |||
| 810 | result_obj = self.cmd._SyncProjectList(opt, [0]) | ||
| 811 | result = result_obj.results[0] | ||
| 812 | |||
| 813 | self.assertTrue(result.fetch_success) | ||
| 814 | self.assertTrue(result.checkout_success) | ||
| 815 | project.Sync_NetworkHalf.assert_called_once() | ||
| 816 | project.Sync_LocalHalf.assert_not_called() | ||
