Skip to content

Commit

Permalink
Several fixes in the modularized enricher system. Switched to release…
Browse files Browse the repository at this point in the history
… versions.
  • Loading branch information
Aklakan committed May 24, 2024
1 parent 8f63a2e commit bdc795c
Show file tree
Hide file tree
Showing 16 changed files with 607 additions and 493 deletions.
73 changes: 46 additions & 27 deletions lsq-cli/src/main/java/org/aksw/simba/lsq/cli/main/MainCliLsq.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.aksw.simba.lsq.enricher.core.LsqEnricherRegistry;
import org.aksw.simba.lsq.enricher.core.LsqEnricherShell;
import org.aksw.simba.lsq.model.ExperimentConfig;
import org.aksw.simba.lsq.model.ExperimentExec;
import org.aksw.simba.lsq.model.ExperimentRun;
import org.aksw.simba.lsq.model.LsqQuery;
import org.aksw.simba.lsq.vocab.LSQ;
Expand All @@ -83,6 +84,8 @@
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.RDFFormat;
import org.apache.jena.riot.WebContent;
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.system.StreamRDFWriter;
import org.apache.jena.shared.PrefixMapping;
import org.apache.jena.sparql.lang.arq.ParseException;
import org.apache.jena.tdb2.TDB2Factory;
Expand Down Expand Up @@ -163,20 +166,26 @@ public static int mainCore(String[] args) {
.execute(args);
}

/**
 * Returns the host hash salt to use for the given rdfize command.
 *
 * <p>If the command carries an explicit salt ({@code rdfizeCmd.hostSalt}) it is
 * returned unchanged. Otherwise a random UUID string is generated, reported
 * via the logger at info level, and returned.
 *
 * @param rdfizeCmd the command whose {@code hostSalt} field is consulted
 * @return the configured salt, or a freshly generated one if none was configured
 */
public static String getOrCreateSalt(CmdLsqRdfizeBase rdfizeCmd) {
    String configuredSalt = rdfizeCmd.hostSalt;
    if (configuredSalt != null) {
        return configuredSalt;
    }

    // TODO Make host hashing a post processing step for the log rdfization
    String generatedSalt = UUID.randomUUID().toString();
    logger.info("Auto generated host hash salt (only used for non-rdf log input): " + generatedSalt);
    return generatedSalt;
}

/**
 * Convenience overload that first resolves the host hash salt via
 * {@link #getOrCreateSalt(CmdLsqRdfizeBase)} and then delegates to
 * {@code createLsqRdfFlow(rdfizeCmd, hostHashSalt)}.
 *
 * @param rdfizeCmd the rdfize command to build the flow for
 * @return the flow produced by the two-argument overload
 */
public static Flowable<ResourceInDataset> createLsqRdfFlow(CmdLsqRdfizeBase rdfizeCmd) throws FileNotFoundException, IOException, ParseException {
    return createLsqRdfFlow(rdfizeCmd, getOrCreateSalt(rdfizeCmd));
}

