Skip to content

Commit

Permalink
In-Condition Support (#167)
Browse files Browse the repository at this point in the history
* Started working on condition in support

* Implemented "in" conditions
  • Loading branch information
johanhaleby authored Nov 1, 2024
1 parent b33f0a1 commit 50030a8
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 122 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### Next version
* Implemented "in" conditions so you can now do e.g. `subscriptionModel.subscribe("id", OccurrentSubscriptionFilter.filter(Filter.streamVersion(Condition.in(12L, 14L))`

### 0.19.6 (2024-10-11)
* Fixed so that inserting events with "any" WriteCondition never fails even if more than two threads are writing events to the same stream at the same time. (Fixed in MongoEventStore and SpringMongoEventStore)

Expand Down
44 changes: 41 additions & 3 deletions common/filter/src/main/java/org/occurrent/condition/Condition.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package org.occurrent.condition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -57,6 +55,25 @@ public String toString() {
}
}

record InOperandCondition<T>(Collection<T> operand, String description) implements Condition<T> {

public InOperandCondition {
requireNonNull(operand, "Operand cannot be null");
requireNonNull(description, "Description cannot be null");
}

@Override
public <T2> Condition<T2> map(Function<T, T2> fn) {
requireNonNull(fn, "Mapping function cannot be null");
return new InOperandCondition<>(operand.stream().map(fn).toList(), description);
}

@Override
public String toString() {
return description;
}
}

