Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
n-dusan committed Mar 19, 2024
1 parent 915e1e0 commit c8058f8
Showing 1 changed file with 214 additions and 81 deletions.
295 changes: 214 additions & 81 deletions src/history/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,90 +89,113 @@ async fn insert_changes_archive(
/// Insert changes from the RDF repository into the database
async fn insert_changes_from_rdf_repository(
conn: &DatabaseConnection,
rdf_repo_path: PathBuf,
name: &str,
rdf_repo: &Repository,
rdf_repo: Repo,
stele_id: &str,
// rdf_repo: &Repository,
) -> anyhow::Result<()> {
tracing::info!("Inserting changes from RDF repository: {}", name);
tracing::info!("RDF repository path: {}", rdf_repo_path.display());
tracing::info!("Inserting changes from RDF repository: {}", stele_id);
tracing::info!("RDF repository path: {}", rdf_repo.path.display());
let run_documents = false;
if run_documents {
// let response = reqwest::get(NAMESPACE_URL).await?.text().await?;
let mut graph = FastGraph::new();
let head_commit = rdf_repo.repo.head()?.peel_to_commit()?;
let tree = head_commit.tree()?;
tree.walk(git2::TreeWalkMode::PreOrder, |_, entry| {
let path_name = entry.name().unwrap();
// let path = entry.path().unwrap();
if path_name.contains(".rdf") {
let blob = rdf_repo.repo.find_blob(entry.id()).unwrap();
let data = blob.content();
let reader = std::io::BufReader::new(data);
parser::parse_bufread(reader)
.add_to_graph(&mut graph)
.unwrap();
}
git2::TreeWalkResult::Ok
})?;
// for entry in WalkDir::new(&rdf_repo.path) {
// match entry {
// Ok(entry) if is_rdf(&entry) => {
// tracing::debug!("Parsing file: {:?}", entry.path());
// let file = std::fs::File::open(entry.path())?;
// let reader = std::io::BufReader::new(file);
// parser::parse_bufread(reader).add_to_graph(&mut graph)?;
// }
// Ok(entry) => {
// tracing::debug!("Skipping non-RDF file: {:?}", entry.path());
// continue;
// }
// Err(err) => {
// tracing::error!("Error reading file: {:?}", err);
// }
// }
// }
// let namespace_url = "https://open.law/us/ngo/oll/_ontology/v0.1/ontology.owl#";
// let oll: Namespace<&str> = Namespace::new(namespace_url).unwrap();

// let response = reqwest::get(NAMESPACE_URL).await?.text().await?;
let mut graph = FastGraph::new();
// let oll_document_version: NsTerm = oll.get("DocumentVersion").unwrap();
// let oll_doc_id = oll.get("docId").unwrap();

for entry in WalkDir::new(&rdf_repo_path) {
match entry {
Ok(entry) if is_rdf(&entry) => {
tracing::debug!("Parsing file: {:?}", entry.path());
let file = std::fs::File::open(entry.path())?;
let reader = std::io::BufReader::new(file);
parser::parse_bufread(reader).add_to_graph(&mut graph)?;
}
Ok(entry) => {
tracing::debug!("Skipping non-RDF file: {:?}", entry.path());
continue;
}
Err(err) => {
tracing::error!("Error reading file: {:?}", err);
let documents = graph.triples_matching(Any, Any, [oll::DocumentVersion]);
let mut doc_to_versions: HashMap<String, Vec<String>> = HashMap::new();
for triple in documents {
let triple = triple.unwrap();
let document = triple.s();
let mut doc_id_triples = graph.triples_matching([document], [oll::docId], Any);
if let Some(doc_id_triple) = doc_id_triples.next() {
let object = doc_id_triple.unwrap().o();
let document_iri = document.iri().unwrap().to_string();
if let SimpleTerm::LiteralDatatype(doc_id, _) = object {
doc_to_versions
.entry(doc_id.to_string())
.or_insert_with(Vec::new)
.push(document_iri);
}
}
}
}
let namespace_url = "https://open.law/us/ngo/oll/_ontology/v0.1/ontology.owl#";
let oll: Namespace<&str> = Namespace::new(namespace_url).unwrap();

let oll_document_version: NsTerm = oll.get("DocumentVersion").unwrap();
let oll_doc_id = oll.get("docId").unwrap();

let documents = graph.triples_matching(Any, Any, [oll_document_version]);
let mut doc_to_versions: HashMap<String, Vec<String>> = HashMap::new();
for triple in documents {
let triple = triple.unwrap();
let document = triple.s();
let mut doc_id_triples = graph.triples_matching([document], [oll_doc_id], Any);
if let Some(doc_id_triple) = doc_id_triples.next() {
let object = doc_id_triple.unwrap().o();
let document_iri = document.iri().unwrap().to_string();
if let SimpleTerm::LiteralDatatype(doc_id, _) = object {
doc_to_versions
.entry(doc_id.to_string())
.or_insert_with(Vec::new)
.push(document_iri);
for versions in doc_to_versions.values() {
// Find the version with the maximum docId
let doc_version = versions
.iter()
.max_by_key(|&v| {
let mut doc_id_triples = graph.triples_matching([v.as_str()], [oll::docId], Any);
doc_id_triples
.next()
.map_or_else(String::new, |doc_id_triple| {
let object = doc_id_triple.unwrap().o();
if let SimpleTerm::LiteralDatatype(doc_id, _) = object {
doc_id.to_string()
} else {
String::new()
}
})
})
.unwrap();
// Get the docId for this version
// dbg!(&doc_version);
let doc_version_iri_ref = IriRef::new_unchecked(MownStr::from_str(doc_version.as_str()));
let mut doc_id_triples =
graph.triples_matching([SimpleTerm::Iri(doc_version_iri_ref)], [oll::docId], Any);
if let Some(doc_id_triple) = doc_id_triples.next() {
let object = doc_id_triple.unwrap().o();
if let SimpleTerm::LiteralDatatype(doc_id, _) = object {
insert_new_document(conn, doc_id).await?;
}
}
}
}
for versions in doc_to_versions.values() {
// Find the version with the maximum docId
let doc_version = versions
.iter()
.max_by_key(|&v| {
let mut doc_id_triples = graph.triples_matching([v.as_str()], [oll_doc_id], Any);
doc_id_triples
.next()
.map_or_else(String::new, |doc_id_triple| {
let object = doc_id_triple.unwrap().o();
if let SimpleTerm::LiteralDatatype(doc_id, _) = object {
doc_id.to_string()
} else {
String::new()
}
})
})
.unwrap();
// Get the docId for this version
// dbg!(&doc_version);
let doc_version_iri_ref = IriRef::new_unchecked(MownStr::from_str(doc_version.as_str()));
let mut doc_id_triples =
graph.triples_matching([SimpleTerm::Iri(doc_version_iri_ref)], [oll_doc_id], Any);
if let Some(doc_id_triple) = doc_id_triples.next() {
let object = doc_id_triple.unwrap().o();
if let SimpleTerm::LiteralDatatype(doc_id, _) = object {
insert_new_document(conn, doc_id).await?;
}
let tx = conn.pool.begin().await?;
match load_delta_from_publications(conn, &rdf_repo, stele_id).await {
Ok(_) => {
tx.commit().await?;
Ok(())
}
Err(err) => {
tx.rollback().await?;
Err(err)
}
}

load_delta_from_publications(&mut graph, conn, rdf_repo_path.join("_publication"), name)
.await?;
Ok(())
}

/// Check if the entry is an RDF file
Expand All @@ -182,15 +205,125 @@ fn is_rdf(entry: &walkdir::DirEntry) -> bool {

/// Load deltas from the publications
async fn load_delta_from_publications(
graph: &mut FastGraph,
conn: &DatabaseConnection,
publication_path: PathBuf,
name: &str,
rdf_repo: &Repo,
stele_id: &str,
) -> anyhow::Result<()> {
insert_new_stele(conn, name).await?;
let id = find_stele_by_name(conn, name).await?;
tracing::info!("Inserting changes from publications for stele: {}", name);
insert_new_stele(conn, stele_id).await?;
let id = find_stele_by_name(conn, stele_id).await?;
tracing::info!(
"Inserting changes from publications for stele: {}",
stele_id
);
dbg!(&id);
dbg!(&publication_path);
// dbg!(&publication_path);
load_delta_from_publications_from_beginning(conn, rdf_repo, stele_id)?;
Ok(())
}

/// Iterate and load delta from all publications in the `_publication` directory
///
/// # Errors
/// Errors if the delta cannot be loaded from the publications
fn load_delta_from_publications_from_beginning(
conn: &DatabaseConnection,
rdf_repo: &Repo,
stele_id: &str,
) -> anyhow::Result<()> {
let head_commit = rdf_repo.repo.head()?.peel_to_commit()?;
let tree = head_commit.tree()?;
let publications_dir_entry = tree.get_path(&PathBuf::from("_publication"))?;
let publications_subtree = rdf_repo.repo.find_tree(publications_dir_entry.id())?;
for publication_entry in publications_subtree.iter() {
let name = publication_entry.name().unwrap();
dbg!(&name);
let mut pub_graph = FastGraph::new();
let object = publication_entry.to_object(&rdf_repo.repo)?;
let Some(publication_tree) = object.as_tree() else {
anyhow::bail!("Expected a tree but got something else");
};
let index_rdf = publication_tree.get_path(&PathBuf::from("index.rdf"))?;
let blob = rdf_repo.repo.find_blob(index_rdf.id())?;
let data = blob.content();
let reader = std::io::BufReader::new(data);
parser::parse_bufread(reader).add_to_graph(&mut pub_graph)?;
let Some(pub_label_obj) = pub_graph.triples_matching(Any, [rdfs::label], Any).next() else {
anyhow::bail!("Could not find pub_label in a publication");
};
let pub_label = {
let SimpleTerm::LiteralLanguage(pub_label, _) = pub_label_obj?.o() else
{
anyhow::bail!("Found pub_label in a publication, but it was not a literal");
};
pub_label.to_string()
};
dbg!(&pub_label);
let Some(pub_date_obj) = pub_graph.triples_matching(Any, [dcterms::available], Any).next() else {
anyhow::bail!("Could not find pub_date in a publication");
};
let pub_date = {
let SimpleTerm::LiteralDatatype(pub_date, _) = pub_date_obj?.o() else
{
anyhow::bail!("Expected pub_date as a literal");
};
pub_date.to_string()
};
dbg!(&pub_date);
publication_tree.walk(git2::TreeWalkMode::PreOrder, |_, entry| {
let path_name = entry.name().unwrap();
if path_name.contains(".rdf") {
let current_blob = rdf_repo.repo.find_blob(entry.id()).unwrap();
let current_content = current_blob.content();
parser::parse_bufread(std::io::BufReader::new(current_content))
.add_to_graph(&mut pub_graph)
.unwrap();
}
git2::TreeWalkResult::Ok
})?;

let pub_document_versions = get_document_publication_versions(&pub_graph);
let pub_collection_versions = get_collection_publication_versions(&pub_graph);
load_delta_for_publication(conn, pub_document_versions, pub_collection_versions, pub_label, pub_date, &pub_graph, stele_id, None)?;
}
Ok(())
}

fn load_delta_for_publication(
conn: &DatabaseConnection,
pub_document_versions: Vec<[&SimpleTerm<'_>; 1]>,
pub_collection_versions: Vec<[&SimpleTerm<'_>; 1]>,
pub_label: String,
pub_date: String,
pub_graph: &FastGraph,
stele_id: &str,
last_inserted_date: Option<String>,
) -> anyhow::Result<()> {
// for version in pub_document_versions {
// let codified_date =
// }
Ok(())
}

/// Get the document publication version IRIs from the graph
fn get_document_publication_versions(graph: &FastGraph) -> Vec<[&SimpleTerm<'_>; 1]> {
let triples = graph.triples_matching(Any, Any, [oll::DocumentVersion]);
triples
.filter_map(|t| {
let t = t.ok()?;
let subject = t.s();
Some([subject])
})
.collect()
}

/// Get the collection publication version IRIs from the graph
fn get_collection_publication_versions(graph: &FastGraph) -> Vec<[&SimpleTerm<'_>; 1]> {
let triples = graph.triples_matching(Any, Any, [oll::CollectionVersion]);
triples
.filter_map(|t| {
let t = t.ok()?;
let subject = t.s();
Some([subject])
})
.collect()
}

0 comments on commit c8058f8

Please sign in to comment.