Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 121 additions & 50 deletions cronjobs/src/commands/git_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,33 @@ async def repo_sync_content(
changed_branches.add(common_branch_name)
created_tags.append(common_tag_name)

# Now process each collection changeset, creating/updating branches and tags accordingly.
changeset_changed_branches, changeset_created_tags = process_collections(
repo,
author=author,
committer=committer,
changesets=all_changesets,
is_first_run=common_base_tree is None,
# Process from oldest changeset to newest so that commits of the bucket branch
# are sorted chronologically.
# If a collection was resigned it should come after one that hasn't changed.
sorted_changesets = sorted(
all_changesets, key=lambda cs: cs["metadata"]["last_modified"]
)
changesets_by_bucket: dict[str, list[dict[str, Any]]] = {}
for changeset in sorted_changesets:
bid = changeset["metadata"]["bucket"]
changesets_by_bucket.setdefault(bid, []).append(changeset)

if common_base_tree is None:
# First run, initialize all bucket branches.
changeset_changed_branches, changeset_created_tags = initialize_bucket_branches(
repo,
author=author,
committer=committer,
changesets_by_bucket=changesets_by_bucket,
)
else:
# Now process each collection changeset, creating/updating branches and tags accordingly.
changeset_changed_branches, changeset_created_tags = update_bucket_branches(
repo,
author=author,
committer=committer,
changesets_by_bucket=changesets_by_bucket,
)

changed_branches.update(changeset_changed_branches)
created_tags += changeset_created_tags
Expand Down Expand Up @@ -485,66 +504,113 @@ def process_attachments(
return changed_attachments, common_content


def process_collections(
def changeset_to_branch_folder(changeset: dict[str, Any]) -> list[tuple[str, bytes]]:
"""
Convert a changeset to a list of files to be stored in the corresponding branch folder.
"""
# Create one blob per record.
cid = changeset["metadata"]["id"]
branch_content = [(f"{cid}/metadata.json", json_dumpb(changeset["metadata"]))]
records = sorted(changeset["changes"], key=lambda r: r["id"])
for record in records:
branch_content.append((f"{cid}/{record['id']}.json", json_dumpb(record)))
return branch_content


def initialize_bucket_branches(
repo,
author: pygit2.Signature,
committer: pygit2.Signature,
changesets: list[dict[str, Any]],
is_first_run: bool,
) -> tuple[list[str], list[str]]:
changesets_by_bucket: dict[str, list[dict[str, Any]]],
) -> tuple[set[str], list[str]]:
"""
Process the given changesets and create/update branches and tags accordingly.
Return the list of changed branches and created tags.
Initialize the bucket branches and tags from the given changesets.
Return the set of created branches and the list of created tags.
"""
changed_branches: set[str] = set()
created_changes: set[str] = set()
created_tags: list[str] = []
# On first run, we process all changesets to create the initial branches and tags.
# Each branch will all records of all collections of the related bucket.
for bid, bucket_changesets in changesets_by_bucket.items():
branch_content: list[tuple[str, bytes]] = []
for changeset in bucket_changesets:
branch_content += changeset_to_branch_folder(changeset)

# Bucket branch does not exist yet, create it as an empty branch.
empty_tree_id = repo.TreeBuilder().write()
branch_tree = repo.get(empty_tree_id)
branch_tree_id = tree_upsert_blobs(repo, branch_content, base_tree=branch_tree)

# Create all necessary bucket branches.
# This is only necessary when running git-export for the first time,
# or during tests. In normal operation, bucket branches already exist.
for changeset in changesets:
bid = changeset["metadata"]["bucket"]
branch_refname = f"refs/heads/{GIT_REF_PREFIX}buckets/{bid}"
try:
repo.lookup_reference(branch_refname)
except KeyError:
# Bucket branch does not exist yet, create it as an empty branch.
empty_tree_id = repo.TreeBuilder().write()
commit_oid = repo.create_commit(
branch_refname,
commit_oid = repo.create_commit(
branch_refname,
author,
committer,
f"Initialize bucket branch {bid}",
branch_tree_id,
[], # No parents.
)
print(f"Created bucket branch {branch_refname} at {commit_oid}")
created_changes.add(branch_refname)

# We add all tags on this initial commit.
for changeset in bucket_changesets:
cid = changeset["metadata"]["id"]
timestamp = changeset["timestamp"]
tag_name = f"{GIT_REF_PREFIX}timestamps/{bid}/{cid}/{timestamp}"
tag_refname = f"refs/tags/{tag_name}"
if repo.references.get(tag_refname) is not None:
repo.references.delete(tag_refname)
repo.create_tag(
tag_name,
commit_oid,
pygit2.GIT_OBJECT_COMMIT,
author,
committer,
f"Initialize bucket branch {bid}",
empty_tree_id,
[],
f"Initial tag for {bid}/{cid}@{timestamp}",
)
print(f"Created bucket branch {branch_refname} at {commit_oid}")
print(f"Created tag {tag_name} at {commit_oid}")
created_tags.append(tag_name)

for changeset in changesets:
bid = changeset["metadata"]["bucket"]
cid = changeset["metadata"]["id"]
timestamp = changeset["timestamp"]
branch_refname = f"refs/heads/{GIT_REF_PREFIX}buckets/{bid}"
dtcollection = ts2dt(timestamp).isoformat()
commit_message = f"{bid}/{cid}@{timestamp} ({dtcollection})"
return created_changes, created_tags


def update_bucket_branches(
repo,
author: pygit2.Signature,
committer: pygit2.Signature,
changesets_by_bucket: dict[str, list[dict[str, Any]]],
) -> tuple[list[str], list[str]]:
"""
Process the given changesets and create/update branches and tags accordingly.
Return the list of changed branches and created tags.
"""
changed_branches: set[str] = set()
created_tags: list[str] = []

# Find the bucket branch (changesets are not ordered by bucket)
# In the next runs, we only add commits on top of the existing branches with the
# changed data.
for bid, bucket_changesets in changesets_by_bucket.items():
# Find the bucket branch tip and base tree.
branch_refname = f"refs/heads/{GIT_REF_PREFIX}buckets/{bid}"
branch_tip = repo.lookup_reference(branch_refname).target
base_tree = repo.get(branch_tip).tree
branch_tree = repo.get(branch_tip).tree
parents = [branch_tip]

# Create one blob per record. We start a new tree for this collection
# anything from previous commits is lost.
branch_content = [(f"{cid}/metadata.json", json_dumpb(changeset["metadata"]))]
records = sorted(changeset["changes"], key=lambda r: r["id"])
for record in records:
branch_content.append((f"{cid}/{record['id']}.json", json_dumpb(record)))
for changeset in bucket_changesets:
cid = changeset["metadata"]["id"]

files_tree_id = tree_upsert_blobs(repo, branch_content, base_tree=base_tree)
timestamp = changeset["timestamp"]
dtcollection = ts2dt(timestamp).isoformat()
commit_message = f"{bid}/{cid}@{timestamp} ({dtcollection})"

branch_content = changeset_to_branch_folder(changeset)
files_tree_id = tree_upsert_blobs(
repo, branch_content, base_tree=branch_tree
)
if files_tree_id == branch_tree.id:
print(f"No changes for {bid}/{cid} branch, skipping.")
continue

if base_tree is not None and files_tree_id == base_tree.id:
print(f"No changes for {bid}/{cid} branch, skipping commit.")
else:
# Commit and tag.
# If the tag already exists (that happens when records don't change but metadata does),
# we move it to the new commit.
Expand All @@ -564,6 +630,11 @@ def process_collections(
if tag_moved:
created_tags.append(tag_name)

# Next collection will be put on top of the branch.
new_tip = repo.lookup_reference(branch_refname).target
branch_tree = repo.get(new_tip).tree
parents = [new_tip]

return changed_branches, created_tags


Expand Down
Loading
Loading