diff options
| -rw-r--r-- | subcmds/sync.py | 295 | ||||
| -rw-r--r-- | tests/test_subcmds_sync.py | 124 | 
2 files changed, 398 insertions, 21 deletions
| diff --git a/subcmds/sync.py b/subcmds/sync.py index 6e369a10..711baca2 100644 --- a/subcmds/sync.py +++ b/subcmds/sync.py | |||
| @@ -25,7 +25,7 @@ from pathlib import Path | |||
| 25 | import sys | 25 | import sys | 
| 26 | import tempfile | 26 | import tempfile | 
| 27 | import time | 27 | import time | 
| 28 | from typing import List, NamedTuple, Set, Union | 28 | from typing import List, NamedTuple, Optional, Set, Union | 
| 29 | import urllib.error | 29 | import urllib.error | 
| 30 | import urllib.parse | 30 | import urllib.parse | 
| 31 | import urllib.request | 31 | import urllib.request | 
| @@ -194,6 +194,49 @@ class _CheckoutOneResult(NamedTuple): | |||
| 194 | finish: float | 194 | finish: float | 
| 195 | 195 | ||
| 196 | 196 | ||
| 197 | class _SyncResult(NamedTuple): | ||
| 198 | """Individual project sync result for interleaved mode. | ||
| 199 | |||
| 200 | Attributes: | ||
| 201 | relpath (str): The project's relative path from the repo client top. | ||
| 202 | fetch_success (bool): True if the fetch operation was successful. | ||
| 203 | checkout_success (bool): True if the checkout operation was | ||
| 204 | successful. | ||
| 205 | fetch_error (Optional[Exception]): The Exception from a failed fetch, | ||
| 206 | or None. | ||
| 207 | checkout_error (Optional[Exception]): The Exception from a failed | ||
| 208 | checkout, or None. | ||
| 209 | fetch_start (Optional[float]): The time.time() when fetch started. | ||
| 210 | fetch_finish (Optional[float]): The time.time() when fetch finished. | ||
| 211 | checkout_start (Optional[float]): The time.time() when checkout | ||
| 212 | started. | ||
| 213 | checkout_finish (Optional[float]): The time.time() when checkout | ||
| 214 | finished. | ||
| 215 | """ | ||
| 216 | |||
| 217 | relpath: str | ||
| 218 | fetch_success: bool | ||
| 219 | checkout_success: bool | ||
| 220 | fetch_error: Optional[Exception] | ||
| 221 | checkout_error: Optional[Exception] | ||
| 222 | |||
| 223 | fetch_start: Optional[float] | ||
| 224 | fetch_finish: Optional[float] | ||
| 225 | checkout_start: Optional[float] | ||
| 226 | checkout_finish: Optional[float] | ||
| 227 | |||
| 228 | |||
| 229 | class _InterleavedSyncResult(NamedTuple): | ||
| 230 | """Result of an interleaved sync. | ||
| 231 | |||
| 232 | Attributes: | ||
| 233 | results (List[_SyncResult]): A list of results, one for each project | ||
| 234 | processed. Empty if the worker failed before creating results. | ||
| 235 | """ | ||
| 236 | |||
| 237 | results: List[_SyncResult] | ||
| 238 | |||
| 239 | |||
| 197 | class SuperprojectError(SyncError): | 240 | class SuperprojectError(SyncError): | 
| 198 | """Superproject sync repo.""" | 241 | """Superproject sync repo.""" | 
| 199 | 242 | ||
| @@ -837,15 +880,7 @@ later is required to fix a server side protocol bug. | |||
| 837 | ) | 880 | ) | 
| 838 | 881 | ||
| 839 | sync_event = _threading.Event() | 882 | sync_event = _threading.Event() | 
| 840 | 883 | sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) | |
| 841 | def _MonitorSyncLoop(): | ||
| 842 | while True: | ||
| 843 | pm.update(inc=0, msg=self._GetSyncProgressMessage()) | ||
| 844 | if sync_event.wait(timeout=1): | ||
| 845 | return | ||
| 846 | |||
| 847 | sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop) | ||
| 848 | sync_progress_thread.daemon = True | ||
| 849 | 884 | ||
| 850 | def _ProcessResults(pool, pm, results_sets): | 885 | def _ProcessResults(pool, pm, results_sets): | 
| 851 | ret = True | 886 | ret = True | 
| @@ -1828,6 +1863,16 @@ later is required to fix a server side protocol bug. | |||
| 1828 | all_manifests=not opt.this_manifest_only, | 1863 | all_manifests=not opt.this_manifest_only, | 
| 1829 | ) | 1864 | ) | 
| 1830 | 1865 | ||
| 1866 | # Log the repo projects by existing and new. | ||
| 1867 | existing = [x for x in all_projects if x.Exists] | ||
| 1868 | mp.config.SetString("repo.existingprojectcount", str(len(existing))) | ||
| 1869 | mp.config.SetString( | ||
| 1870 | "repo.newprojectcount", str(len(all_projects) - len(existing)) | ||
| 1871 | ) | ||
| 1872 | |||
| 1873 | self._fetch_times = _FetchTimes(manifest) | ||
| 1874 | self._local_sync_state = LocalSyncState(manifest) | ||
| 1875 | |||
| 1831 | if opt.interleaved: | 1876 | if opt.interleaved: | 
| 1832 | sync_method = self._SyncInterleaved | 1877 | sync_method = self._SyncInterleaved | 
| 1833 | else: | 1878 | else: | 
| @@ -1864,6 +1909,34 @@ later is required to fix a server side protocol bug. | |||
| 1864 | if not opt.quiet: | 1909 | if not opt.quiet: | 
| 1865 | print("repo sync has finished successfully.") | 1910 | print("repo sync has finished successfully.") | 
| 1866 | 1911 | ||
| 1912 | def _CreateSyncProgressThread( | ||
| 1913 | self, pm: Progress, stop_event: _threading.Event | ||
| 1914 | ) -> _threading.Thread: | ||
| 1915 | """Creates and returns a daemon thread to update a Progress object. | ||
| 1916 | |||
| 1917 | The returned thread is not yet started. The thread will periodically | ||
| 1918 | update the progress bar with information from _GetSyncProgressMessage | ||
| 1919 | until the stop_event is set. | ||
| 1920 | |||
| 1921 | Args: | ||
| 1922 | pm: The Progress object to update. | ||
| 1923 | stop_event: The threading.Event to signal the monitor to stop. | ||
| 1924 | |||
| 1925 | Returns: | ||
| 1926 | The configured _threading.Thread object. | ||
| 1927 | """ | ||
| 1928 | |||
| 1929 | def _monitor_loop(): | ||
| 1930 | """The target function for the monitor thread.""" | ||
| 1931 | while True: | ||
| 1932 | # Update the progress bar with the current status message. | ||
| 1933 | pm.update(inc=0, msg=self._GetSyncProgressMessage()) | ||
| 1934 | # Wait for 1 second or until the stop_event is set. | ||
| 1935 | if stop_event.wait(timeout=1): | ||
| 1936 | return | ||
| 1937 | |||
| 1938 | return _threading.Thread(target=_monitor_loop, daemon=True) | ||
| 1939 | |||
| 1867 | def _SyncPhased( | 1940 | def _SyncPhased( | 
| 1868 | self, | 1941 | self, | 
| 1869 | opt, | 1942 | opt, | 
| @@ -1890,15 +1963,6 @@ later is required to fix a server side protocol bug. | |||
| 1890 | err_update_projects = False | 1963 | err_update_projects = False | 
| 1891 | err_update_linkfiles = False | 1964 | err_update_linkfiles = False | 
| 1892 | 1965 | ||
| 1893 | # Log the repo projects by existing and new. | ||
| 1894 | existing = [x for x in all_projects if x.Exists] | ||
| 1895 | mp.config.SetString("repo.existingprojectcount", str(len(existing))) | ||
| 1896 | mp.config.SetString( | ||
| 1897 | "repo.newprojectcount", str(len(all_projects) - len(existing)) | ||
| 1898 | ) | ||
| 1899 | |||
| 1900 | self._fetch_times = _FetchTimes(manifest) | ||
| 1901 | self._local_sync_state = LocalSyncState(manifest) | ||
| 1902 | if not opt.local_only: | 1966 | if not opt.local_only: | 
| 1903 | with multiprocessing.Manager() as manager: | 1967 | with multiprocessing.Manager() as manager: | 
| 1904 | with ssh.ProxyManager(manager) as ssh_proxy: | 1968 | with ssh.ProxyManager(manager) as ssh_proxy: | 
| @@ -2003,6 +2067,88 @@ later is required to fix a server side protocol bug. | |||
| 2003 | ) | 2067 | ) | 
| 2004 | raise SyncError(aggregate_errors=errors) | 2068 | raise SyncError(aggregate_errors=errors) | 
| 2005 | 2069 | ||
| 2070 | @classmethod | ||
| 2071 | def _SyncProjectList(cls, opt, project_indices) -> _InterleavedSyncResult: | ||
| 2072 | """Worker for interleaved sync. | ||
| 2073 | |||
| 2074 | This function is responsible for syncing a group of projects that share | ||
| 2075 | a git object directory. | ||
| 2076 | |||
| 2077 | Args: | ||
| 2078 | opt: Program options returned from optparse. See _Options(). | ||
| 2079 | project_indices: A list of indices into the projects list stored in | ||
| 2080 | the parallel context. | ||
| 2081 | |||
| 2082 | Returns: | ||
| 2083 | An `_InterleavedSyncResult` containing the results for each project. | ||
| 2084 | """ | ||
| 2085 | results = [] | ||
| 2086 | context = cls.get_parallel_context() | ||
| 2087 | projects = context["projects"] | ||
| 2088 | sync_dict = context["sync_dict"] | ||
| 2089 | |||
| 2090 | assert project_indices, "_SyncProjectList called with no indices." | ||
| 2091 | |||
| 2092 | # Use the first project as the representative for the progress bar. | ||
| 2093 | first_project = projects[project_indices[0]] | ||
| 2094 | key = f"{first_project.name} @ {first_project.relpath}" | ||
| 2095 | start_time = time.time() | ||
| 2096 | sync_dict[key] = start_time | ||
| 2097 | |||
| 2098 | try: | ||
| 2099 | for idx in project_indices: | ||
| 2100 | project = projects[idx] | ||
| 2101 | # For now, simulate a successful sync. | ||
| 2102 | # TODO(b/421935613): Perform the actual git fetch and checkout. | ||
| 2103 | results.append( | ||
| 2104 | _SyncResult( | ||
| 2105 | relpath=project.relpath, | ||
| 2106 | fetch_success=True, | ||
| 2107 | checkout_success=True, | ||
| 2108 | fetch_error=None, | ||
| 2109 | checkout_error=None, | ||
| 2110 | fetch_start=None, | ||
| 2111 | fetch_finish=None, | ||
| 2112 | checkout_start=None, | ||
| 2113 | checkout_finish=None, | ||
| 2114 | ) | ||
| 2115 | ) | ||
| 2116 | finally: | ||
| 2117 | del sync_dict[key] | ||
| 2118 | |||
| 2119 | return _InterleavedSyncResult(results=results) | ||
| 2120 | |||
| 2121 | def _ProcessSyncInterleavedResults( | ||
| 2122 | self, | ||
| 2123 | synced_relpaths: Set[str], | ||
| 2124 | err_event: _threading.Event, | ||
| 2125 | errors: List[Exception], | ||
| 2126 | opt: optparse.Values, | ||
| 2127 | pool: Optional[multiprocessing.Pool], | ||
| 2128 | pm: Progress, | ||
| 2129 | results_sets: List[_InterleavedSyncResult], | ||
| 2130 | ): | ||
| 2131 | """Callback to process results from interleaved sync workers.""" | ||
| 2132 | ret = True | ||
| 2133 | for result_group in results_sets: | ||
| 2134 | for result in result_group.results: | ||
| 2135 | pm.update() | ||
| 2136 | if result.fetch_success and result.checkout_success: | ||
| 2137 | synced_relpaths.add(result.relpath) | ||
| 2138 | else: | ||
| 2139 | ret = False | ||
| 2140 | err_event.set() | ||
| 2141 | if result.fetch_error: | ||
| 2142 | errors.append(result.fetch_error) | ||
| 2143 | if result.checkout_error: | ||
| 2144 | errors.append(result.checkout_error) | ||
| 2145 | |||
| 2146 | if not ret and opt.fail_fast: | ||
| 2147 | if pool: | ||
| 2148 | pool.close() | ||
| 2149 | break | ||
| 2150 | return ret | ||
| 2151 | |||
| 2006 | def _SyncInterleaved( | 2152 | def _SyncInterleaved( | 
| 2007 | self, | 2153 | self, | 
| 2008 | opt, | 2154 | opt, | 
| @@ -2026,7 +2172,116 @@ later is required to fix a server side protocol bug. | |||
| 2026 | 2. Projects that share git objects are processed serially to prevent | 2172 | 2. Projects that share git objects are processed serially to prevent | 
| 2027 | race conditions. | 2173 | race conditions. | 
| 2028 | """ | 2174 | """ | 
| 2029 | raise NotImplementedError("Interleaved sync is not implemented yet.") | 2175 | err_event = multiprocessing.Event() | 
| 2176 | synced_relpaths = set() | ||
| 2177 | project_list = list(all_projects) | ||
| 2178 | pm = Progress( | ||
| 2179 | "Syncing", | ||
| 2180 | len(project_list), | ||
| 2181 | delay=False, | ||
| 2182 | quiet=opt.quiet, | ||
| 2183 | show_elapsed=True, | ||
| 2184 | elide=True, | ||
| 2185 | ) | ||
| 2186 | previously_pending_relpaths = set() | ||
| 2187 | |||
| 2188 | sync_event = _threading.Event() | ||
| 2189 | sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event) | ||
| 2190 | |||
| 2191 | with self.ParallelContext(): | ||
| 2192 | # TODO(gavinmak): Use multprocessing.Queue instead of dict. | ||
| 2193 | self.get_parallel_context()[ | ||
| 2194 | "sync_dict" | ||
| 2195 | ] = multiprocessing.Manager().dict() | ||
| 2196 | sync_progress_thread.start() | ||
| 2197 | |||
| 2198 | try: | ||
| 2199 | # Outer loop for dynamic project discovery (e.g., submodules). | ||
| 2200 | # It continues until no unsynced projects remain. | ||
| 2201 | while True: | ||
| 2202 | projects_to_sync = [ | ||
| 2203 | p | ||
| 2204 | for p in project_list | ||
| 2205 | if p.relpath not in synced_relpaths | ||
| 2206 | ] | ||
| 2207 | if not projects_to_sync: | ||
| 2208 | break | ||
| 2209 | |||
| 2210 | pending_relpaths = {p.relpath for p in projects_to_sync} | ||
| 2211 | if previously_pending_relpaths == pending_relpaths: | ||
| 2212 | logger.error( | ||
| 2213 | "Stall detected in interleaved sync, not all " | ||
| 2214 | "projects could be synced." | ||
| 2215 | ) | ||
| 2216 | err_event.set() | ||
| 2217 | break | ||
| 2218 | previously_pending_relpaths = pending_relpaths | ||
| 2219 | |||
| 2220 | # Update the projects list for workers in the current pass. | ||
| 2221 | self.get_parallel_context()["projects"] = projects_to_sync | ||
| 2222 | project_index_map = { | ||
| 2223 | p: i for i, p in enumerate(projects_to_sync) | ||
| 2224 | } | ||
| 2225 | |||
| 2226 | # Inner loop to process projects in a hierarchical order. | ||
| 2227 | # This iterates through levels of project dependencies (e.g. | ||
| 2228 | # 'foo' then 'foo/bar'). All projects in one level can be | ||
| 2229 | # processed in parallel, but we must wait for a level to | ||
| 2230 | # complete before starting the next. | ||
| 2231 | for level_projects in _SafeCheckoutOrder(projects_to_sync): | ||
| 2232 | if not level_projects: | ||
| 2233 | continue | ||
| 2234 | |||
| 2235 | objdir_project_map = collections.defaultdict(list) | ||
| 2236 | for p in level_projects: | ||
| 2237 | objdir_project_map[p.objdir].append( | ||
| 2238 | project_index_map[p] | ||
| 2239 | ) | ||
| 2240 | |||
| 2241 | work_items = list(objdir_project_map.values()) | ||
| 2242 | if not work_items: | ||
| 2243 | continue | ||
| 2244 | |||
| 2245 | jobs = max(1, min(opt.jobs, len(work_items))) | ||
| 2246 | callback = functools.partial( | ||
| 2247 | self._ProcessSyncInterleavedResults, | ||
| 2248 | synced_relpaths, | ||
| 2249 | err_event, | ||
| 2250 | errors, | ||
| 2251 | opt, | ||
| 2252 | ) | ||
| 2253 | if not self.ExecuteInParallel( | ||
| 2254 | jobs, | ||
| 2255 | functools.partial(self._SyncProjectList, opt), | ||
| 2256 | work_items, | ||
| 2257 | callback=callback, | ||
| 2258 | output=pm, | ||
| 2259 | chunksize=1, | ||
| 2260 | ): | ||
| 2261 | err_event.set() | ||
| 2262 | |||
| 2263 | if err_event.is_set() and opt.fail_fast: | ||
| 2264 | raise SyncFailFastError(aggregate_errors=errors) | ||
| 2265 | |||
| 2266 | self._ReloadManifest(None, manifest) | ||
| 2267 | project_list = self.GetProjects( | ||
| 2268 | args, | ||
| 2269 | missing_ok=True, | ||
| 2270 | submodules_ok=opt.fetch_submodules, | ||
| 2271 | manifest=manifest, | ||
| 2272 | all_manifests=not opt.this_manifest_only, | ||
| 2273 | ) | ||
| 2274 | finally: | ||
| 2275 | sync_event.set() | ||
| 2276 | sync_progress_thread.join() | ||
| 2277 | |||
| 2278 | pm.end() | ||
| 2279 | |||
| 2280 | if err_event.is_set(): | ||
| 2281 | logger.error( | ||
| 2282 | "error: Unable to fully sync the tree in interleaved mode." | ||
| 2283 | ) | ||
| 2284 | raise SyncError(aggregate_errors=errors) | ||
| 2030 | 2285 | ||
| 2031 | 2286 | ||
| 2032 | def _PostRepoUpgrade(manifest, quiet=False): | 2287 | def _PostRepoUpgrade(manifest, quiet=False): | 
| diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index b871317c..60f283af 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py | |||
| @@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase): | |||
| 305 | 305 | ||
| 306 | 306 | ||
| 307 | class FakeProject: | 307 | class FakeProject: | 
| 308 | def __init__(self, relpath): | 308 | def __init__(self, relpath, name=None, objdir=None): | 
| 309 | self.relpath = relpath | 309 | self.relpath = relpath | 
| 310 | self.name = name or relpath | ||
| 311 | self.objdir = objdir or relpath | ||
| 310 | 312 | ||
| 311 | def __str__(self): | 313 | def __str__(self): | 
| 312 | return f"project: {self.relpath}" | 314 | return f"project: {self.relpath}" | 
| @@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase): | |||
| 513 | self.cmd.Execute(self.opt, []) | 515 | self.cmd.Execute(self.opt, []) | 
| 514 | self.assertIn(self.sync_local_half_error, e.aggregate_errors) | 516 | self.assertIn(self.sync_local_half_error, e.aggregate_errors) | 
| 515 | self.assertIn(self.sync_network_half_error, e.aggregate_errors) | 517 | self.assertIn(self.sync_network_half_error, e.aggregate_errors) | 
| 518 | |||
| 519 | |||
| 520 | class InterleavedSyncTest(unittest.TestCase): | ||
| 521 | """Tests for interleaved sync.""" | ||
| 522 | |||
| 523 | def setUp(self): | ||
| 524 | """Set up a sync command with mocks.""" | ||
| 525 | self.repodir = tempfile.mkdtemp(".repo") | ||
| 526 | self.manifest = mock.MagicMock(repodir=self.repodir) | ||
| 527 | self.manifest.repoProject.LastFetch = time.time() | ||
| 528 | self.manifest.repoProject.worktree = self.repodir | ||
| 529 | self.manifest.manifestProject.worktree = self.repodir | ||
| 530 | self.manifest.IsArchive = False | ||
| 531 | self.manifest.CloneBundle = False | ||
| 532 | self.manifest.default.sync_j = 1 | ||
| 533 | |||
| 534 | self.cmd = sync.Sync(manifest=self.manifest) | ||
| 535 | self.cmd.outer_manifest = self.manifest | ||
| 536 | |||
| 537 | # Mock projects. | ||
| 538 | self.projA = FakeProject("projA", objdir="objA") | ||
| 539 | self.projB = FakeProject("projB", objdir="objB") | ||
| 540 | self.projA_sub = FakeProject( | ||
| 541 | "projA/sub", name="projA_sub", objdir="objA_sub" | ||
| 542 | ) | ||
| 543 | self.projC = FakeProject("projC", objdir="objC") | ||
| 544 | |||
| 545 | # Mock methods that are not part of the core interleaved sync logic. | ||
| 546 | mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start() | ||
| 547 | mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start() | ||
| 548 | mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start() | ||
| 549 | mock.patch.object(sync, "_PostRepoUpgrade").start() | ||
| 550 | mock.patch.object(sync, "_PostRepoFetch").start() | ||
| 551 | |||
| 552 | def tearDown(self): | ||
| 553 | """Clean up resources.""" | ||
| 554 | shutil.rmtree(self.repodir) | ||
| 555 | mock.patch.stopall() | ||
| 556 | |||
| 557 | def test_interleaved_fail_fast(self): | ||
| 558 | """Test that --fail-fast is respected in interleaved mode.""" | ||
| 559 | opt, args = self.cmd.OptionParser.parse_args( | ||
| 560 | ["--interleaved", "--fail-fast", "-j2"] | ||
| 561 | ) | ||
| 562 | opt.quiet = True | ||
| 563 | |||
| 564 | # With projA/sub, _SafeCheckoutOrder creates two batches: | ||
| 565 | # 1. [projA, projB] | ||
| 566 | # 2. [projA/sub] | ||
| 567 | # We want to fail on the first batch and ensure the second isn't run. | ||
| 568 | all_projects = [self.projA, self.projB, self.projA_sub] | ||
| 569 | mock.patch.object( | ||
| 570 | self.cmd, "GetProjects", return_value=all_projects | ||
| 571 | ).start() | ||
| 572 | |||
| 573 | # Mock ExecuteInParallel to simulate a failed run on the first batch of | ||
| 574 | # projects. | ||
| 575 | execute_mock = mock.patch.object( | ||
| 576 | self.cmd, "ExecuteInParallel", return_value=False | ||
| 577 | ).start() | ||
| 578 | |||
| 579 | with self.assertRaises(sync.SyncFailFastError): | ||
| 580 | self.cmd._SyncInterleaved( | ||
| 581 | opt, | ||
| 582 | args, | ||
| 583 | [], | ||
| 584 | self.manifest, | ||
| 585 | self.manifest.manifestProject, | ||
| 586 | all_projects, | ||
| 587 | {}, | ||
| 588 | ) | ||
| 589 | |||
| 590 | execute_mock.assert_called_once() | ||
| 591 | |||
| 592 | def test_interleaved_shared_objdir_serial(self): | ||
| 593 | """Test that projects with shared objdir are processed serially.""" | ||
| 594 | opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"]) | ||
| 595 | opt.quiet = True | ||
| 596 | |||
| 597 | # Setup projects with a shared objdir. | ||
| 598 | self.projA.objdir = "common_objdir" | ||
| 599 | self.projC.objdir = "common_objdir" | ||
| 600 | |||
| 601 | all_projects = [self.projA, self.projB, self.projC] | ||
| 602 | mock.patch.object( | ||
| 603 | self.cmd, "GetProjects", return_value=all_projects | ||
| 604 | ).start() | ||
| 605 | |||
| 606 | def execute_side_effect(jobs, target, work_items, **kwargs): | ||
| 607 | # The callback is a partial object. The first arg is the set we | ||
| 608 | # need to update to avoid the stall detection. | ||
| 609 | synced_relpaths_set = kwargs["callback"].args[0] | ||
| 610 | projects_in_pass = self.cmd.get_parallel_context()["projects"] | ||
| 611 | for item in work_items: | ||
| 612 | for project_idx in item: | ||
| 613 | synced_relpaths_set.add( | ||
| 614 | projects_in_pass[project_idx].relpath | ||
| 615 | ) | ||
| 616 | return True | ||
| 617 | |||
| 618 | execute_mock = mock.patch.object( | ||
| 619 | self.cmd, "ExecuteInParallel", side_effect=execute_side_effect | ||
| 620 | ).start() | ||
| 621 | |||
| 622 | self.cmd._SyncInterleaved( | ||
| 623 | opt, | ||
| 624 | args, | ||
| 625 | [], | ||
| 626 | self.manifest, | ||
| 627 | self.manifest.manifestProject, | ||
| 628 | all_projects, | ||
| 629 | {}, | ||
| 630 | ) | ||
| 631 | |||
| 632 | execute_mock.assert_called_once() | ||
| 633 | jobs_arg, _, work_items = execute_mock.call_args.args | ||
| 634 | self.assertEqual(jobs_arg, 2) | ||
| 635 | work_items_sets = {frozenset(item) for item in work_items} | ||
| 636 | expected_sets = {frozenset([0, 2]), frozenset([1])} | ||
| 637 | self.assertEqual(work_items_sets, expected_sets) | ||
