Skip to content

Commit

Permalink
refactor walkers (#76)
Browse files Browse the repository at this point in the history
* refactor walkers

* unit tests for conditions, replace null assignment in ElementCondition, primary constructor for IndexStatementCondition, explanatory comments for LatestCondition and EarliestCondition

* cleanup walker conditions tests

* use DSL condition instead of Optional, add invalid element name tests

* conditions equality tests

* add an override for equals() in walker conditions

* add equal pointer check for equals methods for conditions, separate equals and notEquals tests

* conditions use String parameters instead of getting them from Element class, refactor tests

* add javadoc for IndexStatementCondition equals method

* add symmetric equality check and null object equality check

* fix IndexCondition equals method

* remove unnecessary comment
  • Loading branch information
elliVM authored Sep 5, 2024
1 parent d7eb7b3 commit 7959e79
Show file tree
Hide file tree
Showing 20 changed files with 1,370 additions and 245 deletions.
53 changes: 53 additions & 0 deletions src/main/java/com/teragrep/pth_06/config/ConditionConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.teragrep.pth_06.config;

import com.teragrep.pth_06.planner.walker.conditions.ElementCondition;
import org.jooq.DSLContext;

/**
 * Immutable bundle of the settings handed to walker conditions: the jOOQ
 * {@link DSLContext} and three boolean switches.
 *
 * <p>No null check is performed on the context; a {@code null} context is stored as-is.
 *
 * <p>Equality compares the three flags by value and the context by reference
 * identity. {@link #hashCode()} is implemented to match, so instances can be
 * used safely as keys in hash-based collections.
 */
public final class ConditionConfig {

    private final DSLContext ctx;
    private final boolean streamQuery;
    private final boolean bloomEnabled;
    private final boolean withoutFilters;

    /**
     * Creates a configuration with {@code bloomEnabled} and {@code withoutFilters}
     * both set to {@code false}.
     *
     * @param ctx         jOOQ context made available to conditions (not null-checked)
     * @param streamQuery value returned by {@link #streamQuery()}
     */
    public ConditionConfig(DSLContext ctx, boolean streamQuery) {
        this(ctx, streamQuery, false, false);
    }

    /**
     * Creates a fully specified configuration.
     *
     * @param ctx            jOOQ context made available to conditions (not null-checked)
     * @param streamQuery    value returned by {@link #streamQuery()}
     * @param bloomEnabled   value returned by {@link #bloomEnabled()}
     * @param withoutFilters value returned by {@link #withoutFilter()}
     */
    public ConditionConfig(DSLContext ctx, boolean streamQuery, boolean bloomEnabled, boolean withoutFilters) {
        this.ctx = ctx;
        this.streamQuery = streamQuery;
        this.bloomEnabled = bloomEnabled;
        this.withoutFilters = withoutFilters;
    }

    /**
     * @return the context given at construction, possibly {@code null}
     */
    public DSLContext context() {
        return ctx;
    }

    /**
     * @return the {@code bloomEnabled} flag given at construction
     */
    public boolean bloomEnabled() {
        return bloomEnabled;
    }

    /**
     * @return the {@code streamQuery} flag given at construction
     */
    public boolean streamQuery() {
        return streamQuery;
    }

    /**
     * @return the {@code withoutFilters} flag given at construction
     */
    public boolean withoutFilter() {
        return withoutFilters;
    }

    /**
     * Two configurations are equal when they are of exactly the same class,
     * all three flags match, and both hold the very same context instance.
     *
     * @param object object to compare against, may be {@code null}
     * @return {@code true} if {@code object} is an equal configuration
     */
    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (object == null) {
            return false;
        }
        if (object.getClass() != this.getClass()) {
            return false;
        }
        final ConditionConfig cast = (ConditionConfig) object;
        return this.bloomEnabled == cast.bloomEnabled
                && this.streamQuery == cast.streamQuery
                && this.withoutFilters == cast.withoutFilters
                && this.ctx == cast.ctx;
    }

    /**
     * Hash code consistent with {@link #equals(Object)}.
     *
     * @return hash derived from the context identity and the three flags
     */
    @Override
    public int hashCode() {
        // equals() compares ctx with ==, so the identity hash must be used here;
        // ctx.hashCode() could differ between instances that equals() treats as distinct
        // and would throw on a null context. identityHashCode(null) is 0.
        int result = System.identityHashCode(ctx);
        result = 31 * result + Boolean.hashCode(streamQuery);
        result = 31 * result + Boolean.hashCode(bloomEnabled);
        result = 31 * result + Boolean.hashCode(withoutFilters);
        return result;
    }
}
228 changes: 6 additions & 222 deletions src/main/java/com/teragrep/pth_06/planner/walker/ConditionWalker.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,14 @@
*/
package com.teragrep.pth_06.planner.walker;

