Skip to content

Commit

Permalink
PHOENIX-5055 Split mutations batches probably affects correctness of …
Browse files Browse the repository at this point in the history
…index data
  • Loading branch information
jaanai committed Jan 5, 2019
1 parent e6e043b commit 52a89f0
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.Properties;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
Expand Down Expand Up @@ -157,5 +163,43 @@ public void testMutationEstimatedSize() throws Exception {
stmt.execute();
assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize());
}


@Test
public void testSplitMutationsIntoSameGroupForSingleRow() throws Exception {
String tableName = "TBL_" + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
Properties props = new Properties();
props.put("phoenix.mutate.batchSize", "2");
try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class)) {
conn.setAutoCommit(false);
conn.createStatement().executeUpdate(
"CREATE TABLE " + tableName + " ("
+ "A VARCHAR NOT NULL PRIMARY KEY,"
+ "B VARCHAR,"
+ "C VARCHAR,"
+ "D VARCHAR) COLUMN_ENCODED_BYTES = 0");
conn.createStatement().executeUpdate("CREATE INDEX " + indexName + " on " + tableName + " (C) INCLUDE(D)");

conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A2','B2','C2','D2')");
conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A3','B3', 'C3', null)");
conn.commit();

Table htable = conn.getQueryServices().getTable(Bytes.toBytes(tableName));
Scan scan = new Scan();
scan.setRaw(true);
Iterator<Result> scannerIter = htable.getScanner(scan).iterator();
while (scannerIter.hasNext()) {
long ts = -1;
Result r = scannerIter.next();
for (Cell cell : r.listCells()) {
if (ts == -1) {
ts = cell.getTimestamp();
} else {
assertEquals("(" + cell.toString() + ") has different ts", ts, cell.getTimestamp());
}
}
}
htable.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,15 +492,15 @@ public void testMutationBatch() throws Exception {
upsertRows(connection, fullTableName);
connection.commit();
assertEquals(2L, connection.getMutationState().getBatchCount());
// set the batch size (rows) to 1
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1");

// set the batch size (rows) to 2 since three are at least 2 mutations when updates a single row
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "2");
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
upsertRows(connection, fullTableName);
connection.commit();
// each row should be in its own batch
assertEquals(4L, connection.getMutationState().getBatchCount());
assertEquals(2L, connection.getMutationState().getBatchCount());
}

private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -1090,34 +1091,56 @@ public void doMutation() throws IOException {
}

/**
* Split the list of mutations into multiple lists that don't exceed row and byte thresholds
*
*
* Split the list of mutations into multiple lists. since a single row update can contain multiple mutations,
* we only check if the current batch has exceeded the row or size limit for different rows,
* so that mutations for a single row don't end up in different batches.
*
* @param allMutationList
* List of HBase mutations
* @return List of lists of mutations
*/
public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes,
List<Mutation> allMutationList) {
public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, List<Mutation> allMutationList) {
Preconditions.checkArgument(batchSize> 1,
"Mutation types are put or delete, for one row all mutations must be in one batch.");
Preconditions.checkArgument(batchSizeBytes > 0, "Batch size must be larger than 0");
List<List<Mutation>> mutationBatchList = Lists.newArrayList();
List<Mutation> currentList = Lists.newArrayList();
List<Mutation> sameRowList = Lists.newArrayList();
long currentBatchSizeBytes = 0L;
for (Mutation mutation : allMutationList) {
long mutationSizeBytes = PhoenixKeyValueUtil.calculateMutationDiskSize(mutation);
if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) {
for (int i = 0; i < allMutationList.size(); ) {
long sameRowBatchSize = 1L;
Mutation mutation = allMutationList.get(i);
long sameRowMutationSizeBytes = PhoenixKeyValueUtil.calculateMutationDiskSize(mutation);
sameRowList.add(mutation);
while (i + 1 < allMutationList.size() &&
Bytes.compareTo(allMutationList.get(i + 1).getRow(), mutation.getRow()) == 0) {
Mutation sameRowMutation = allMutationList.get(i + 1);
sameRowList.add(sameRowMutation);
sameRowMutationSizeBytes += PhoenixKeyValueUtil.calculateMutationDiskSize(sameRowMutation);
sameRowBatchSize++;
i++;
}

if (currentList.size() + sameRowBatchSize > batchSize ||
currentBatchSizeBytes + sameRowMutationSizeBytes > batchSizeBytes) {
if (currentList.size() > 0) {
mutationBatchList.add(currentList);
currentList = Lists.newArrayList();
currentBatchSizeBytes = 0L;
}
}
currentList.add(mutation);
currentBatchSizeBytes += mutationSizeBytes;

currentList.addAll(sameRowList);
currentBatchSizeBytes += sameRowMutationSizeBytes;
sameRowList.clear();
i++;
}

if (currentList.size() > 0) {
mutationBatchList.add(currentList);
}
return mutationBatchList;

}

public byte[] encodeTransaction() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.phoenix.execute;

import com.google.common.collect.ImmutableList;

import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand All @@ -30,6 +32,9 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.types.PUnsignedInt;
Expand Down Expand Up @@ -135,4 +140,39 @@ private void assertTable(String tableName1,List<Cell> keyValues1,String tableNam
assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(1)))));

}

@Test
public void testGetMutationBatchList() {
byte[] r1 = Bytes.toBytes(1);
byte[] r2 = Bytes.toBytes(2);
byte[] r3 = Bytes.toBytes(3);
byte[] r4 = Bytes.toBytes(4);
// one put and one delete as a group
{
List<Mutation> list = ImmutableList.of(new Put(r1), new Put(r2), new Delete(r2));
List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
assertTrue(batchLists.size() == 2);
assertEquals(batchLists.get(0).size(), 1);
assertEquals(batchLists.get(1).size(), 2);
}

{
List<Mutation> list = ImmutableList.of(new Put(r1), new Delete(r1), new Put(r2));
List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
assertTrue(batchLists.size() == 2);
assertEquals(batchLists.get(0).size(), 2);
assertEquals(batchLists.get(1).size(), 1);
}

{
List<Mutation> list = ImmutableList.of(new Put(r3), new Put(r1), new Delete(r1), new Put(r2), new Put(r4), new Delete(r4));
List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
assertTrue(batchLists.size() == 4);
assertEquals(batchLists.get(0).size(), 1);
assertEquals(batchLists.get(1).size(), 2);
assertEquals(batchLists.get(2).size(), 1);
assertEquals(batchLists.get(3).size(), 2);
}

}
}

0 comments on commit 52a89f0

Please sign in to comment.