Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private void updateWithSequenceGroup(KeyValue kv) {
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = fieldAggregators.iterator();
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? aggIter.next() : null;

boolean[] isEmptySequenceGroup = new boolean[getters.length];
boolean[] isProcessedSequenceField = new boolean[getters.length];
for (int i = 0; i < getters.length; i++) {
FieldsComparator seqComparator = null;
if (curComparator != null && curComparator.fieldIndex == i) {
Expand All @@ -214,15 +214,13 @@ private void updateWithSequenceGroup(KeyValue kv) {
}

Object accumulator = row.getField(i);
if (seqComparator == null) {
Object field = getters[i].getFieldOrNull(kv.value());
if (aggregator != null) {
row.setField(i, aggregator.agg(accumulator, field));
} else if (field != null) {
row.setField(i, field);
if (seqComparator != null) {
// Skip if this field has already been processed as part of a sequence group
if (isProcessedSequenceField[i]) {
continue;
}
} else {
if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {

if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) {
// skip null sequence group
continue;
}
Expand All @@ -237,6 +235,8 @@ private void updateWithSequenceGroup(KeyValue kv) {
for (int fieldIndex : seqComparator.compareFields()) {
row.setField(
fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
// Mark these sequence fields as processed
isProcessedSequenceField[fieldIndex] = true;
}
continue;
}
Expand All @@ -245,27 +245,28 @@ private void updateWithSequenceGroup(KeyValue kv) {
} else if (aggregator != null) {
row.setField(i, aggregator.aggReversed(accumulator, field));
}
} else {
Object field = getters[i].getFieldOrNull(kv.value());
if (aggregator != null) {
row.setField(i, aggregator.agg(accumulator, field));
} else if (field != null) {
row.setField(i, field);
}
}
}
}

private boolean isEmptySequenceGroup(
KeyValue kv, FieldsComparator comparator, boolean[] isEmptySequenceGroup) {

// If any flag of the sequence fields is set, it means the sequence group is empty.
if (isEmptySequenceGroup[comparator.compareFields()[0]]) {
return true;
}

KeyValue kv, FieldsComparator comparator, boolean[] isProcessedSequenceField) {
for (int fieldIndex : comparator.compareFields()) {
if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
return false;
}
}

// Set the flag of all the sequence fields of the sequence group.
// Mark these sequence fields as processed
for (int fieldIndex : comparator.compareFields()) {
isEmptySequenceGroup[fieldIndex] = true;
isProcessedSequenceField[fieldIndex] = true;
}

return true;
Expand All @@ -280,7 +281,7 @@ private void retractWithSequenceGroup(KeyValue kv) {
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = fieldAggregators.iterator();
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? aggIter.next() : null;

boolean[] isEmptySequenceGroup = new boolean[getters.length];
boolean[] isProcessedSequenceField = new boolean[getters.length];
for (int i = 0; i < getters.length; i++) {
FieldsComparator seqComparator = null;
if (curComparator != null && curComparator.fieldIndex == i) {
Expand All @@ -295,7 +296,12 @@ private void retractWithSequenceGroup(KeyValue kv) {
}

if (seqComparator != null) {
if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
// Skip if this field has already been processed as part of a sequence group
if (isProcessedSequenceField[i]) {
continue;
}

if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) {
// skip null sequence group
continue;
}
Expand All @@ -319,6 +325,8 @@ private void retractWithSequenceGroup(KeyValue kv) {
updatedSequenceFields.add(field);
}
}
// Mark these sequence fields as processed
isProcessedSequenceField[field] = true;
}
} else {
// retract normal field
Expand Down