import com.teragrep.blf_01.Token;
import com.teragrep.blf_01.Tokenizer;
import com.teragrep.pth_06.planner.StreamDBClient;
import org.apache.spark.util.sketch.BloomFilter;
import com.teragrep.pth_06.config.ConditionConfig;
import com.teragrep.pth_06.planner.walker.conditions.ElementCondition;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Element;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;

import static com.teragrep.pth_06.jooq.generated.bloomdb.Bloomdb.BLOOMDB;
import static com.teragrep.pth_06.jooq.generated.journaldb.Journaldb.JOURNALDB;
import static com.teragrep.pth_06.jooq.generated.streamdb.Streamdb.STREAMDB;

/**
* <h1>Condition Walker</h1> Walker for conditions.
*
Expand All @@ -76,45 +61,27 @@
* @author Mikko Kortelainen
* @author Ville Manninen
*/
public class ConditionWalker extends XmlWalker {
public class ConditionWalker extends XmlWalker<Condition> {

private final boolean bloomEnabled;
private final Logger LOGGER = LoggerFactory.getLogger(ConditionWalker.class);
// Default query is full
private boolean streamQuery = false;
private final DSLContext ctx;

final Tokenizer tokenizer;

// TODO a hack to get global earliest value, default -24h from now
private long globalEarliestEpoch = Instant.now().getEpochSecond() - 24 * 3600;

private void updateGlobalEarliestEpoch(long earliest) {
if (globalEarliestEpoch > earliest) {
// decrease global earliest value
globalEarliestEpoch = earliest;
}
}

public long getGlobalEarliestEpoch() {
return globalEarliestEpoch;
}

/**
* Constructor without connection. Used during unit-tests. Enables jooq-query construction.
*/
public ConditionWalker() {
super();
this.ctx = null;
this.bloomEnabled = false;
this.tokenizer = new Tokenizer(32);
}

public ConditionWalker(DSLContext ctx, boolean bloomEnabled) {
super();
this.ctx = ctx;
this.bloomEnabled = bloomEnabled;
this.tokenizer = new Tokenizer(32);
}

public Condition fromString(String inXml, boolean streamQuery) throws Exception {
Expand Down Expand Up @@ -172,191 +139,8 @@ public Condition emitUnaryOperation(String op, Element current) throws Exception
}

Condition emitElem(Element current) {

String tag = current.getTagName();

if (tag == null) {
throw new IllegalArgumentException("Tag name for Element was null");
}
if (!current.hasAttribute("operation")) {
throw new IllegalStateException(
"Could not find specified or default value for 'operation' attribute from Element"
);
}
if (!current.hasAttribute("value")) {
throw new IllegalStateException(
"Could not find specified or default value for 'value' attribute from Element"
);
}

String value = current.getAttribute("value");
String operation = current.getAttribute("operation");

//System.out.println("StreamQuery="+streamQuery+" Node is terminal tag:" + tag + " val:" + value + " Operation:" + operation);
Condition queryCondition = null;
// directory
if (tag.equalsIgnoreCase("index")) {
if (streamQuery) {
queryCondition = STREAMDB.STREAM.DIRECTORY.like(value.replace('*', '%'));
}
else {
queryCondition = StreamDBClient.GetArchivedObjectsFilterTable.directory
.like(value.replace('*', '%').toLowerCase());
}
if (operation.equalsIgnoreCase("NOT_EQUALS")) {
queryCondition = queryCondition.not();
}
}
// stream
if (tag.equalsIgnoreCase("sourcetype")) {
if (streamQuery) {
queryCondition = STREAMDB.STREAM.STREAM_.like(value.replace('*', '%'));
}
else {
queryCondition = StreamDBClient.GetArchivedObjectsFilterTable.stream
.like(value.replace('*', '%').toLowerCase());
}
if (operation.equalsIgnoreCase("NOT_EQUALS")) {
queryCondition = queryCondition.not();
}
}
// host
if (tag.equalsIgnoreCase("host")) {
if (streamQuery) {
queryCondition = STREAMDB.HOST.NAME.like(value.replace('*', '%'));
}
else {
queryCondition = StreamDBClient.GetArchivedObjectsFilterTable.host
.like(value.replace('*', '%').toLowerCase());
}
if (operation.equalsIgnoreCase("NOT_EQUALS")) {
queryCondition = queryCondition.not();
}
}
if (!streamQuery) {
// Handle also time qualifiers
if (tag.equalsIgnoreCase("earliest") || tag.equalsIgnoreCase("index_earliest")) {
// SQL connection uses localTime in the session, so we use unix time to overcome the conversions
// hour based files are being used so earliest needs conversion to the point of the last hour
int earliestEpoch = Integer.parseInt(value);

// TODO this is a hack to update global earliest value
updateGlobalEarliestEpoch(earliestEpoch);

int earliestEpochHour = earliestEpoch - earliestEpoch % 3600;
Instant instant = Instant.ofEpochSecond(earliestEpochHour);
java.sql.Date timequalifier = new Date(instant.toEpochMilli());

queryCondition = JOURNALDB.LOGFILE.LOGDATE.greaterOrEqual(timequalifier);
/* not supported for mariadb
queryCondition = queryCondition.and(toTimestamp(
regexpReplaceAll(JOURNALDB.LOGFILE.PATH, "((^.*\\/.*-)|(\\.log\\.gz.*))", ""),
"YYYYMMDDHH24").greaterOrEqual(Timestamp.from(instant)));
*/
// NOTE uses literal path
queryCondition = queryCondition
.and(
"UNIX_TIMESTAMP(STR_TO_DATE(SUBSTRING(REGEXP_SUBSTR(path,'[0-9]+(\\.log)?\\.gz(\\.[0-9]*)?$'), 1, 10), '%Y%m%d%H'))"
+ " >= " + instant.getEpochSecond()
);
}
if (tag.equalsIgnoreCase("latest") || tag.equalsIgnoreCase("index_latest")) {
// SQL connection uses localTime in the session, so we use unix time to overcome the conversions
Instant instant = Instant.ofEpochSecond(Integer.parseInt(value));
java.sql.Date timequalifier = new Date(instant.toEpochMilli());

queryCondition = JOURNALDB.LOGFILE.LOGDATE.lessOrEqual(timequalifier);
/* not supported for mariadb
queryCondition = queryCondition.and(toTimestamp(
regexpReplaceAll(JOURNALDB.LOGFILE.PATH, "((^.*\\/.*-)|(\\.log\\.gz.*))", ""),
"YYYYMMDDHH24").lessOrEqual(Timestamp.from(instant)));
*/
// NOTE uses literal path
/*
to match
2021/09-27/sc-99-99-14-244/messages/messages-2021092722.gz.4
2018/04-29/sc-99-99-14-245/f17/f17.logGLOB-2018042900.log.gz
*/

queryCondition = queryCondition
.and(
"UNIX_TIMESTAMP(STR_TO_DATE(SUBSTRING(REGEXP_SUBSTR(path,'[0-9]+(\\.log)?\\.gz(\\.[0-9]*)?$'), 1, 10), '%Y%m%d%H'))"
+ " <= " + instant.getEpochSecond()
);
}
// value search
if ("indexstatement".equalsIgnoreCase(tag) && bloomEnabled) {
if ("EQUALS".equals(operation)) {

final Set<Token> tokenSet = new HashSet<>(
tokenizer.tokenize(new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)))
);

LOGGER.info("BloomFilter tokenSet <[{}]>", tokenSet.toString());

final BloomFilter smallFilter = BloomFilter.create(100000, 0.01);
final BloomFilter mediumFilter = BloomFilter.create(1000000, 0.03);
final BloomFilter largeFilter = BloomFilter.create(2500000, 0.05);

tokenSet.forEach(token -> {
smallFilter.put(token.toString());
mediumFilter.put(token.toString());
largeFilter.put(token.toString());
});

long rowId = StreamDBClient.BloomFiltersTempTable
.insert(ctx, smallFilter, mediumFilter, largeFilter);

Condition rowIdCondition = StreamDBClient.BloomFiltersTempTable.id.eq(rowId);

Field<byte[]> smallColumn = DSL
.select(StreamDBClient.BloomFiltersTempTable.fe100kfp001)
.from(StreamDBClient.BloomFiltersTempTable.BLOOM_TABLE)
.where(rowIdCondition)
.asField();
Field<byte[]> mediumColumn = DSL
.select(StreamDBClient.BloomFiltersTempTable.fe1000kfpp003)
.from(StreamDBClient.BloomFiltersTempTable.BLOOM_TABLE)
.where(rowIdCondition)
.asField();
Field<byte[]> largeColumn = DSL
.select(StreamDBClient.BloomFiltersTempTable.fe2500kfpp005)
.from(StreamDBClient.BloomFiltersTempTable.BLOOM_TABLE)
.where(rowIdCondition)
.asField();

final Field<Boolean> fe100kfp001 = DSL
.function(
"bloommatch", Boolean.class, smallColumn,
BLOOMDB.FILTER_EXPECTED_100000_FPP_001.FILTER
);
final Field<Boolean> fe1000kfpp003 = DSL
.function(
"bloommatch", Boolean.class, mediumColumn,
BLOOMDB.FILTER_EXPECTED_1000000_FPP_003.FILTER
);
final Field<Boolean> fe2500kfpp005 = DSL
.function(
"bloommatch", Boolean.class, largeColumn,
BLOOMDB.FILTER_EXPECTED_2500000_FPP_005.FILTER
);

Condition noBloomFilter = BLOOMDB.FILTER_EXPECTED_100000_FPP_001.FILTER
.isNull()
.and(
BLOOMDB.FILTER_EXPECTED_1000000_FPP_003.FILTER
.isNull()
.and(BLOOMDB.FILTER_EXPECTED_2500000_FPP_005.FILTER.isNull())
);
queryCondition = fe100kfp001
.eq(true)
.or(fe1000kfpp003.eq(true).or(fe2500kfpp005.eq(true).or(noBloomFilter)));
LOGGER.trace("ConditionWalker.emitElement bloomCondition part <{}>", queryCondition);
}
}
}

return queryCondition;
ElementCondition elementCondition = new ElementCondition(current,
new ConditionConfig(ctx, streamQuery, bloomEnabled, false));
return elementCondition.condition();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* @author Mikko Kortelainen
* @author Ville Manninen
*/
public class EarliestWalker extends XmlWalker {
public class EarliestWalker extends XmlWalker<Long> {

private final Logger LOGGER = LoggerFactory.getLogger(EarliestWalker.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* @author Mikko Kortelainen
* @since 08/06/2022
*/
public class KafkaWalker extends XmlWalker {
public class KafkaWalker extends XmlWalker<String> {

@Override
String emitElem(Element current) {
Expand Down
Loading

0 comments on commit 7959e79

Please sign in to comment.