diff options
| author | Gavin Mak <gavinmak@google.com> | 2025-06-17 10:54:41 -0700 | 
|---|---|---|
| committer | LUCI <gerrit-scoped@luci-project-accounts.iam.gserviceaccount.com> | 2025-06-17 16:13:36 -0700 | 
| commit | b4b323a8bd02d52d060f7f6fa15ba045df5af5b2 (patch) | |
| tree | c8a5d836db5ca1b4d3532c3ae995005081ff2cb1 /tests | |
| parent | f91f4462e6365b5545b39be597dab23619b8d291 (diff) | |
| download | git-repo-b4b323a8bd02d52d060f7f6fa15ba045df5af5b2.tar.gz | |
sync: Add orchestration logic for --interleaved
Introduce the parallel orchestration framework for `repo sync
--interleaved`.
The new logic respects project dependencies by processing them in
hierarchical levels. Projects sharing a git object directory are grouped
and processed serially. Also reuse the familiar fetch progress bar UX.
Bug: 421935613
Change-Id: Ia388a231fa96b3220e343f952f07021bc9817d19
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/483281
Commit-Queue: Gavin Mak <gavinmak@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/test_subcmds_sync.py | 124 | 
1 files changed, 123 insertions, 1 deletions
| diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py index b871317c..60f283af 100644 --- a/tests/test_subcmds_sync.py +++ b/tests/test_subcmds_sync.py | |||
| @@ -305,8 +305,10 @@ class LocalSyncState(unittest.TestCase): | |||
| 305 | 305 | ||
| 306 | 306 | ||
| 307 | class FakeProject: | 307 | class FakeProject: | 
| 308 | def __init__(self, relpath): | 308 | def __init__(self, relpath, name=None, objdir=None): | 
| 309 | self.relpath = relpath | 309 | self.relpath = relpath | 
| 310 | self.name = name or relpath | ||
| 311 | self.objdir = objdir or relpath | ||
| 310 | 312 | ||
| 311 | def __str__(self): | 313 | def __str__(self): | 
| 312 | return f"project: {self.relpath}" | 314 | return f"project: {self.relpath}" | 
| @@ -513,3 +515,123 @@ class SyncCommand(unittest.TestCase): | |||
| 513 | self.cmd.Execute(self.opt, []) | 515 | self.cmd.Execute(self.opt, []) | 
| 514 | self.assertIn(self.sync_local_half_error, e.aggregate_errors) | 516 | self.assertIn(self.sync_local_half_error, e.aggregate_errors) | 
| 515 | self.assertIn(self.sync_network_half_error, e.aggregate_errors) | 517 | self.assertIn(self.sync_network_half_error, e.aggregate_errors) | 
| 518 | |||
| 519 | |||
| 520 | class InterleavedSyncTest(unittest.TestCase): | ||
| 521 | """Tests for interleaved sync.""" | ||
| 522 | |||
| 523 | def setUp(self): | ||
| 524 | """Set up a sync command with mocks.""" | ||
| 525 | self.repodir = tempfile.mkdtemp(".repo") | ||
| 526 | self.manifest = mock.MagicMock(repodir=self.repodir) | ||
| 527 | self.manifest.repoProject.LastFetch = time.time() | ||
| 528 | self.manifest.repoProject.worktree = self.repodir | ||
| 529 | self.manifest.manifestProject.worktree = self.repodir | ||
| 530 | self.manifest.IsArchive = False | ||
| 531 | self.manifest.CloneBundle = False | ||
| 532 | self.manifest.default.sync_j = 1 | ||
| 533 | |||
| 534 | self.cmd = sync.Sync(manifest=self.manifest) | ||
| 535 | self.cmd.outer_manifest = self.manifest | ||
| 536 | |||
| 537 | # Mock projects. | ||
| 538 | self.projA = FakeProject("projA", objdir="objA") | ||
| 539 | self.projB = FakeProject("projB", objdir="objB") | ||
| 540 | self.projA_sub = FakeProject( | ||
| 541 | "projA/sub", name="projA_sub", objdir="objA_sub" | ||
| 542 | ) | ||
| 543 | self.projC = FakeProject("projC", objdir="objC") | ||
| 544 | |||
| 545 | # Mock methods that are not part of the core interleaved sync logic. | ||
| 546 | mock.patch.object(self.cmd, "_UpdateAllManifestProjects").start() | ||
| 547 | mock.patch.object(self.cmd, "_UpdateProjectsRevisionId").start() | ||
| 548 | mock.patch.object(self.cmd, "_ValidateOptionsWithManifest").start() | ||
| 549 | mock.patch.object(sync, "_PostRepoUpgrade").start() | ||
| 550 | mock.patch.object(sync, "_PostRepoFetch").start() | ||
| 551 | |||
| 552 | def tearDown(self): | ||
| 553 | """Clean up resources.""" | ||
| 554 | shutil.rmtree(self.repodir) | ||
| 555 | mock.patch.stopall() | ||
| 556 | |||
| 557 | def test_interleaved_fail_fast(self): | ||
| 558 | """Test that --fail-fast is respected in interleaved mode.""" | ||
| 559 | opt, args = self.cmd.OptionParser.parse_args( | ||
| 560 | ["--interleaved", "--fail-fast", "-j2"] | ||
| 561 | ) | ||
| 562 | opt.quiet = True | ||
| 563 | |||
| 564 | # With projA/sub, _SafeCheckoutOrder creates two batches: | ||
| 565 | # 1. [projA, projB] | ||
| 566 | # 2. [projA/sub] | ||
| 567 | # We want to fail on the first batch and ensure the second isn't run. | ||
| 568 | all_projects = [self.projA, self.projB, self.projA_sub] | ||
| 569 | mock.patch.object( | ||
| 570 | self.cmd, "GetProjects", return_value=all_projects | ||
| 571 | ).start() | ||
| 572 | |||
| 573 | # Mock ExecuteInParallel to simulate a failed run on the first batch of | ||
| 574 | # projects. | ||
| 575 | execute_mock = mock.patch.object( | ||
| 576 | self.cmd, "ExecuteInParallel", return_value=False | ||
| 577 | ).start() | ||
| 578 | |||
| 579 | with self.assertRaises(sync.SyncFailFastError): | ||
| 580 | self.cmd._SyncInterleaved( | ||
| 581 | opt, | ||
| 582 | args, | ||
| 583 | [], | ||
| 584 | self.manifest, | ||
| 585 | self.manifest.manifestProject, | ||
| 586 | all_projects, | ||
| 587 | {}, | ||
| 588 | ) | ||
| 589 | |||
| 590 | execute_mock.assert_called_once() | ||
| 591 | |||
| 592 | def test_interleaved_shared_objdir_serial(self): | ||
| 593 | """Test that projects with shared objdir are processed serially.""" | ||
| 594 | opt, args = self.cmd.OptionParser.parse_args(["--interleaved", "-j4"]) | ||
| 595 | opt.quiet = True | ||
| 596 | |||
| 597 | # Setup projects with a shared objdir. | ||
| 598 | self.projA.objdir = "common_objdir" | ||
| 599 | self.projC.objdir = "common_objdir" | ||
| 600 | |||
| 601 | all_projects = [self.projA, self.projB, self.projC] | ||
| 602 | mock.patch.object( | ||
| 603 | self.cmd, "GetProjects", return_value=all_projects | ||
| 604 | ).start() | ||
| 605 | |||
| 606 | def execute_side_effect(jobs, target, work_items, **kwargs): | ||
| 607 | # The callback is a partial object. The first arg is the set we | ||
| 608 | # need to update to avoid the stall detection. | ||
| 609 | synced_relpaths_set = kwargs["callback"].args[0] | ||
| 610 | projects_in_pass = self.cmd.get_parallel_context()["projects"] | ||
| 611 | for item in work_items: | ||
| 612 | for project_idx in item: | ||
| 613 | synced_relpaths_set.add( | ||
| 614 | projects_in_pass[project_idx].relpath | ||
| 615 | ) | ||
| 616 | return True | ||
| 617 | |||
| 618 | execute_mock = mock.patch.object( | ||
| 619 | self.cmd, "ExecuteInParallel", side_effect=execute_side_effect | ||
| 620 | ).start() | ||
| 621 | |||
| 622 | self.cmd._SyncInterleaved( | ||
| 623 | opt, | ||
| 624 | args, | ||
| 625 | [], | ||
| 626 | self.manifest, | ||
| 627 | self.manifest.manifestProject, | ||
| 628 | all_projects, | ||
| 629 | {}, | ||
| 630 | ) | ||
| 631 | |||
| 632 | execute_mock.assert_called_once() | ||
| 633 | jobs_arg, _, work_items = execute_mock.call_args.args | ||
| 634 | self.assertEqual(jobs_arg, 2) | ||
| 635 | work_items_sets = {frozenset(item) for item in work_items} | ||
| 636 | expected_sets = {frozenset([0, 2]), frozenset([1])} | ||
| 637 | self.assertEqual(work_items_sets, expected_sets) | ||
