Skip to content

Commit

Permalink
GH-5121: support empty left bind join (OPTIONAL) in FedX
Browse files Browse the repository at this point in the history
Previously we introduced support for left bind joins in FedX. The case
of empty left bind joins (i.e. where the clause inside the OPTIONAL does
not provide any statements) was not handled and resulted in an exception.

This change now adds support for empty optional joins and passes the
results from the left-hand side through.
  • Loading branch information
aschwarte10 committed Nov 12, 2024
1 parent ab02413 commit e6988bf
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
Expand Down Expand Up @@ -42,7 +43,9 @@ protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
if (expr instanceof StatementTupleExpr) {
StatementTupleExpr stmt = (StatementTupleExpr) expr;
taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt);

} else if (expr instanceof EmptyStatementPattern) {
EmptyStatementPattern stmt = (EmptyStatementPattern) expr;
taskCreator = new EmptyLeftBoundJoinTaskCreator(strategy, stmt);
} else {
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
+ ". Please report this problem.");
Expand All @@ -67,4 +70,20 @@ public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, Li
}
}

/**
 * {@link TaskCreator} used when the join argument is an {@link EmptyStatementPattern}. Every task it hands out
 * is a {@link ParallelEmptyBindLeftJoinTask} built from the stored strategy and expression.
 */
static protected class EmptyLeftBoundJoinTaskCreator implements TaskCreator {
	protected final FederationEvalStrategy _strategy;
	protected final EmptyStatementPattern _expr;

	/**
	 * @param strategy the evaluation strategy passed on to each created task
	 * @param expr     the empty statement pattern passed on to each created task
	 */
	public EmptyLeftBoundJoinTaskCreator(
			FederationEvalStrategy strategy, EmptyStatementPattern expr) {
		this._strategy = strategy;
		this._expr = expr;
	}

	/**
	 * Creates the task for one block of input bindings.
	 *
	 * @param control  the executor the task reports to
	 * @param bindings the input bindings handled by the task
	 * @return a new {@link ParallelEmptyBindLeftJoinTask}
	 */
	@Override
	public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
		ParallelTask<BindingSet> task = new ParallelEmptyBindLeftJoinTask(control, _strategy, _expr, bindings);
		return task;
	}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.join;

import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;

/**
 * {@link ParallelTaskBase} implementation for bind left joins whose join argument is an
 * {@link EmptyStatementPattern}. Since such a pattern cannot add results, the task effectively passes the input
 * bindings from the left operand through.
 *
 * @author Andreas Schwarte
 */
public class ParallelEmptyBindLeftJoinTask extends ParallelTaskBase<BindingSet> {

	protected final FederationEvalStrategy strategy;
	protected final EmptyStatementPattern rightArg;
	protected final List<BindingSet> bindings;
	protected final ParallelExecutor<BindingSet> joinControl;

	/**
	 * @param joinControl the executor returned by {@link #getControl()}
	 * @param strategy    the evaluation strategy (stored, not used by this task itself)
	 * @param expr        the empty statement pattern forming the join argument (stored as {@code rightArg})
	 * @param bindings    the input bindings that are returned as the task result
	 */
	public ParallelEmptyBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
			EmptyStatementPattern expr, List<BindingSet> bindings) {
		this.joinControl = joinControl;
		this.strategy = strategy;
		this.rightArg = expr;
		this.bindings = bindings;
	}

	@Override
	public ParallelExecutor<BindingSet> getControl() {
		return this.joinControl;
	}

	/**
	 * @return an iteration over the unchanged input bindings
	 */
	@Override
	protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
		// the empty statement pattern cannot add results => hand the input bindings back as they are
		return new CollectionIteration<>(this.bindings);
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male"));
}

fedxRule.enableDebug();

try {
// run query which joins results from multiple repos
// for a subset of persons there exist names
Expand Down Expand Up @@ -246,4 +244,99 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
}
}

/**
 * Checks that an OPTIONAL clause for which no data exists (here: {@code foaf:age}, which this test never adds)
 * does not drop or alter the results of the left-hand side. The test runs once with and once without the
 * "optional as bind join" optimization enabled.
 *
 * @param bindLeftJoinOptimizationEnabled value passed to {@code withEnableOptionalAsBindJoin}
 */
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void test_leftBindJoin_emptyOptional(boolean bindLeftJoinOptimizationEnabled) throws Exception {

	prepareTest(
			Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl",
					"/tests/basic/data_emptyStore.ttl"));

	Repository repo1 = getRepository(1);
	Repository repo2 = getRepository(2);
	Repository repo3 = getRepository(3);

	Repository fedxRepo = fedxRule.getRepository();

	// block size 10 with 30 persons => the join input spans more than one block
	fedxRule.setConfig(config -> {
		config.withBoundJoinBlockSize(10);
		config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
	});

	// add some persons
	try (RepositoryConnection conn = repo1.getConnection()) {

		for (int i = 1; i <= 30; i++) {
			var p = Values.iri("http://ex.com/p" + i);
			var otherP = Values.iri("http://other.com/p" + i);
			conn.add(p, OWL.SAMEAS, otherP);
		}
	}

	// add names for person 1, 4, 7, ...
	try (RepositoryConnection conn = repo2.getConnection()) {

		for (int i = 1; i <= 30; i += 3) {
			var otherP = Values.iri("http://other.com/p" + i);
			conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
		}
	}

	// add names for person 2, 5, 8, ...
	try (RepositoryConnection conn = repo3.getConnection()) {

		for (int i = 2; i <= 30; i += 3) {
			var otherP = Values.iri("http://other.com/p" + i);
			conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
		}
	}

	try {
		// run query which joins results from multiple repos
		// for a subset of persons there exist names
		// the age does not exist for any person
		try (RepositoryConnection conn = fedxRepo.getConnection()) {
			String query = "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
					"SELECT * WHERE { "
					+ " ?person owl:sameAs ?otherPerson . "
					+ " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3
					+ " OPTIONAL { ?otherPerson foaf:age ?age . } " // # does not exist
					+ "}";

			TupleQuery tupleQuery = conn.prepareTupleQuery(query);
			try (TupleQueryResult tqr = tupleQuery.evaluate()) {
				var bindings = Iterations.asList(tqr);

				// every person from repo1 must survive both OPTIONAL clauses exactly once
				Assertions.assertEquals(30, bindings.size());

				for (int i = 1; i <= 30; i++) {
					var p = Values.iri("http://ex.com/p" + i);
					var otherP = Values.iri("http://other.com/p" + i);

					// find the bindingset for the person in the unordered result;
					// p.equals(..) keeps the lookup null-safe if a result lacks ?person
					BindingSet bs = bindings.stream()
							.filter(b -> p.equals(b.getValue("person")))
							.findFirst()
							.orElseThrow(() -> new AssertionError("No result found for person " + p));

					Assertions.assertEquals(otherP, bs.getValue("otherPerson"));

					if (i % 3 == 0) {
						// no name for others (neither repo2 nor repo3 has one)
						Assertions.assertFalse(bs.hasBinding("name"));
					} else {
						// names from repo 2 or 3; check presence first to fail with a message, not an NPE
						Assertions.assertTrue(bs.hasBinding("name"), "Missing name binding for " + p);
						Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue());
					}

					// the second OPTIONAL never matches, so ?age must stay unbound
					Assertions.assertFalse(bs.hasBinding("age"));
				}
			}
		}

	} finally {
		fedxRepo.shutDown();
	}
}

}

0 comments on commit e6988bf

Please sign in to comment.