From 5dde97fb4dc0429883f72790702a151e8e2f6880 Mon Sep 17 00:00:00 2001 From: Mathieu Leplatre Date: Mon, 12 Jan 2026 14:05:45 +0100 Subject: [PATCH] Create initial branches with all changesets --- cronjobs/src/commands/git_export.py | 171 +++++++++++++++------ cronjobs/tests/commands/test_git_export.py | 78 +++++++++- 2 files changed, 191 insertions(+), 58 deletions(-) diff --git a/cronjobs/src/commands/git_export.py b/cronjobs/src/commands/git_export.py index 54c877d1..9d90ae9b 100644 --- a/cronjobs/src/commands/git_export.py +++ b/cronjobs/src/commands/git_export.py @@ -344,14 +344,33 @@ async def repo_sync_content( changed_branches.add(common_branch_name) created_tags.append(common_tag_name) - # Now process each collection changeset, creating/updating branches and tags accordingly. - changeset_changed_branches, changeset_created_tags = process_collections( - repo, - author=author, - committer=committer, - changesets=all_changesets, - is_first_run=common_base_tree is None, + # Process from oldest changeset to newest so that commits of the bucket branch + # are sorted chronologically. + # If a collection was resigned it should come after one that hasn't changed. + sorted_changesets = sorted( + all_changesets, key=lambda cs: cs["metadata"]["last_modified"] ) + changesets_by_bucket: dict[str, list[dict[str, Any]]] = {} + for changeset in sorted_changesets: + bid = changeset["metadata"]["bucket"] + changesets_by_bucket.setdefault(bid, []).append(changeset) + + if common_base_tree is None: + # First run, initialize all bucket branches. + changeset_changed_branches, changeset_created_tags = initialize_bucket_branches( + repo, + author=author, + committer=committer, + changesets_by_bucket=changesets_by_bucket, + ) + else: + # Now process each collection changeset, creating/updating branches and tags accordingly. 
+ changeset_changed_branches, changeset_created_tags = update_bucket_branches( + repo, + author=author, + committer=committer, + changesets_by_bucket=changesets_by_bucket, + ) changed_branches.update(changeset_changed_branches) created_tags += changeset_created_tags @@ -485,66 +504,113 @@ def process_attachments( return changed_attachments, common_content -def process_collections( +def changeset_to_branch_folder(changeset: dict[str, Any]) -> list[tuple[str, bytes]]: + """ + Convert a changeset to a list of files to be stored in the corresponding branch folder. + """ + # Create one blob per record. + cid = changeset["metadata"]["id"] + branch_content = [(f"{cid}/metadata.json", json_dumpb(changeset["metadata"]))] + records = sorted(changeset["changes"], key=lambda r: r["id"]) + for record in records: + branch_content.append((f"{cid}/{record['id']}.json", json_dumpb(record))) + return branch_content + + +def initialize_bucket_branches( repo, author: pygit2.Signature, committer: pygit2.Signature, - changesets: list[dict[str, Any]], - is_first_run: bool, -) -> tuple[list[str], list[str]]: + changesets_by_bucket: dict[str, list[dict[str, Any]]], +) -> tuple[set[str], list[str]]: """ - Process the given changesets and create/update branches and tags accordingly. - Return the list of changed branches and created tags. + Initialize the bucket branches and tags from the given changesets. + Return the set of created branches and the list of created tags. """ - changed_branches: set[str] = set() + created_changes: set[str] = set() created_tags: list[str] = [] + # On first run, we process all changesets to create the initial branches and tags. + # Each branch will contain all records of all collections of the related bucket. 
+ for bid, bucket_changesets in changesets_by_bucket.items(): + branch_content: list[tuple[str, bytes]] = [] + for changeset in bucket_changesets: + branch_content += changeset_to_branch_folder(changeset) + + # Bucket branch does not exist yet, create it as an empty branch. + empty_tree_id = repo.TreeBuilder().write() + branch_tree = repo.get(empty_tree_id) + branch_tree_id = tree_upsert_blobs(repo, branch_content, base_tree=branch_tree) - # Create all necessary bucket branches. - # This is only necessary when running git-export for the first time, - # or during tests. In normal operation, bucket branches already exist. - for changeset in changesets: - bid = changeset["metadata"]["bucket"] branch_refname = f"refs/heads/{GIT_REF_PREFIX}buckets/{bid}" - try: - repo.lookup_reference(branch_refname) - except KeyError: - # Bucket branch does not exist yet, create it as an empty branch. - empty_tree_id = repo.TreeBuilder().write() - commit_oid = repo.create_commit( - branch_refname, + commit_oid = repo.create_commit( + branch_refname, + author, + committer, + f"Initialize bucket branch {bid}", + branch_tree_id, + [], # No parents. + ) + print(f"Created bucket branch {branch_refname} at {commit_oid}") + created_changes.add(branch_refname) + + # We add all tags on this initial commit. 
+ for changeset in bucket_changesets: + cid = changeset["metadata"]["id"] + timestamp = changeset["timestamp"] + tag_name = f"{GIT_REF_PREFIX}timestamps/{bid}/{cid}/{timestamp}" + tag_refname = f"refs/tags/{tag_name}" + if repo.references.get(tag_refname) is not None: + repo.references.delete(tag_refname) + repo.create_tag( + tag_name, + commit_oid, + pygit2.GIT_OBJECT_COMMIT, author, - committer, - f"Initialize bucket branch {bid}", - empty_tree_id, - [], + f"Initial tag for {bid}/{cid}@{timestamp}", ) - print(f"Created bucket branch {branch_refname} at {commit_oid}") + print(f"Created tag {tag_name} at {commit_oid}") + created_tags.append(tag_name) - for changeset in changesets: - bid = changeset["metadata"]["bucket"] - cid = changeset["metadata"]["id"] - timestamp = changeset["timestamp"] - branch_refname = f"refs/heads/{GIT_REF_PREFIX}buckets/{bid}" - dtcollection = ts2dt(timestamp).isoformat() - commit_message = f"{bid}/{cid}@{timestamp} ({dtcollection})" + return created_changes, created_tags + + +def update_bucket_branches( + repo, + author: pygit2.Signature, + committer: pygit2.Signature, + changesets_by_bucket: dict[str, list[dict[str, Any]]], +) -> tuple[list[str], list[str]]: + """ + Process the given changesets and create/update branches and tags accordingly. + Return the list of changed branches and created tags. + """ + changed_branches: set[str] = set() + created_tags: list[str] = [] - # Find the bucket branch (changesets are not ordered by bucket) + # In the next runs, we only add commits on top of the existing branches with the + # changed data. + for bid, bucket_changesets in changesets_by_bucket.items(): + # Find the bucket branch tip and base tree. + branch_refname = f"refs/heads/{GIT_REF_PREFIX}buckets/{bid}" branch_tip = repo.lookup_reference(branch_refname).target - base_tree = repo.get(branch_tip).tree + branch_tree = repo.get(branch_tip).tree parents = [branch_tip] - # Create one blob per record. 
We start a new tree for this collection - # anything from previous commits is lost. - branch_content = [(f"{cid}/metadata.json", json_dumpb(changeset["metadata"]))] - records = sorted(changeset["changes"], key=lambda r: r["id"]) - for record in records: - branch_content.append((f"{cid}/{record['id']}.json", json_dumpb(record))) + for changeset in bucket_changesets: + cid = changeset["metadata"]["id"] - files_tree_id = tree_upsert_blobs(repo, branch_content, base_tree=base_tree) + timestamp = changeset["timestamp"] + dtcollection = ts2dt(timestamp).isoformat() + commit_message = f"{bid}/{cid}@{timestamp} ({dtcollection})" + + branch_content = changeset_to_branch_folder(changeset) + files_tree_id = tree_upsert_blobs( + repo, branch_content, base_tree=branch_tree + ) + if files_tree_id == branch_tree.id: + print(f"No changes for {bid}/{cid} branch, skipping.") + continue - if base_tree is not None and files_tree_id == base_tree.id: - print(f"No changes for {bid}/{cid} branch, skipping commit.") - else: # Commit and tag. # If the tag already exists (that happens when records don't change but metadata does), # we move it to the new commit. @@ -564,6 +630,11 @@ def process_collections( if tag_moved: created_tags.append(tag_name) + # Next collection will be put on top of the branch. 
+ new_tip = repo.lookup_reference(branch_refname).target + branch_tree = repo.get(new_tip).tree + parents = [new_tip] + return changed_branches, created_tags diff --git a/cronjobs/tests/commands/test_git_export.py b/cronjobs/tests/commands/test_git_export.py index fa13297e..50bd0eb5 100644 --- a/cronjobs/tests/commands/test_git_export.py +++ b/cronjobs/tests/commands/test_git_export.py @@ -110,6 +110,11 @@ def mock_rs_server_content(): "bucket": "bid2", "collection": "cid2", }, + { + "last_modified": 1500000000000, + "bucket": "bid2", + "collection": "cid3", + }, ], }, ) @@ -125,6 +130,7 @@ def mock_rs_server_content(): "signature": { "x5u": "https://autograph.example.com/keys/123", }, + "last_modified": 1777777777000, }, "changes": [ { @@ -147,6 +153,7 @@ def mock_rs_server_content(): "signature": { "x5u": "https://autograph.example.com/keys/123", }, + "last_modified": 16666666666000, }, "changes": [ { @@ -162,6 +169,22 @@ def mock_rs_server_content(): ], }, ) + responses.add( + responses.GET, + "http://testserver:9999/v1/buckets/bid2/collections/cid3/changeset", + json={ + "timestamp": 1500000000000, + "metadata": { + "bucket": "bid2", + "id": "cid3", + "signature": { + "x5u": "https://autograph.example.com/keys/123", + }, + "last_modified": 15555555555000, + }, + "changes": [], + }, + ) responses.add( responses.GET, @@ -197,11 +220,15 @@ def read_file(repo, branch, filepath): def init_fake_repo(path): repo = pygit2.init_repository(path, bare=True, initial_head="main") repo.remotes.create("origin", git_export.GIT_REMOTE_URL) + return repo + + +def create_branch_with_empty_commit(repo, branch_name): author = pygit2.Signature("Test User", "test@example.com") builder = repo.TreeBuilder() tree = builder.write() commit_id = repo.create_commit( - "HEAD", # reference name + branch_name, # reference name author, # author author, # committer "initial commit", @@ -210,12 +237,10 @@ def init_fake_repo(path): ) commit = repo[commit_id] - 
repo.branches.local.create("v1/common", commit) - refname = "refs/remotes/origin/v1/common" + repo.branches.local.create(branch_name, commit) + refname = f"refs/remotes/origin/{branch_name}" repo.references.create(refname, commit.id) - - repo.set_head("refs/heads/v1/common") - return repo + repo.set_head(f"refs/heads/{branch_name}") def simulate_pushed(repo, mock_ls_remotes): @@ -288,7 +313,7 @@ def test_repo_sync_content_starts_from_scratch_if_no_previous_run( mock_git_fetch.assert_called_once() stdout = capsys.readouterr().out assert "No previous tags found" in stdout - assert "2 collections changed" in stdout + assert "3 collections changed" in stdout (args, _) = mock_git_push.call_args_list[0] assert args == ( @@ -298,10 +323,24 @@ def test_repo_sync_content_starts_from_scratch_if_no_previous_run( "+refs/heads/v1/common:refs/heads/v1/common", "+refs/tags/v1/timestamps/bid1/cid1/1700000000000:refs/tags/v1/timestamps/bid1/cid1/1700000000000", "+refs/tags/v1/timestamps/bid2/cid2/1600000000000:refs/tags/v1/timestamps/bid2/cid2/1600000000000", + "+refs/tags/v1/timestamps/bid2/cid3/1500000000000:refs/tags/v1/timestamps/bid2/cid3/1500000000000", "+refs/tags/v1/timestamps/common/1700000000000:refs/tags/v1/timestamps/common/1700000000000", ], ) + # Verify that all collections tags point to the same commit (initial commit with all collections) + all_timestamps_tags = set( + repo.lookup_reference(tag).peel().id # commit id of the tag + for tag in repo.listall_references() + if tag.startswith("refs/tags/v1/timestamps/bid2") + ) + assert len(all_timestamps_tags) == 1 + + # Verify that branch root contains all collections folders. 
+ tree = repo.lookup_reference("refs/heads/v1/buckets/bid2").peel().tree + assert "cid2" in tree + assert "cid3" in tree + @responses.activate def test_repo_sync_does_nothing_if_up_to_date( @@ -314,6 +353,10 @@ def test_repo_sync_does_nothing_if_up_to_date( mock_github_lfs, mock_git_push, ): + create_branch_with_empty_commit(repo, "v1/common") + create_branch_with_empty_commit(repo, "v1/buckets/bid1") + create_branch_with_empty_commit(repo, "v1/buckets/bid2") + git_export.git_export(None, None) simulate_pushed(repo, mock_ls_remotes) capsys.readouterr() # Clear previous output @@ -338,6 +381,10 @@ def test_repo_sync_can_be_forced_even_if_up_to_date( mock_github_lfs, mock_git_push, ): + create_branch_with_empty_commit(repo, "v1/common") + create_branch_with_empty_commit(repo, "v1/buckets/bid1") + create_branch_with_empty_commit(repo, "v1/buckets/bid2") + git_export.git_export(None, None) simulate_pushed(repo, mock_ls_remotes) capsys.readouterr() # Clear previous output @@ -361,6 +408,10 @@ def test_repo_sync_content_uses_previous_run_to_fetch_changes( mock_github_lfs, mock_git_push, ): + create_branch_with_empty_commit(repo, "v1/common") + create_branch_with_empty_commit(repo, "v1/buckets/bid1") + create_branch_with_empty_commit(repo, "v1/buckets/bid2") + repo.create_tag( "v1/timestamps/common/1600000000000", repo.lookup_reference("refs/heads/v1/common").target, @@ -405,6 +456,10 @@ def test_repo_sync_content_ignores_previous_run_if_forced( mock_github_lfs, mock_git_push, ): + create_branch_with_empty_commit(repo, "v1/common") + create_branch_with_empty_commit(repo, "v1/buckets/bid1") + create_branch_with_empty_commit(repo, "v1/buckets/bid2") + repo.create_tag( "v1/timestamps/common/1600000000000", repo.lookup_reference("refs/heads/v1/common").target, @@ -421,7 +476,7 @@ def test_repo_sync_content_ignores_previous_run_if_forced( stdout = capsys.readouterr().out assert "Found latest tag: 1600000000000. 
Ignoring (forced)" in stdout - assert "2 collections changed" in stdout + assert "3 collections changed" in stdout git_export.FORCE = False @@ -579,6 +634,7 @@ def test_repo_syncs_attachment_bundles( "signature": { "x5u": "https://autograph.example.com/keys/123", }, + "last_modified": 188888888880000, }, "changes": [ { @@ -627,6 +683,7 @@ def test_attachment_bundles_is_skipped_if_no_attachment_in_changeset( "signature": { "x5u": "https://autograph.example.com/keys/123", }, + "last_modified": 188888888880000, }, "changes": [{"id": "rid1-1", "last_modified": 1800000000000}], }, @@ -689,6 +746,10 @@ def test_repo_is_resetted_to_local_content_on_error( mock_git_push, mock_ls_remotes, ): + create_branch_with_empty_commit(repo, "v1/common") + create_branch_with_empty_commit(repo, "v1/buckets/bid1") + create_branch_with_empty_commit(repo, "v1/buckets/bid2") + git_export.git_export(None, None) simulate_pushed(repo, mock_ls_remotes) @@ -727,6 +788,7 @@ def test_repo_is_resetted_to_local_content_on_error( "signature": { "x5u": "https://autograph.example.com/keys/123", }, + "last_modified": 188888888880000, }, "changes": [], },