Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-494 fix hdt file loading issue #495

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.sail.NotifyingSail;
import com.the_qa_company.qendpoint.core.enums.CompressionType;
import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
Expand All @@ -43,7 +42,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -463,8 +461,9 @@ public LoadFileResult loadFile(InputStream input, String filename) throws IOExce
if (sparqlRepository.getOptions().getStorageMode().equals(SailCompilerSchema.ENDPOINTSTORE_STORAGE)) {
shutdown();

RDFFormat format = Rio.getParserFormatForFileName(filename)
.orElseThrow(() -> new ServerWebInputException("file format not supported " + filename));
RDFFormat format = filename.toLowerCase().endsWith(".hdt") ? RDFFormat.HDT
: Rio.getParserFormatForFileName(filename).orElseThrow(
() -> new ServerWebInputException("file format not supported " + filename));

EndpointStore endpoint = (EndpointStore) compiledSail.getSource();
EndpointFiles files = endpoint.getEndpointFiles();
Expand Down Expand Up @@ -524,7 +523,7 @@ public LoadFileResult loadFile(InputStream input, String filename) throws IOExce
} else {
shutdown();
initializeEndpointStore(false);
sendUpdates(input, baseURI, filename);
sendUpdates(input, filename);
}
try {
sparqlRepository.reindexLuceneSails();
Expand Down Expand Up @@ -575,7 +574,7 @@ public Map<String, String> getPrefixes() {
return prefixes;
}

private void sendUpdates(InputStream inputStream, String baseURI, String filename) throws IOException {
private void sendUpdates(InputStream inputStream, String filename) throws IOException {
StopWatch timeWatch = new StopWatch();

// uncompress the file if required
Expand Down Expand Up @@ -613,43 +612,6 @@ private void sendUpdates(InputStream inputStream, String baseURI, String filenam
logger.info("NT file loaded in {}", timeWatch.stopAndShow());
}

/**
 * Generate an HDT file at {@code hdtOutput} from a stream of triples.
 * <p>
 * In {@code HDT_TWO_PASS_MODE} the triples are first dumped to a temporary
 * N-Triples file next to the output so the generator can read them twice;
 * the temporary file is always deleted afterwards. Otherwise the iterator is
 * fed directly to the generator in a single pass.
 *
 * @param it        triples to serialize (consumed exactly once)
 * @param baseURI   base URI passed to the HDT generator
 * @param spec      HDT generation options
 * @param hdtOutput path of the HDT file to create
 * @throws IOException on I/O failure, or wrapping a {@link ParserException}
 *                     raised by the generator
 */
private void generateHDT(Iterator<TripleString> it, String baseURI, HDTOptions spec, String hdtOutput)
		throws IOException {
	if (sparqlRepository.getOptions().getPassMode().equals(SailCompilerSchema.HDT_TWO_PASS_MODE)) {
		// dump the triples to disk to allow 2 passes over the data
		Path tempNTFile = Paths.get(hdtOutput + "-tmp.nt");
		logger.info("Create TEMP NT file '{}'", tempNTFile);
		try {
			// Files.newBufferedWriter defaults to UTF-8; the bare
			// PrintWriter(File) constructor would use the platform charset,
			// which corrupts non-ASCII N-Triples data
			try (PrintWriter stream = new PrintWriter(Files.newBufferedWriter(tempNTFile))) {
				while (it.hasNext()) {
					TripleString ts = it.next();
					ts.dumpNtriple(stream);
				}
			}
			logger.info("NT file created, generating HDT...");
			// try-with-resources: close the HDT even if saveToHDT throws
			try (HDT hdtDump = HDTManager.generateHDT(tempNTFile.toFile().getAbsolutePath(), baseURI,
					RDFNotation.NTRIPLES, spec, null)) {
				hdtDump.saveToHDT(hdtOutput, null);
			} catch (ParserException e) {
				throw new IOException("Can't generate HDT", e);
			}
		} finally {
			Files.deleteIfExists(tempNTFile);
		}
	} else {
		// directly use the TripleString stream to generate the HDT
		try (HDT hdtDump = HDTManager.generateHDT(it, baseURI, spec, null)) {
			hdtDump.saveToHDT(hdtOutput, null);
		} catch (ParserException e) {
			throw new IOException("Can't generate HDT", e);
		}
	}
}

/**
 * @return the TCP port this endpoint is bound to
 */
public int getPort() {
	return this.port;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.the_qa_company.qendpoint.controller;

import com.the_qa_company.qendpoint.Application;
import com.the_qa_company.qendpoint.core.enums.RDFNotation;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.store.EndpointStore;
import com.the_qa_company.qendpoint.utils.LargeFakeDataSetStreamSupplier;
import com.the_qa_company.qendpoint.utils.RDFStreamUtils;
Expand Down Expand Up @@ -60,7 +66,9 @@ public class FileUploadTest {

/**
 * Parameters for the test: every RDF format with a registered RDF4J parser,
 * plus HDT (which is handled by a dedicated code path, not an RDF4J parser).
 */
@Parameterized.Parameters(name = "{0}")
public static Collection<Object> params() {
	ArrayList<Object> formats = new ArrayList<>(RDFParserRegistry.getInstance().getKeys());
	formats.add(RDFFormat.HDT);
	return formats;
}

@Autowired
Expand All @@ -69,7 +77,7 @@ public static Collection<Object> params() {
private final String fileName;
private final RDFFormat format;

public FileUploadTest(RDFFormat format) throws IOException {
public FileUploadTest(RDFFormat format) throws IOException, ParserException {
this.format = format;
RDFFormat originalFormat = Rio.getParserFormatForFileName(COKTAILS_NT).orElseThrow();

Expand All @@ -79,9 +87,16 @@ public FileUploadTest(RDFFormat format) throws IOException {
Path RDFFile = testDir.resolve(COKTAILS_NT + "." + format.getDefaultFileExtension());
if (!Files.exists(RDFFile)) {
try (OutputStream os = new FileOutputStream(RDFFile.toFile()); InputStream is = stream(COKTAILS_NT)) {
RDFWriter writer = Rio.createWriter(format, os);
parser.setRDFHandler(noBNode(writer));
parser.parse(is);
if (format == RDFFormat.HDT) {
try (HDT hdt = HDTManager.generateHDT(is, "http://example.org/#", RDFNotation.TURTLE,
HDTOptions.empty(), ProgressListener.ignore())) {
hdt.saveToHDT(os);
}
} else {
RDFWriter writer = Rio.createWriter(format, os);
parser.setRDFHandler(noBNode(writer));
parser.parse(is);
}
}
}

Expand Down Expand Up @@ -127,18 +142,6 @@ private InputStream streamOut(String file) throws FileNotFoundException {
return new FileInputStream(file);
}

/**
 * Compute the size in bytes of {@code file} by reading it to the end.
 *
 * @param file path of the file to measure
 * @return number of bytes in the file
 * @throws IOException if the file can't be opened or read
 */
private long fileSize(String file) throws IOException {
	// try-with-resources: the stream was previously never closed (leak)
	try (InputStream testNt = streamOut(file)) {
		byte[] buff = new byte[1024];
		long size = 0;
		int r; // read() returns an int count, not a long
		while ((r = testNt.read(buff)) != -1) {
			size += r;
		}
		return size;
	}
}

/**
 * Collapse a whitespace-only string to the empty string; any text containing
 * a non-whitespace character is returned unchanged.
 *
 * @param text input text
 * @return {@code ""} if {@code text} is only whitespace, otherwise {@code text}
 */
private String clearSpaces(String text) {
	if (text.matches("(\\s|[\\n\\r])*")) {
		return "";
	}
	return text;
}
Expand Down Expand Up @@ -222,6 +225,8 @@ public void loadTest() throws IOException {
@Test
@Ignore("large test")
public void loadLargeTest() throws IOException {
if (format == RDFFormat.HDT)
return;
long size = Sparql.getMaxChunkSize() * 10;
LargeFakeDataSetStreamSupplier supplier = new LargeFakeDataSetStreamSupplier(size, 42);
sparql.loadFile(supplier.createRDFStream(format), "fake." + format.getDefaultFileExtension());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.the_qa_company.qendpoint.utils;

import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.core.util.LiteralsUtils;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
Expand All @@ -9,6 +11,7 @@
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFHandler;
import org.eclipse.rdf4j.rio.RDFHandlerException;
Expand All @@ -17,6 +20,9 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
Expand Down Expand Up @@ -61,6 +67,22 @@ public static InputStream uncompressedStream(InputStream stream, String filename
*/
public static void readRDFStream(InputStream stream, RDFFormat format, boolean keepBNode,
Consumer<Statement> statementConsumer) throws IOException {
if (format == RDFFormat.HDT) {
// write HDT into a temp file, map it and iterate over it
Path path = Files.createTempFile(RDFStreamUtils.class.getName(), ".hdt");
try {
Files.copy(stream, path, StandardCopyOption.REPLACE_EXISTING);
try (HDT hdt = HDTManager.mapHDT(path)) {
for (TripleString ts : hdt) {
SimpleValueFactory vf = SimpleValueFactory.getInstance();
statementConsumer.accept(convertStatement(vf, ts));
}
}
} finally {
Files.deleteIfExists(path);
}
return;
}
RDFParser parser = Rio.createParser(format);
parser.setPreserveBNodeIDs(keepBNode);
parser.setRDFHandler(new RDFHandler() {
Expand Down
Loading