From ed3f3443359031e5efb1928f675a35cc4fa5f2f2 Mon Sep 17 00:00:00 2001 From: jabadji Date: Wed, 9 Feb 2022 17:00:15 +0100 Subject: [PATCH 1/2] feat: add text extraction --- Cargo.lock | 4 +- Cargo.toml | 2 +- src/cli.rs | 8 +- src/error.rs | 2 + src/extract_text.rs | 111 +++++++++++++++++++++++ src/impls/oscar_doc.rs | 190 +++++++++++++++++++++++++++++++++++++++- src/main.rs | 1 + src/ops/extract_text.rs | 9 ++ src/ops/mod.rs | 2 + 9 files changed, 324 insertions(+), 5 deletions(-) create mode 100644 src/extract_text.rs create mode 100644 src/ops/extract_text.rs diff --git a/Cargo.lock b/Cargo.lock index b30a5fd..f1fdf8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,9 +393,9 @@ checksum = "96b3c34c1690edf8174f5b289a336ab03f568a4460d8c6df75f2f3a692b3bc6a" [[package]] name = "serde_json" -version = "1.0.75" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79" +checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index d895791..87683b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" structopt = "0.3.21" log = "0.4.14" env_logger = "0.9.0" -serde_json = "1.0.75" +serde_json = "1.0.78" rayon = "1.5.1" flate2 = "1.0.22" diff --git a/src/cli.rs b/src/cli.rs index 9e43492..6b9f3be 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -2,6 +2,7 @@ use crate::compress::CompressCorpus; use crate::error::Error; use crate::extract_clean::ExtractCleanCorpus; +use crate::extract_text::ExtractText; use crate::lang_codes::UpdateLangCodes; use crate::split_latest::SplitLatest; use structopt::StructOpt; @@ -26,8 +27,12 @@ pub enum OscarTools { ExtractCleanCorpus(ExtractCleanCorpus), #[structopt(about = "Split a corpus into a set of smaller files")] SplitLatest(SplitLatest), - #[structopt(about = "compress")] + #[structopt(about = "Compress corpus. Useable on files and folders (compresses on a depth=1)")] Compress(CompressCorpus), + #[structopt( + about = "Extracts textual information, discarding metadata. Produces a corpus following OSCAR Scheme v1" + )] + Extract(ExtractText), } impl Runnable for OscarTools { @@ -37,6 +42,7 @@ impl Runnable for OscarTools { OscarTools::ExtractCleanCorpus(u) => u.run(), OscarTools::SplitLatest(u) => u.run(), OscarTools::Compress(u) => u.run(), + OscarTools::Extract(u) => u.run(), } } } diff --git a/src/error.rs b/src/error.rs index 2cb8e62..18fba9e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,6 +4,8 @@ pub enum Error { Io(std::io::Error), Json(serde_json::Error), ThreadPoolBuild(rayon::ThreadPoolBuildError), + MissingContent(serde_json::Value), + MalformedContent(serde_json::Value), Custom(String), } diff --git a/src/extract_text.rs b/src/extract_text.rs new file mode 100644 index 0000000..f614690 --- /dev/null +++ b/src/extract_text.rs @@ -0,0 +1,111 @@ +//! Splitting of OSCAR Schema v2 corpora +//! +//! Untested but should work on OSCAR Schema v1 corpora +use std::path::PathBuf; + +use crate::impls::OscarDoc; +use crate::ops::ExtractText as ET; +use crate::{cli::Runnable, error::Error}; +use log::error; +use structopt::StructOpt; + +#[derive(StructOpt, Debug)] +pub struct ExtractText { + #[structopt(help = "source corpus file")] + src: PathBuf, + #[structopt(help = "dest corpus folder.")] + dst: PathBuf, + + #[structopt(help = "delete source files", short = "m")] + del_src: bool, + // #[structopt( + // short, + // long, + // default_value = "0", + // help = "Number of threads (ignored if source is a single file)" + // )] + // num_threads: usize, +} + +impl Runnable for ExtractText { + fn run(&self) -> Result<(), Error> { + if self.src.is_file() { + OscarDoc::extract_text(&self.src, &self.dst, self.del_src)?; + Ok(()) + } else { + error!("Extraction is not supported on folders. Call on each file."); + Err(Error::Custom( + "Extraction is not supported on folders. Call on each file.".to_string(), + )) + } + } +} + +// #[cfg(test)] +// mod tests { +// use std::{ +// fs::File, +// io::{Read, Write}, +// }; + +// use tempfile; + +// use crate::{impls::OscarDoc, ops::Compress}; + +// pub fn setup_oscardoc() -> String { +// let mut corpus = String::new(); +// for i in 0..10000 { +// corpus.push_str(&format!(r#"{{"item":{}}}"#, i)); +// corpus.push('\n'); +// } + +// corpus +// } + +// // the way of checking results is bad, since we merge then sort results +// // we should rather check the individual files one by one +// #[test] +// fn test_compress() { +// let content = setup_oscardoc(); +// let content: Vec<&str> = content.lines().collect(); +// let content_files = (&content).chunks(1000).into_iter(); +// let tmpdir = tempfile::tempdir().unwrap(); +// for (idx, chunk) in content_files.enumerate() { +// // should be safe since it does not rely on rust destructor +// // + it is in a tempfile that will be cleaned at the exit of the test +// let tempfile_path = tmpdir.path().join(format!("file_{idx}.jsonl")); +// let mut tempfile = File::create(tempfile_path).unwrap(); +// tempfile.write_all(chunk.join("\n").as_bytes()).unwrap(); +// } + +// // create destination path and compress +// let tmpdst = tempfile::tempdir().unwrap(); +// OscarDoc::compress_folder(tmpdir.path(), tmpdst.path(), false, 1).unwrap(); + +// println!( +// "{:?}", +// std::fs::read_dir(tmpdir.path()) +// .unwrap() +// .collect::>() +// ); +// // let mut items_decompressed = Vec::new(); + +// let mut decompressed_data = Vec::new(); +// for file in std::fs::read_dir(tmpdst.path()).unwrap() { +// println!("file: {:?}", file); +// // for file in split_files { +// let file = file.unwrap(); +// let file = File::open(file.path()).unwrap(); +// let mut reader = flate2::read::GzDecoder::new(file); +// let mut decompressed = String::new(); +// reader.read_to_string(&mut decompressed).unwrap(); +// decompressed_data.extend(decompressed.lines().map(|x| x.to_string()).into_iter()); +// } + +// // sort results +// decompressed_data.sort(); +// let mut content = content; +// content.sort(); +// assert_eq!(decompressed_data, content); +// } +// } diff --git a/src/impls/oscar_doc.rs b/src/impls/oscar_doc.rs index 18afbcb..ecacb16 100644 --- a/src/impls/oscar_doc.rs +++ b/src/impls/oscar_doc.rs @@ -1,8 +1,17 @@ //! OSCAR Schema v2 (See [oscar-corpus.com](https://oscar-corpus.com)) operation implementations. //! //! Implementations mostly use default trait implementations, as the format is simple. +use std::{ + fs::File, + io::{BufRead, BufReader, Read, Write}, + path::PathBuf, +}; + +use serde_json::Value; + use crate::{ - ops::{Compress, Split}, + error::Error, + ops::{Compress, ExtractText, Split}, versions::{Schema, Version}, }; @@ -20,3 +29,182 @@ impl Schema for OscarDoc { /// Use default implementation of splitting (see [crate::ops::Split]) impl Split for OscarDoc {} impl Compress for OscarDoc {} + +/// impl block for helper functions related to [ExtractText]. +impl OscarDoc { + /// Extracts content from a Document. + /// + /// Fails if the `content` field is missing or is not a string. + fn extract_from_doc(doc: &str) -> Result { + let v: Value = serde_json::from_str(doc)?; + + if let Some(content) = v.get("content") { + if let Value::String(c) = content { + let mut content_str = c.to_string().replace(r#"\n"#, "\n"); + content_str.push('\n'); + Ok(content_str) + } else { + Err(Error::MalformedContent(v)) + } + } else { + Err(Error::MissingContent(v)) + } + } + + fn extract(src: T, dst: &mut U) -> Result<(), Error> { + let b = BufReader::new(src); + let docs = b.lines(); + for doc in docs { + //extract and add newline + let doc = doc?; + let content = Self::extract_from_doc(&doc)? + "\n"; + let content_length = content.len(); + + // check written bytes + if dst.write(content.as_bytes())? > content_length { + error!("IO Error: Could not write into destination writer."); + } + } + + // flush output + dst.flush()?; + + Ok(()) + } +} + +impl ExtractText for OscarDoc { + fn extract_text( + src: &std::path::Path, + dst: &std::path::Path, + del_src: bool, + ) -> Result<(), Error> { + if !src.is_file() { + warn!("{:?} is not a file: ignoring", src); + return Ok(()); + } + let src_file = File::open(src)?; + + // gen filename + let filename = src.file_name().unwrap(); + let mut dst: PathBuf = [dst.as_os_str(), filename].iter().collect(); + let extension = String::from(dst.extension().unwrap().to_str().unwrap()); + dst.set_extension(extension + ".txt"); + + info!("extracting text from {:?} to {:?}", src, dst); + + let mut dest_file = File::create(dst)?; + OscarDoc::extract(src_file, &mut dest_file)?; + + if del_src { + std::fs::remove_file(src)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::Value; + + use crate::impls::OscarDoc; + + fn get_doc() -> &'static str { + r#"{"content":"foo\nbar\nbaz\nquux"} +{"content":"123456789"} +{"content":"246810"} +{"content":"test"}"# + } + + #[test] + fn test_extract_single() { + let docs = get_doc(); + let doc = docs.lines().next().unwrap().as_bytes(); + + let mut buf = Vec::new(); + OscarDoc::extract(doc, &mut buf).unwrap(); + + assert_eq!(String::from_utf8(buf).unwrap(), "foo\nbar\nbaz\nquux\n\n"); + } + #[test] + fn test_extract_multiple() { + let doc = get_doc().as_bytes(); + let mut buf = Vec::new(); + OscarDoc::extract(doc, &mut buf).unwrap(); + + assert_eq!( + String::from_utf8(buf).unwrap(), + "foo\nbar\nbaz\nquux\n\n123456789\n\n246810\n\ntest\n\n" + ); + } + #[test] + fn extract_no_content() { + let document = r#"{"no_content": "hehe"}"#; + let extracted = OscarDoc::extract_from_doc(document); + + assert!(extracted.is_err()) + } + + #[test] + fn extract_bad_content() { + let document = r#"{"content": ["hehe"]}"#; + let extracted = OscarDoc::extract_from_doc(document); + + assert!(extracted.is_err()) + } + + #[test] + fn text_extract_from_doc() { + let content = "foo +bar +baz +quux +"; + + let document = r#" + { + "content":"foo\nbar\nbaz\nquux", + "warc_headers":{ + "warc-block-digest":"sha1:X3OWP47FG2O5LBNMFSNB44FJF2SSRC26", + "content-type":"text/plain", + "warc-refers-to":"", + "content-length":"16", + "warc-identified-content-language":"eng", + "warc-target-uri":"http://3dv2015.inria.fr/registration-2/index.html", + "warc-date":"2021-09-16T11:07:14Z", + "warc-record-id":"", + "warc-type":"conversion" + }, + "metadata":{ + "identification":{ + "label":"en", + "prob":0.6268374 + }, + "annotation":[ + "short_sentences", + "footer" + ], + "sentence_identifications":[ + { + "label":"en", + "prob":0.93925816 + }, + null, + { + "label":"en", + "prob":0.9937219 + }, + { + "label":"en", + "prob":0.9996538 + } + ] + } + } + "#; + + let extracted = OscarDoc::extract_from_doc(document).unwrap(); + assert_eq!(extracted, content); + } +} diff --git a/src/main.rs b/src/main.rs index 56dec29..52169af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod cli; mod compress; mod error; mod extract_clean; +mod extract_text; mod impls; mod lang_codes; mod ops; diff --git a/src/ops/extract_text.rs b/src/ops/extract_text.rs new file mode 100644 index 0000000..c314abf --- /dev/null +++ b/src/ops/extract_text.rs @@ -0,0 +1,9 @@ +/*! Extracts textual content into new files, discarding metadata. +!*/ + +use std::path::Path; + +use crate::error::Error; +pub trait ExtractText { + fn extract_text(src: &Path, dst: &Path, del_src: bool) -> Result<(), Error>; +} diff --git a/src/ops/mod.rs b/src/ops/mod.rs index 077d71a..5164d34 100644 --- a/src/ops/mod.rs +++ b/src/ops/mod.rs @@ -2,6 +2,8 @@ //! //! A subset of these should be implemented for different corpus versions. mod compress; +mod extract_text; mod split; pub(crate) use compress::Compress; +pub(crate) use extract_text::ExtractText; pub(crate) use split::Split; From 3f73ac202ab0b9819f5bcec9a3c8f34aaf237b74 Mon Sep 17 00:00:00 2001 From: jabadji Date: Thu, 10 Feb 2022 17:09:32 +0100 Subject: [PATCH 2/2] test: add test on extraction command also changes behaviour of extract which now expects a file path on `dst` --- src/extract_text.rs | 148 ++++++++++++++++++++--------------------- src/impls/oscar_doc.rs | 20 +++--- 2 files changed, 84 insertions(+), 84 deletions(-) diff --git a/src/extract_text.rs b/src/extract_text.rs index f614690..59ec87d 100644 --- a/src/extract_text.rs +++ b/src/extract_text.rs @@ -11,20 +11,13 @@ use structopt::StructOpt; #[derive(StructOpt, Debug)] pub struct ExtractText { - #[structopt(help = "source corpus file")] + #[structopt(help = "source corpus file. Does not work with folders")] src: PathBuf, #[structopt(help = "dest corpus folder.")] dst: PathBuf, #[structopt(help = "delete source files", short = "m")] del_src: bool, - // #[structopt( - // short, - // long, - // default_value = "0", - // help = "Number of threads (ignored if source is a single file)" - // )] - // num_threads: usize, } impl Runnable for ExtractText { @@ -41,71 +34,74 @@ impl Runnable for ExtractText { } } -// #[cfg(test)] -// mod tests { -// use std::{ -// fs::File, -// io::{Read, Write}, -// }; - -// use tempfile; - -// use crate::{impls::OscarDoc, ops::Compress}; - -// pub fn setup_oscardoc() -> String { -// let mut corpus = String::new(); -// for i in 0..10000 { -// corpus.push_str(&format!(r#"{{"item":{}}}"#, i)); -// corpus.push('\n'); -// } - -// corpus -// } - -// // the way of checking results is bad, since we merge then sort results -// // we should rather check the individual files one by one -// #[test] -// fn test_compress() { -// let content = setup_oscardoc(); -// let content: Vec<&str> = content.lines().collect(); -// let content_files = (&content).chunks(1000).into_iter(); -// let tmpdir = tempfile::tempdir().unwrap(); -// for (idx, chunk) in content_files.enumerate() { -// // should be safe since it does not rely on rust destructor -// // + it is in a tempfile that will be cleaned at the exit of the test -// let tempfile_path = tmpdir.path().join(format!("file_{idx}.jsonl")); -// let mut tempfile = File::create(tempfile_path).unwrap(); -// tempfile.write_all(chunk.join("\n").as_bytes()).unwrap(); -// } - -// // create destination path and compress -// let tmpdst = tempfile::tempdir().unwrap(); -// OscarDoc::compress_folder(tmpdir.path(), tmpdst.path(), false, 1).unwrap(); - -// println!( -// "{:?}", -// std::fs::read_dir(tmpdir.path()) -// .unwrap() -// .collect::>() -// ); -// // let mut items_decompressed = Vec::new(); - -// let mut decompressed_data = Vec::new(); -// for file in std::fs::read_dir(tmpdst.path()).unwrap() { -// println!("file: {:?}", file); -// // for file in split_files { -// let file = file.unwrap(); -// let file = File::open(file.path()).unwrap(); -// let mut reader = flate2::read::GzDecoder::new(file); -// let mut decompressed = String::new(); -// reader.read_to_string(&mut decompressed).unwrap(); -// decompressed_data.extend(decompressed.lines().map(|x| x.to_string()).into_iter()); -// } - -// // sort results -// decompressed_data.sort(); -// let mut content = content; -// content.sort(); -// assert_eq!(decompressed_data, content); -// } -// } +#[cfg(test)] +mod tests { + use std::io::Write; + + use tempfile; + + use crate::{impls::OscarDoc, ops::ExtractText}; + + pub fn setup_oscardoc() -> (String, String) { + let mut corpus = String::new(); + let mut content_only = String::new(); + for i in 0..100 { + let content = format!(r#"document n{0}\nthis is document n{0}"#, i); + corpus.push_str(&format!( + r#"{{"content":"{content}", "metadata": ["foo"]}}"#, + )); + corpus.push('\n'); + + content_only.push_str(&content.replace(r#"\n"#, "\n")); + content_only.push_str("\n\n"); + } + + (corpus, content_only) + } + + #[test] + fn test_extract() { + //get both documents and expected output + let (docs, content_only) = setup_oscardoc(); + let mut src = tempfile::NamedTempFile::new().unwrap(); + + //write fake corpus + src.write_all(docs.as_bytes()).unwrap(); + + // create destination path and file path + let dst = tempfile::tempdir().unwrap(); + let dst_path = dst.into_path().join("text_only.txt"); + + let src_path = src.into_temp_path(); + OscarDoc::extract_text(&src_path, &dst_path, false).unwrap(); + + // read extracted + let text = std::fs::read_to_string(dst_path).unwrap(); + + assert!(src_path.exists()); + assert_eq!(text, content_only); + } + + #[test] + fn test_extract_rm_src() { + //get both documents and expected output + let (docs, content_only) = setup_oscardoc(); + let mut src = tempfile::NamedTempFile::new().unwrap(); + + //write fake corpus + src.write_all(docs.as_bytes()).unwrap(); + + // create destination path and file path + let dst = tempfile::tempdir().unwrap(); + let dst_path = dst.into_path().join("text_only.txt"); + + let src_path = src.into_temp_path(); + OscarDoc::extract_text(&src_path, &dst_path, true).unwrap(); + + // read extracted + let text = std::fs::read_to_string(dst_path).unwrap(); + + assert!(!src_path.exists()); + assert_eq!(text, content_only); + } +} diff --git a/src/impls/oscar_doc.rs b/src/impls/oscar_doc.rs index ecacb16..916aef7 100644 --- a/src/impls/oscar_doc.rs +++ b/src/impls/oscar_doc.rs @@ -4,7 +4,6 @@ use std::{ fs::File, io::{BufRead, BufReader, Read, Write}, - path::PathBuf, }; use serde_json::Value; @@ -85,15 +84,21 @@ impl ExtractText for OscarDoc { } let src_file = File::open(src)?; - // gen filename - let filename = src.file_name().unwrap(); - let mut dst: PathBuf = [dst.as_os_str(), filename].iter().collect(); - let extension = String::from(dst.extension().unwrap().to_str().unwrap()); - dst.set_extension(extension + ".txt"); + if dst.exists() { + error!("File {:?} already exists!", dst); + return Err(std::io::Error::new( + std::io::ErrorKind::AlreadyExists, + format!("{:?}", dst), + ) + .into()); + } + + let mut dst = dst.to_path_buf(); + dst.set_extension("txt"); + let mut dest_file = File::create(&dst)?; info!("extracting text from {:?} to {:?}", src, dst); - let mut dest_file = File::create(dst)?; OscarDoc::extract(src_file, &mut dest_file)?; if del_src { @@ -106,7 +111,6 @@ impl ExtractText for OscarDoc { #[cfg(test)] mod tests { - use serde_json::Value; use crate::impls::OscarDoc;