record MultiOperandCondition<T>(MultiOperandConditionName operationName, List<Condition<T>> operations, String description) implements Condition<T> {

public MultiOperandCondition {
Expand All @@ -76,6 +93,17 @@ public String toString() {
}
}

static <T> Condition<T> in(Collection<T> values) {
String join = values.stream().map(Objects::toString).collect(Collectors.joining(","));
return new InOperandCondition<>(values, String.format("in any of (%s)", join));
}

@SafeVarargs
static <T> Condition<T> in(T value, T... additionalValues) {
List<T> values = createList(value, additionalValues);
return in(values);
}

static <T> Condition<T> eq(T t) {
return new SingleOperandCondition<>(EQ, t, String.format("to be equal to %s", t));
}
Expand Down Expand Up @@ -154,6 +182,16 @@ private static <T> List<T> createList(T firstCondition, T secondCondition, T[] a
return conditions;
}

private static <T> List<T> createList(T firstCondition, T[] additionalConditions) {
if (additionalConditions == null || additionalConditions.length == 0) {
return List.of(firstCondition);
}
List<T> conditions = new ArrayList<>(1 + additionalConditions.length);
conditions.add(firstCondition);
Collections.addAll(conditions, additionalConditions);
return conditions;
}

enum SingleOperandConditionName {
EQ, LT, GT, LTE, GTE, NE
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import org.occurrent.condition.Condition.SingleOperandConditionName;

import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static java.util.function.Predicate.isEqual;
import static org.occurrent.condition.Condition.SingleOperandConditionName.EQ;
import static org.occurrent.condition.Condition.SingleOperandConditionName.NE;
import static org.occurrent.filter.Filter.*;
Expand All @@ -45,16 +46,11 @@ public static <T> boolean matchesCondition(CloudEvent cloudEvent, String fieldNa
Condition.MultiOperandConditionName operationName = operation.operationName();
List<Condition<T>> operations = operation.operations();
Stream<Boolean> filters = operations.stream().map(c -> matchesCondition(cloudEvent, fieldName, c));
switch (operationName) {
case AND:
return filters.allMatch(Predicate.isEqual(true));
case OR:
return filters.anyMatch(Predicate.isEqual(true));
case NOT:
return filters.allMatch(Predicate.isEqual(false));
default:
throw new IllegalStateException("Unexpected value: " + operationName);
}
return switch (operationName) {
case AND -> filters.allMatch(isEqual(true));
case OR -> filters.anyMatch(isEqual(true));
case NOT -> filters.allMatch(isEqual(false));
};
} else if (condition instanceof SingleOperandCondition<T> singleOperandCondition) {
T expected = singleOperandCondition.operand();
SingleOperandConditionName singleOperandConditionName = singleOperandCondition.operandConditionName();
Expand All @@ -68,24 +64,19 @@ public static <T> boolean matchesCondition(CloudEvent cloudEvent, String fieldNa
Comparable<Object> expectedComparable = toComparable(expected, "Expected value must implement " + Comparable.class.getName() + " in order to be used in Filter's");
Comparable<Object> actualComparable = toComparable(actual, "Value in CloudEvent must implement " + Comparable.class.getName() + " in order to be used in Filter's");
int comparisonResult = actualComparable.compareTo(expectedComparable);
switch (singleOperandConditionName) {
case LT:
matches = comparisonResult < 0;
break;
case GT:
matches = comparisonResult > 0;
break;
case LTE:
matches = comparisonResult <= 0;
break;
case GTE:
matches = comparisonResult >= 0;
break;
default:
throw new IllegalStateException("Unexpected value: " + singleOperandConditionName);
}
matches = switch (singleOperandConditionName) {
case LT -> comparisonResult < 0;
case GT -> comparisonResult > 0;
case LTE -> comparisonResult <= 0;
case GTE -> comparisonResult >= 0;
default -> throw new IllegalStateException("Unexpected value: " + singleOperandConditionName);
};
}
return matches;
} else if (condition instanceof Condition.InOperandCondition<T> inOperandCondition) {
Object actual = extractValue(cloudEvent, fieldName);
Collection<T> operand = inOperandCondition.operand();
return operand.stream().anyMatch(it -> Objects.equals(it, actual));
} else {
throw new IllegalArgumentException("Unsupported condition: " + condition.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.occurrent.condition.Condition.SingleOperandCondition;
import org.occurrent.condition.Condition.SingleOperandConditionName;

import java.util.Collection;
import java.util.List;

/**
Expand All @@ -35,36 +36,25 @@ public static <T> Bson convertConditionToBsonCriteria(String fieldName, Conditio
Condition.MultiOperandConditionName operationName = operation.operationName();
List<Condition<T>> operations = operation.operations();
Bson[] filters = operations.stream().map(c -> convertConditionToBsonCriteria(fieldName, c)).toArray(Bson[]::new);
switch (operationName) {
case AND:
return Filters.and(filters);
case OR:
return Filters.or(filters);
case NOT:
return Filters.not(filters[0]);
default:
throw new IllegalStateException("Unexpected value: " + operationName);
}
} else if (condition instanceof SingleOperandCondition) {
SingleOperandCondition<T> singleOperandCondition = (SingleOperandCondition<T>) condition;
T expectedVersion = singleOperandCondition.operand();
return switch (operationName) {
case AND -> Filters.and(filters);
case OR -> Filters.or(filters);
case NOT -> Filters.not(filters[0]);
};
} else if (condition instanceof SingleOperandCondition<T> singleOperandCondition) {
T operand = singleOperandCondition.operand();
SingleOperandConditionName singleOperandConditionName = singleOperandCondition.operandConditionName();
switch (singleOperandConditionName) {
case EQ:
return Filters.eq(fieldName, expectedVersion);
case LT:
return Filters.lt(fieldName, expectedVersion);
case GT:
return Filters.gt(fieldName, expectedVersion);
case LTE:
return Filters.lte(fieldName, expectedVersion);
case GTE:
return Filters.gte(fieldName, expectedVersion);
case NE:
return Filters.ne(fieldName, expectedVersion);
default:
throw new IllegalStateException("Unexpected value: " + singleOperandConditionName);
}
return switch (singleOperandConditionName) {
case EQ -> Filters.eq(fieldName, operand);
case LT -> Filters.lt(fieldName, operand);
case GT -> Filters.gt(fieldName, operand);
case LTE -> Filters.lte(fieldName, operand);
case GTE -> Filters.gte(fieldName, operand);
case NE -> Filters.ne(fieldName, operand);
};
} else if (condition instanceof Condition.InOperandCondition<T> inOperandCondition) {
Collection<T> operand = inOperandCondition.operand();
return Filters.in(fieldName, operand);
} else {
throw new IllegalArgumentException("Unsupported condition: " + condition.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.occurrent.condition.Condition.SingleOperandCondition;
import org.springframework.data.mongodb.core.query.Criteria;

import java.util.Collection;
import java.util.List;

/**
Expand All @@ -33,35 +34,25 @@ public static <T> Criteria convertConditionToCriteria(String fieldName, Conditio
Condition.MultiOperandConditionName operationName = operation.operationName();
List<Condition<T>> operations = operation.operations();
Criteria[] criteria = operations.stream().map(c -> convertConditionToCriteria(fieldName, c)).toArray(Criteria[]::new);
switch (operationName) {
case AND:
return new Criteria().andOperator(criteria);
case OR:
return new Criteria().orOperator(criteria);
case NOT:
return new Criteria().norOperator(criteria);
default:
throw new IllegalStateException("Unexpected value: " + operationName);
}
return switch (operationName) {
case AND -> new Criteria().andOperator(criteria);
case OR -> new Criteria().orOperator(criteria);
case NOT -> new Criteria().norOperator(criteria);
};
} else if (condition instanceof SingleOperandCondition<T> singleOperandCondition) {
T value = singleOperandCondition.operand();
Condition.SingleOperandConditionName singleOperandConditionName = singleOperandCondition.operandConditionName();
switch (singleOperandConditionName) {
case EQ:
return Criteria.where(fieldName).is(value);
case LT:
return Criteria.where(fieldName).lt(value);
case GT:
return Criteria.where(fieldName).gt(value);
case LTE:
return Criteria.where(fieldName).lte(value);
case GTE:
return Criteria.where(fieldName).gte(value);
case NE:
return Criteria.where(fieldName).ne(value);
default:
throw new IllegalStateException("Unexpected value: " + singleOperandConditionName);
}
return switch (singleOperandConditionName) {
case EQ -> Criteria.where(fieldName).is(value);
case LT -> Criteria.where(fieldName).lt(value);
case GT -> Criteria.where(fieldName).gt(value);
case LTE -> Criteria.where(fieldName).lte(value);
case GTE -> Criteria.where(fieldName).gte(value);
case NE -> Criteria.where(fieldName).ne(value);
};
} else if (condition instanceof Condition.InOperandCondition<T> inOperandCondition) {
Collection<T> operand = inOperandCondition.operand();
return Criteria.where(fieldName).in(operand);
} else {
throw new IllegalArgumentException("Unsupported condition: " + condition.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package org.occurrent.eventstore.api;

import org.occurrent.condition.Condition;
import org.occurrent.condition.Condition.MultiOperandCondition;
import org.occurrent.condition.Condition.MultiOperandConditionName;
import org.occurrent.condition.Condition.SingleOperandCondition;
import org.occurrent.condition.Condition.SingleOperandConditionName;
import org.occurrent.condition.Condition.*;

import java.util.Collection;
import java.util.Objects;
import java.util.stream.Stream;

Expand All @@ -36,35 +34,25 @@ public static boolean evaluate(Condition<Long> condition, long value) {
if (condition instanceof MultiOperandCondition<Long> operation) {
MultiOperandConditionName operationName = operation.operationName();
Stream<Condition<Long>> operations = operation.operations().stream();
switch (operationName) {
case AND:
return operations.allMatch(c -> evaluate(c, value));
case OR:
return operations.anyMatch(c -> evaluate(c, value));
case NOT:
return operations.noneMatch(c -> evaluate(c, value));
default:
throw new IllegalStateException("Unexpected value: " + operationName);
}
return switch (operationName) {
case AND -> operations.allMatch(c -> evaluate(c, value));
case OR -> operations.anyMatch(c -> evaluate(c, value));
case NOT -> operations.noneMatch(c -> evaluate(c, value));
};
} else if (condition instanceof SingleOperandCondition<Long> singleOperandCondition) {
long operand = singleOperandCondition.operand();
SingleOperandConditionName singleOperandConditionName = singleOperandCondition.operandConditionName();
switch (singleOperandConditionName) {
case EQ:
return value == operand;
case LT:
return value < operand;
case GT:
return value > operand;
case LTE:
return value <= operand;
case GTE:
return value >= operand;
case NE:
return value != operand;
default:
throw new IllegalStateException("Unexpected value: " + singleOperandConditionName);
}
return switch (singleOperandConditionName) {
case EQ -> value == operand;
case LT -> value < operand;
case GT -> value > operand;
case LTE -> value <= operand;
case GTE -> value >= operand;
case NE -> value != operand;
};
} else if (condition instanceof InOperandCondition<Long> inOperandCondition) {
Collection<Long> longs = inOperandCondition.operand();
return longs.contains(value);
} else {
throw new IllegalArgumentException("Unsupported condition: " + condition.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,41 @@ void throws_write_condition_not_fulfilled_when_stream_version_does_not_match_exp
}
}

@Nested
@DisplayName("in")
class In {

@Test
void writes_events_when_stream_version_matches_expected_version() {
// When
DomainEvent event1 = new NameDefined(UUID.randomUUID().toString(), now, "name", "John Doe");
unconditionallyPersist(inMemoryEventStore, "name", event1);

DomainEvent event2 = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Jan Doe");
EventStream<CloudEvent> eventStream1 = inMemoryEventStore.read("name");
conditionallyPersist(inMemoryEventStore, eventStream1.id(), WriteCondition.streamVersion(in(eventStream1.version(), eventStream1.version() + 1)), Stream.of(event2));

// Then
EventStream<CloudEvent> eventStream2 = inMemoryEventStore.read("name");
assertThat(eventStream2.map(deserialize(objectMapper))).containsExactly(event1, event2);
}

@Test
void throws_write_condition_not_fulfilled_when_stream_version_does_not_match_expected_version() {
// Given
DomainEvent event1 = new NameDefined(UUID.randomUUID().toString(), now, "name", "John Doe");
unconditionallyPersist(inMemoryEventStore, "name", Stream.of(event1));

// When
DomainEvent event2 = new NameWasChanged(UUID.randomUUID().toString(), now, "name", "Jan Doe");
Throwable throwable = catchThrowable(() -> conditionallyPersist(inMemoryEventStore, "name", streamVersion(in(10L, 12L)), Stream.of(event2)));

// Then
assertThat(throwable).isExactlyInstanceOf(WriteConditionNotFulfilledException.class)
.hasMessage("WriteCondition was not fulfilled. Expected version in any of (10,12) but was 1.");
}
}

@Nested
@DisplayName("ne")
class Ne {
Expand Down
Loading

0 comments on commit 50030a8

Please sign in to comment.