Skip to content

Commit 207853c

Browse files
committed
HIVE-28578: Concurrency issue in updateTableColumnStatistics
1 parent 94fe7a2 commit 207853c

File tree

1 file changed

+36
-30
lines changed
  • standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore

1 file changed

+36
-30
lines changed

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java

Lines changed: 36 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -9504,6 +9504,9 @@ public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats
95049504
// So let's not use them anywhere unless absolutely necessary.
95059505
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
95069506
MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
9507+
lockForUpdate("TBLS", "TBL_ID", Optional.of("\"TBL_ID\" = " + mTable.getId()));
9508+
// Get the newest version of mTable
9509+
pm.refresh(mTable);
95079510
Table table = convertToTable(mTable);
95089511
List<String> colNames = new ArrayList<>();
95099512
for (ColumnStatisticsObj statsObj : statsObjs) {
@@ -9592,23 +9595,28 @@ public Map<String, String> updatePartitionColumnStatistics(Table table, MTable m
95929595
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
95939596
ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
95949597
String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
9595-
Partition partition = convertToPart(catName, statsDesc.getDbName(), statsDesc.getTableName(), getMPartition(
9596-
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable), TxnUtils.isAcidTable(table));
9598+
MPartition mPartition = getMPartition(
9599+
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable);
9600+
Partition partition = convertToPart(catName, statsDesc.getDbName(), statsDesc.getTableName(),
9601+
mPartition, TxnUtils.isAcidTable(table));
95979602
List<String> colNames = new ArrayList<>();
95989603

95999604
for(ColumnStatisticsObj statsObj : statsObjs) {
96009605
colNames.add(statsObj.getColName());
96019606
}
96029607

9603-
Map<String, MPartitionColumnStatistics> oldStats = getPartitionColStats(table, statsDesc
9604-
.getPartName(), colNames, colStats.getEngine());
9605-
9606-
MPartition mPartition = getMPartition(
9607-
catName, statsDesc.getDbName(), statsDesc.getTableName(), partVals, mTable);
9608-
if (partition == null) {
9608+
List<Long> partitionIds = directSql.getPartitionFieldsViaSqlFilter(catName, statsDesc.getDbName(), statsDesc.getTableName(),
9609+
Arrays.asList("\"PART_ID\""), "\"PARTITIONS\".\"PART_NAME\" = ?",
9610+
Arrays.asList(Warehouse.makePartName(table.getPartitionKeys(), partVals)), Collections.emptyList(), -1);
9611+
if (partition == null || partitionIds.isEmpty()) {
96099612
throw new NoSuchObjectException("Partition for which stats is gathered doesn't exist.");
96109613
}
96119614

9615+
Map<String, MPartitionColumnStatistics> oldStats = getPartitionColStats(table, statsDesc
9616+
.getPartName(), colNames, colStats.getEngine());
9617+
lockForUpdate("PARTITIONS", "PART_ID", Optional.of("\"PART_ID\" = " + partitionIds.getFirst()));
9618+
pm.refresh(mPartition);
9619+
96129620
for (ColumnStatisticsObj statsObj : statsObjs) {
96139621
MPartitionColumnStatistics mStatsObj =
96149622
StatObjectConverter.convertToMPartitionColumnStatistics(mPartition, statsDesc, statsObj, colStats.getEngine());
@@ -11435,36 +11443,34 @@ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, Stri
1143511443
return writeEventInfoList;
1143611444
}
1143711445

11438-
private void prepareQuotes() throws SQLException {
11439-
String s = dbType.getPrepareTxnStmt();
11440-
if (s != null) {
11441-
assert pm.currentTransaction().isActive();
11442-
JDOConnection jdoConn = pm.getDataStoreConnection();
11443-
try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
11444-
statement.execute(s);
11445-
} finally {
11446-
jdoConn.close();
11447-
}
11448-
}
11449-
}
11450-
11451-
private void lockNotificationSequenceForUpdate() throws MetaException {
11446+
private void lockForUpdate(String tableName, String column, Optional<String> rowFilter)
11447+
throws MetaException {
1145211448
if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) {
1145311449
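// Derby doesn't allow FOR UPDATE to lock the row being selected (see
1145411450
// https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html), so lock the whole table instead.
1145511451
// For a single-row table such as NOTIFICATION_SEQUENCE this shouldn't cause any performance degradation; note that rowFilter is ignored on this path, so callers passing multi-row tables (e.g. TBLS, PARTITIONS) take a table-wide lock.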
1145611452
new RetryingExecutor(conf, () -> {
11457-
directSql.lockDbTable("NOTIFICATION_SEQUENCE");
11453+
directSql.lockDbTable(tableName);
1145811454
}).run();
1145911455
} else {
11460-
String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\"";
11456+
String selectQuery = "select \"" + column + "\" from \"" + tableName + "\"" +
11457+
rowFilter.map(f -> " where " + f).orElse("");
1146111458
String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery);
1146211459
new RetryingExecutor(conf, () -> {
11463-
prepareQuotes();
11464-
try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", lockingQuery))) {
11465-
query.setUnique(true);
11466-
// only need to execute it to get db Lock
11467-
query.execute();
11460+
String txnStmt = dbType.getPrepareTxnStmt();
11461+
List<String> statements = new ArrayList<>();
11462+
if (txnStmt != null) {
11463+
statements.add(txnStmt);
11464+
}
11465+
statements.add(lockingQuery);
11466+
assert pm.currentTransaction().isActive();
11467+
JDOConnection jdoConn = pm.getDataStoreConnection();
11468+
try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
11469+
for (String s : statements) {
11470+
statement.execute(s);
11471+
}
11472+
} finally {
11473+
jdoConn.close();
1146811474
}
1146911475
}).run();
1147011476
}
@@ -11532,7 +11538,7 @@ public void addNotificationEvent(NotificationEvent entry) throws MetaException {
1153211538
try {
1153311539
pm.flush();
1153411540
openTransaction();
11535-
lockNotificationSequenceForUpdate();
11541+
lockForUpdate("NOTIFICATION_SEQUENCE", "NEXT_EVENT_ID", Optional.empty());
1153611542
query = pm.newQuery(MNotificationNextId.class);
1153711543
Collection<MNotificationNextId> ids = (Collection) query.execute();
1153811544
MNotificationNextId mNotificationNextId = null;

0 commit comments

Comments
 (0)