Skip to content

Commit

Permalink
feat(parse): all classes
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <johnandersen777@protonmail.com>
  • Loading branch information
johnandersen777 committed Dec 1, 2024
1 parent 814a0aa commit a3f5c64
Showing 1 changed file with 102 additions and 38 deletions.
140 changes: 102 additions & 38 deletions src/federation_git/push_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,53 @@
import warnings
import subprocess
import configparser
from typing import Optional
from typing import Optional, Union, Any

import yaml
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
import snoop

class CurrentUser(BaseModel):
email: str

class FederationGitRepo(BaseModel):
class ProtocolFederationGitRepo(BaseModel):
namespace: str
repo: str
name: str
group: bool = False
# If not specified, federate across all NS indexes
indexes: Optional[list[str]] = None

class FederationGitContext(BaseModel):
repos: list[FederationGitRepo]
class ProtocolFederationGit(BaseModel):
repos: list[ProtocolFederationGitRepo]

@field_validator("repos")
@classmethod
def parse_repos(cls, repos, _info):
return list(
[
repo
if isinstance(repo, ProtocolFederationGitRepo)
else ProtocolFederationGitRepo(**repo)
for repo in repos
]
)

class ProtocolFederation(BaseModel):
protocol: str
data: Any

class ProtocolIndexATProto(BaseModel):
handle: str
uri: str
cid: str

class ProtocolIndexGitHub(BaseModel):
owner: str

class PolicyIndex(BaseModel):
name: str
protocol: str
data: dict
data: Any

class PolicyDataNamespace(BaseModel):
indexes: list[PolicyIndex]
Expand All @@ -49,7 +74,8 @@ class PolicyData(BaseModel):
namespaces: dict[str, PolicyDataNamespace]
owners: list[Owner]
# List of federation entries from YAML
federation: list[dict]
# TODO Load class list dynamicly
federation: list[Any]

class Policy(BaseModel):
data: PolicyData
Expand All @@ -58,27 +84,39 @@ class Context(BaseModel):
current_user: CurrentUser
policy: Policy

def federation_git(ctx: Context, active: FederationGitContext):
def federation_git(ctx: Context, active: ProtocolFederationGit):
snoop.pp(ctx)

# Check if current user's email is in any of the owners' emails
owner_emails = [email for owner in ctx.policy.data.owners for email in owner.emails]
if ctx.current_user.email not in owner_emails:
print("Current user's email not in owners' emails.")
return
# No federation preformed
return False

# Indirect lookup of namespace name to owner email
current_user_namespaces = []
for owner in ctx.policy.data.owners:
if ctx.current_user.email in owner.emails:
current_user_namespaces.extend(owner.namespaces)

for repo in active.repos:
# TODO DEBUG REMOVE
snoop.pp(current_user_namespaces)
return

for repo in active.data.repos:
# TODO CHECK THIS LOGIC
if not repo.group or repo.namespace not in current_user_namespaces:
continue

# Perform git operations: clone, pull, push
print(f"Pushing to group repo: {repo.namespace}/{repo.repo}")
repo_dir = f"{repo.namespace}_{repo.repo}"
clone_url = f"git@github.com:{repo.namespace}/{repo.repo}.git" # Adjust as needed
print(f"Pushing to repo: {repo.namespace}/{repo.name}")

# TODO DEBUG REMOVE
continue

repo_dir = f"{repo.namespace}_{repo.name}"
clone_url = f"git@github.com:{repo.namespace}/{repo.name}.git" # Adjust as needed

# Clone the repository if it doesn't exist
if not os.path.isdir(repo_dir):
Expand All @@ -98,12 +136,12 @@ def federation_git(ctx: Context, active: FederationGitContext):
continue

# Push changes to the repository
print(f"Pushing changes to repository {repo.namespace}/{repo.repo}")
print(f"Pushing changes to repository {repo.namespace}/{repo.name}")
try:
subprocess.run(["git", "-C", repo_dir, "push"], check=True)
print(f"Successfully pushed to {repo.namespace}/{repo.repo}.")
print(f"Successfully pushed to {repo.namespace}/{repo.name}.")
except subprocess.CalledProcessError as e:
print(f"Failed to push to repository {repo.namespace}/{repo.repo}: {e}")
print(f"Failed to push to repository {repo.namespace}/{repo.name}: {e}")

# Helper Functions

Expand All @@ -116,6 +154,17 @@ def get_git_user_email():
except Exception as e:
raise Exception(f"You must run: $ git config --global user.email $USER@example.com") from e

def load_protocol_cls(protocol_name):
# TODO resourcelib + entrypoints stuff + dynamic based on known indexes
protocols = {
"publicdomainrelay/index-atproto-v2@v1": ProtocolIndexATProto,
"publicdomainrelay/index-github@v1": ProtocolIndexGitHub,
"publicdomainrelay/federation-git@v1": ProtocolFederationGit,
}
if protocol_name not in protocols:
raise ValueError(f"{index_protocol!r} not found in: {protocols.keys()}")
return protocols[protocol_name]

def build_federation_context(data):
# Build owners
owners_list = data.get('owners', [])
Expand All @@ -126,11 +175,39 @@ def build_federation_context(data):
namespaces = {}
for ns_name, ns_data in namespaces_dict.items():
indexes_list = ns_data.get('indexes', [])
indexes = [PolicyIndex(**index) for index in indexes_list]
indexes = []
for index_data in indexes_list:
protocol_name = index_data.get("protocol", "")
protocol_cls = load_protocol_cls(protocol_name)
protocol = protocol_cls(**index_data.get("data", {}))
indexes.append(
PolicyIndex(
**{
**index_data,
**{
"data": protocol,
},
},
),
)
namespaces[ns_name] = PolicyDataNamespace(indexes=indexes)

# Build federation data
federation_list = data.get('federation', [])
federation_list = []
for federation_data in data.get('federation', []):
protocol_name = federation_data.get("protocol", "")
protocol_cls = load_protocol_cls(protocol_name)
protocol = protocol_cls(**federation_data.get("data", {}))
federation_list.append(
ProtocolFederation(
**{
**federation_data,
**{
"data": protocol,
},
},
),
)

policy_data = PolicyData(
namespaces=namespaces,
Expand All @@ -140,16 +217,6 @@ def build_federation_context(data):

return policy_data

def get_active_repos(policy):
active_repos = []
for federation in policy.data.federation:
if federation.get('protocol') == 'publicdomainrelay/federation-git@v1':
repos = federation.get('data', {}).get('repos', [])
active_repos.extend([FederationGitRepo(**repo) for repo in repos])
return active_repos

# Main Execution Function

def main():
# Load the YAML configuration
# YAML_INPUT = "policy.yaml"
Expand All @@ -170,15 +237,12 @@ def main():
),
)

# Get active repositories
active_repos = get_active_repos(ctx.policy)
active = FederationGitContext(repos=active_repos)

import snoop
snoop.pp(ctx)

# Perform federation git operations
# federation_git(ctx, active)
for federation in ctx.policy.data.federation:
if not federation.protocol.startswith(
'publicdomainrelay/federation-git@',
):
continue
federation_git(ctx, federation)

if __name__ == "__main__":
main()

0 comments on commit a3f5c64

Please sign in to comment.