public static Flowable<ResourceInDataset> createLsqRdfFlow(CmdLsqRdfizeBase rdfizeCmd, String hostHashSalt) throws FileNotFoundException, IOException, ParseException {
String logFormat = rdfizeCmd.inputLogFormat;
List<String> logSources = rdfizeCmd.nonOptionArgs;
String baseIri = rdfizeCmd.baseIri;

String tmpHostHashSalt = rdfizeCmd.hostSalt;
if(tmpHostHashSalt == null) {
tmpHostHashSalt = UUID.randomUUID().toString();
// TODO Make host hashing a post processing step for the log rdfization
logger.info("Auto generated host hash salt (only used for non-rdf log input): " + tmpHostHashSalt);
}

String hostHashSalt = tmpHostHashSalt;

String endpointUrl = rdfizeCmd.getEndpointUrl();
// TODO Validate all sources first: For trig files it is ok if no endpoint is specified
// if(endpointUrl == null) {
Expand All @@ -193,21 +202,21 @@ public static Flowable<ResourceInDataset> createLsqRdfFlow(CmdLsqRdfizeBase rdfi


// Hash function which is applied after combining host names with salts
Function<String, String> hashFn = str -> BaseEncoding.base64Url().omitPadding().encode(Hashing.sha256()
Function<String, String> hostHashFn = str -> BaseEncoding.base64Url().omitPadding().encode(Hashing.sha256()
.hashString(str, StandardCharsets.UTF_8)
.asBytes());

Function<Resource, Resource> rdfizer;
if (rdfizeCmd.isQueryOnly()) {
rdfizer = new LsqLogRecordRdfizerQueryOnly(sparqlStmtParser, baseIri, hashFn);
rdfizer = new LsqLogRecordRdfizerQueryOnly(sparqlStmtParser, baseIri, hostHashFn);

} else {
rdfizer = new LsqLogRecordRdfizer(
sparqlStmtParser,
baseIri,
hostHashSalt,
endpointUrl,
hashFn
hostHashFn
);
}

Expand Down Expand Up @@ -635,17 +644,25 @@ public static void benchmarkExecute(CmdLsqRxBenchmarkRun benchmarkExecuteCmd) th
//List<String> logSources = benchmarkCmd.logSources;
// Load the benchmark config and create a benchmark run for it

ExperimentRun run = tryLoadRun(configSrc)
ExperimentExec expExec = tryLoadExec(configSrc)
.orElseThrow(() -> new IllegalArgumentException(
"Could not detect a resource with " + LSQ.Terms.config + " property in " + configSrc));

ExperimentConfig cfg = run.getConfig();
ExperimentConfig expConfig = expExec.getConfig();


String lsqBaseIri = Objects.requireNonNull(cfg.getBaseIri(), "Base IRI (e.g. http://lsq.aksw.org/) not provided");
ExperimentRun expRun = expExec.getModel().createResource().as(ExperimentRun.class)
.setExec(expExec)
.setRunId(0)
// .setTimestamp(null); // xsddt
;


String lsqBaseIri = Objects.requireNonNull(expConfig.getBaseIri(), "Base IRI (e.g. http://lsq.aksw.org/) not provided");
//LsqBenchmarkProcessor.createProcessor()


String runId = run.getIdentifier();
String runId = expConfig.getIdentifier();
Objects.requireNonNull(runId, "Experiment run identifier must not be null");

// Create a folder with the database for the run
Expand All @@ -656,23 +673,24 @@ public static void benchmarkExecute(CmdLsqRxBenchmarkRun benchmarkExecuteCmd) th
Files.createDirectories(tdb2FullPath);
String fullPathStr = tdb2FullPath.toString();


LsqEnricherShell enricherFactory = new LsqEnricherShell(lsqBaseIri, benchmarkExecuteCmd.enricherSpec.getEffectiveList(), LsqEnricherRegistry::get);
Function<Resource, Resource> enricher = enricherFactory.get();


logger.info("TDB2 benchmark db location: " + tdb2FullPath);

Dataset dataset = TDB2Factory.connectDataset(fullPathStr);
try (OutputStream out = StdIo.openStdOutWithCloseShield();
RDFConnection indexConn = RDFConnection.connect(dataset)) {
RdfDataRefSparqlEndpoint dataRef = cfg.getDataRef();
try (OutputStream outStream = StdIo.openStdOutWithCloseShield();
RDFConnection indexConn = RDFConnection.connect(dataset)) {
StreamRDF out = StreamRDFWriter.getWriterStream(outStream, RDFFormat.TRIG_BLOCKS);
out.start();
RdfDataRefSparqlEndpoint dataRef = expConfig.getDataRef();
try (RdfDataPod dataPod = DataPods.fromDataRef(dataRef)) {
try (SparqlQueryConnection benchmarkConn =
SparqlQueryConnectionWithReconnect.create(() -> dataPod.getConnection())) {
LsqBenchmarkProcessor.process(out, queryFlow, lsqBaseIri, cfg, run, enricher, benchmarkConn, indexConn);
LsqBenchmarkProcessor.process(out, queryFlow, lsqBaseIri, expConfig, expExec, expRun, enricher, benchmarkConn, indexConn);
}
}
out.finish();
} finally {
dataset.close();
}
Expand All @@ -694,8 +712,8 @@ public static Optional<ExperimentConfig> tryLoadConfig(String src) {
return tryLoadResourceWithProperty(src, LSQ.endpoint, ExperimentConfig.class);
}

public static Optional<ExperimentRun> tryLoadRun(String src) {
return tryLoadResourceWithProperty(src, LSQ.config, ExperimentRun.class);
/**
 * Looks up an {@link ExperimentExec} in the given source by delegating to
 * {@code tryLoadResourceWithProperty} with the {@code LSQ.config} property.
 *
 * <p>NOTE(review): presumably this yields a resource in {@code src} that has
 * an {@code LSQ.config} property, viewed as {@code ExperimentExec}; confirm
 * against {@code tryLoadResourceWithProperty}.
 *
 * @param src the source passed through to {@code tryLoadResourceWithProperty}
 * @return whatever the delegate returns for {@code ExperimentExec.class}
 */
public static Optional<ExperimentExec> tryLoadExec(String src) {
    Optional<ExperimentExec> loadedExec =
            tryLoadResourceWithProperty(src, LSQ.config, ExperimentExec.class);
    return loadedExec;
}


Expand Down Expand Up @@ -733,12 +751,13 @@ public static void benchmarkPrepare(CmdLsqRxBenchmarkPrepare benchmarkCmd) throw
// Not used if stdout flag is set
String outFilename = sanitizeFilename(runId) + ".run.ttl";

ExperimentRun expRun = configModel
ExperimentExec expRun = configModel
.createResource()
.as(ExperimentRun.class)
.as(ExperimentExec.class)
.setConfig(config)
.setTimestamp(xsddt)
.setIdentifier(runId);
;
// .setIdentifier(runId);


String runIri = config.getBaseIri() + runId;
Expand Down
11 changes: 6 additions & 5 deletions lsq-core/src/main/java/org/aksw/simba/lsq/core/LsqRdfizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@
import org.aksw.jenax.stmt.core.SparqlStmtParserImpl;
import org.aksw.jenax.stmt.core.SparqlStmtQuery;
import org.aksw.jenax.stmt.util.SparqlStmtUtils;
import org.aksw.simba.lsq.core.util.Skolemize;
import org.aksw.simba.lsq.core.util.SkolemizeBackport;
import org.aksw.simba.lsq.model.LsqQuery;
import org.aksw.simba.lsq.model.RemoteExecution;
import org.aksw.simba.lsq.parser.WebLogParser;
import org.aksw.simba.lsq.vocab.LSQ;
import com.google.common.collect.Iterables;
import org.apache.jena.query.Syntax;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.shared.PrefixMapping;

import com.google.common.collect.Iterables;

public class LsqRdfizer {


Expand Down Expand Up @@ -107,7 +108,7 @@ public static Optional<Resource> rdfizeLogRecord(
String baseIri,
String hostHashSalt,
String serviceUrl,
Function<String, String> hashFn,
Function<String, String> hostHashFn,
Resource x) {
RemoteExecution re = x.as(RemoteExecution.class);

Expand Down Expand Up @@ -162,7 +163,7 @@ public static Optional<Resource> rdfizeLogRecord(
String host = re.getHost();
String hostHash = host == null
? null
: hashFn.apply(hostHashSalt + host);
: hostHashFn.apply(hostHashSalt + host);

// FIXME Respect the noHostHash = true flag
re.setHostHash(hostHash);
Expand Down Expand Up @@ -190,7 +191,7 @@ public static Optional<Resource> rdfizeLogRecord(
// NodeTransformLib2.applyNodeTransform(NodeTransformLib2.makeNullSafe(renames::get), dataset);
// result = Maybe.just(new ResourceInDatasetImpl(dataset, newRoot.getURI(), newRoot));

Resource r = Skolemize.skolemize(queryInDataset, baseIri, LsqQuery.class, (newRoot, renames) -> {
Resource r = SkolemizeBackport.skolemize(queryInDataset, baseIri, LsqQuery.class, (newRoot, renames) -> {
Optional.ofNullable(renames.get(re.asNode()))
.map(newRoot.getModel()::wrapAsResource)
.ifPresent(newRe -> newRe.as(RemoteExecution.class).setSequenceId(null));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package org.aksw.simba.lsq.core.rx.io.input;

import java.util.Optional;
import java.util.function.Function;

import org.aksw.jenax.stmt.core.SparqlStmt;
import org.aksw.jenax.stmt.core.SparqlStmtQuery;
import org.aksw.simba.lsq.core.LsqRdfizer;
import org.aksw.simba.lsq.core.util.Skolemize;
import org.aksw.simba.lsq.core.util.SkolemizeBackport;
import org.aksw.simba.lsq.model.LsqQuery;
import org.aksw.simba.lsq.model.RemoteExecution;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Resource;

Expand Down Expand Up @@ -53,7 +51,7 @@ public Resource apply(Resource logEntry) {
q.setParseError(t.toString());
}

Resource r = Skolemize.skolemize(queryInDataset, baseIri, LsqQuery.class, (newRoot, renames) -> {
Resource r = SkolemizeBackport.skolemize(queryInDataset, baseIri, LsqQuery.class, (newRoot, renames) -> {
// Optional.ofNullable(renames.get(re.asNode()))
// .map(newRoot.getModel()::wrapAsResource)
// .ifPresent(newRe -> newRe.as(RemoteExecution.class).setSequenceId(null));
Expand Down
Loading

0 comments on commit bdc795c

Please sign in to comment.