Skip to content

Commit

Permalink
[AMORO-2893] Using tableId as key for cache in TableService
Browse files Browse the repository at this point in the history
 tableId is unique for every table we can simplify the key using tableId,

so that we don't need pass other parameters(maybe need retrieve from db) when retrieve info from cache
  • Loading branch information
klion26 committed Sep 20, 2024
1 parent 1ebfd21 commit b100541
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void getTableDetail(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, database, tableName).buildTableIdentifier()));
if (serverTableIdentifier.isPresent()) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get());
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary();
if (tableRuntimeSummary != null) {
Expand Down Expand Up @@ -656,7 +656,9 @@ public void cancelOptimizingProcess(Context ctx) {
tableService.getServerTableIdentifier(
TableIdentifier.of(catalog, db, table).buildTableIdentifier());
TableRuntime tableRuntime =
serverTableIdentifier != null ? tableService.getRuntime(serverTableIdentifier) : null;
serverTableIdentifier != null
? tableService.getRuntime(serverTableIdentifier.getId())
: null;

Preconditions.checkArgument(
tableRuntime != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public class DefaultTableService extends StatedPersistentBase implements TableSe
private final Map<String, InternalCatalog> internalCatalogMap = new ConcurrentHashMap<>();
private final Map<String, ExternalCatalog> externalCatalogMap = new ConcurrentHashMap<>();

private final Map<ServerTableIdentifier, TableRuntime> tableRuntimeMap =
new ConcurrentHashMap<>();
private final Map<Long, TableRuntime> tableRuntimeMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService tableExplorerScheduler =
Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -213,7 +212,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat
}

ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table);
Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier))
Optional.ofNullable(tableRuntimeMap.remove(serverTableIdentifier.getId()))
.ifPresent(
tableRuntime -> {
if (headHandler != null) {
Expand Down Expand Up @@ -411,7 +410,7 @@ public void initialize() {

private TableRuntime getAndCheckExist(ServerTableIdentifier tableIdentifier) {
Preconditions.checkArgument(tableIdentifier != null, "tableIdentifier cannot be null");
TableRuntime tableRuntime = getRuntime(tableIdentifier);
TableRuntime tableRuntime = getRuntime(tableIdentifier.getId());
if (tableRuntime == null) {
throw new ObjectNotExistsException(tableIdentifier);
}
Expand Down Expand Up @@ -447,15 +446,15 @@ private ServerTableIdentifier getOrSyncServerTableIdentifier(TableIdentifier id)
}

@Override
public TableRuntime getRuntime(ServerTableIdentifier tableIdentifier) {
public TableRuntime getRuntime(Long tableId) {
checkStarted();
return tableRuntimeMap.get(tableIdentifier);
return tableRuntimeMap.get(tableId);
}

@Override
public boolean contains(ServerTableIdentifier tableIdentifier) {
public boolean contains(Long tableId) {
checkStarted();
return tableRuntimeMap.containsKey(tableIdentifier);
return tableRuntimeMap.containsKey(tableId);
}

public void dispose() {
Expand Down Expand Up @@ -645,7 +644,7 @@ private boolean triggerTableAdded(
}
}
TableRuntime tableRuntime = new TableRuntime(serverTableIdentifier, this, table.properties());
tableRuntimeMap.put(serverTableIdentifier, tableRuntime);
tableRuntimeMap.put(serverTableIdentifier.getId(), tableRuntime);
tableRuntime.registerMetric(MetricManager.getInstance().getGlobalRegistry());
if (headHandler != null) {
headHandler.fireTableAdded(table, tableRuntime);
Expand All @@ -659,7 +658,7 @@ private void revertTableRuntimeAdded(
externalCatalog.getServerTableIdentifier(
tableIdentity.getDatabase(), tableIdentity.getTableName());
if (tableIdentifier != null) {
tableRuntimeMap.remove(tableIdentifier);
tableRuntimeMap.remove(tableIdentifier.getId());
}
}

Expand All @@ -671,7 +670,7 @@ private void disposeTable(ServerTableIdentifier tableIdentifier) {
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));
Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier))
Optional.ofNullable(tableRuntimeMap.remove(tableIdentifier.getId()))
.ifPresent(
tableRuntime -> {
if (headHandler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public interface TableManager extends TableRuntimeHandler {
*/
AmoroTable<?> loadTable(ServerTableIdentifier tableIdentifier);

TableRuntime getRuntime(ServerTableIdentifier tableIdentifier);
TableRuntime getRuntime(Long tableId);

default boolean contains(ServerTableIdentifier tableIdentifier) {
return getRuntime(tableIdentifier) != null;
default boolean contains(Long tableId) {
return getRuntime(tableId) != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.amoro.server.table.RuntimeHandlerChain;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableRuntimeMeta;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,10 +62,9 @@ protected BaseTableExecutor(TableManager tableManager, int poolSize) {
}

@Override
protected void initHandler(List<TableRuntimeMeta> tableRuntimeMetaList) {
protected void initHandler(List<TableRuntime> tableRuntimeMetaList) {
tableRuntimeMetaList.stream()
.map(tableRuntimeMeta -> tableRuntimeMeta.getTableRuntime())
.filter(tableRuntime -> enabled(tableRuntime))
.filter(this::enabled)
.forEach(
tableRuntime -> {
if (scheduledTables.add(tableRuntime.getTableIdentifier())) {
Expand Down Expand Up @@ -109,7 +107,8 @@ protected String getThreadName() {
}

private boolean isExecutable(TableRuntime tableRuntime) {
return tableManager.contains(tableRuntime.getTableIdentifier()) && enabled(tableRuntime);
return tableManager.contains(tableRuntime.getTableIdentifier().getId())
&& enabled(tableRuntime);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected TableMetadata getTableMetadata(TableIdentifier identifier) {

protected TableRuntime getTableRuntime(TableIdentifier identifier) {
ServerTableIdentifier serverTableIdentifier = getServerTableIdentifier(identifier);
return tableService.getRuntime(serverTableIdentifier);
return tableService.getRuntime(serverTableIdentifier.getId());
}

protected void assertTableRuntime(TableIdentifier identifier, TableFormat format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void initTableWithFiles() {
(MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable();
appendData(mixedTable.asUnkeyedTable(), 1);
appendData(mixedTable.asUnkeyedTable(), 2);
TableRuntime runtime = tableService().getRuntime(serverTableIdentifier());
TableRuntime runtime = tableService().getRuntime(serverTableIdentifier().getId());

runtime.refresh(tableService().loadTable(serverTableIdentifier()));
}
Expand Down Expand Up @@ -387,10 +387,13 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) {
0, optimizingService().listTasks(defaultResourceGroup().getName()).size());
Assertions.assertEquals(
OptimizingProcess.Status.RUNNING,
tableService().getRuntime(serverTableIdentifier()).getOptimizingProcess().getStatus());
tableService()
.getRuntime(serverTableIdentifier().getId())
.getOptimizingProcess()
.getStatus());
Assertions.assertEquals(
OptimizingStatus.COMMITTING,
tableService().getRuntime(serverTableIdentifier()).getOptimizingStatus());
tableService().getRuntime(serverTableIdentifier().getId()).getOptimizingStatus());
}

protected void reload() {
Expand All @@ -415,7 +418,7 @@ public TableRuntimeRefresher() {
}

void refreshPending() {
execute(tableService().getRuntime(serverTableIdentifier()));
execute(tableService().getRuntime(serverTableIdentifier().getId()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ public void testInitialize() throws Exception {
tableService.initialize();
Assert.assertEquals(1, handler.getInitTables().size());
Assert.assertEquals(
createTableId.getId().longValue(), handler.getInitTables().get(0).getTableId());
(Long) createTableId.getId().longValue(),
handler.getInitTables().get(0).getTableIdentifier().getId());

// test change properties
MixedTable mixedTable = (MixedTable) tableService().loadTable(createTableId).originalTable();

mixedTable.updateProperties().set(TableProperties.ENABLE_ORPHAN_CLEAN, "true").commit();
tableService()
.getRuntime(createTableId)
.getRuntime(createTableId.getId())
.refresh(tableService.loadTable(serverTableIdentifier()));
Assert.assertEquals(1, handler.getConfigChangedTables().size());
validateTableRuntime(handler.getConfigChangedTables().get(0).first());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,28 @@ public void testLoadTable() {

@Test
public void testTableContains() {
Assert.assertTrue(tableService().contains(serverTableIdentifier()));
Assert.assertTrue(tableService().contains(serverTableIdentifier().getId()));
ServerTableIdentifier copyId =
ServerTableIdentifier.of(
null,
serverTableIdentifier().getCatalog(),
serverTableIdentifier().getDatabase(),
serverTableIdentifier().getTableName(),
serverTableIdentifier().getFormat());
Assert.assertFalse(tableService().contains(copyId));
Assert.assertFalse(tableService().contains(copyId.getId()));
copyId =
ServerTableIdentifier.of(
serverTableIdentifier().getId(),
serverTableIdentifier().getCatalog(),
serverTableIdentifier().getDatabase(),
"unknown",
serverTableIdentifier().getFormat());
Assert.assertFalse(tableService().contains(copyId));
Assert.assertFalse(tableService().contains(copyId.getId()));
}

@Test
public void testTableRuntime() {
TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier());
TableRuntime tableRuntime = tableService().getRuntime(serverTableIdentifier().getId());
validateTableRuntime(tableRuntime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private boolean isBlocked(BlockableOperation operation) {
}

private boolean isTableRuntimeBlocked(BlockableOperation operation) {
return tableService().getRuntime(serverTableIdentifier()).isBlocked(operation);
return tableService().getRuntime(serverTableIdentifier().getId()).isBlocked(operation);
}

private void assertBlockerCnt(int i) {
Expand Down

0 comments on commit b100541

Please sign in to comment.