diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java index eb5226e6ab..44b9364e70 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java @@ -525,14 +525,24 @@ public List getTableBranches(AmoroTable amoroTable) { @Override public List getTableConsumerInfos(AmoroTable amoroTable) { FileStoreTable table = getTable(amoroTable); + FileStore store = table.store(); ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location()); List consumerInfos = new ArrayList<>(); try { consumerManager .consumers() .forEach( - (consumerId, nextSnapshot) -> - consumerInfos.add(new ConsumerInfo(consumerId, nextSnapshot))); + (consumerId, nextSnapshotId) -> { + long currentSnapshotId = nextSnapshotId; + if (!table.snapshotManager().snapshotExists(currentSnapshotId)) { + // if not exits,maybe steaming scan is running,so need to nextSnapshotId -1 + currentSnapshotId = nextSnapshotId - 1; + } + Snapshot snapshot = table.snapshotManager().snapshot(currentSnapshotId); + AmoroSnapshotsOfTable amoroSnapshotsOfTable = getSnapshotsOfTable(store, snapshot); + consumerInfos.add( + new ConsumerInfo(consumerId, nextSnapshotId, amoroSnapshotsOfTable)); + }); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java index 1e5f6e9c96..4e21a69d66 100644 --- a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java +++ b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ConsumerInfo.java @@ -20,32 +20,26 @@ public class ConsumerInfo { - public static final String CONSUMER_ID = "consumer_id"; - public static final String NEXT_SNAPSHOT_ID = "next_snapshot_id"; + private final String consumerId; + private final long nextSnapshotId; + private final AmoroSnapshotsOfTable amoroCurrentSnapshotsOfTable; - private String consumerId; - private long nextSnapshotId; - - public ConsumerInfo() {} - - public ConsumerInfo(String consumerId, long nextSnapshotId) { + public ConsumerInfo( + String consumerId, long nextSnapshotId, AmoroSnapshotsOfTable amoroCurrentSnapshotsOfTable) { this.consumerId = consumerId; this.nextSnapshotId = nextSnapshotId; + this.amoroCurrentSnapshotsOfTable = amoroCurrentSnapshotsOfTable; } public String getConsumerId() { return consumerId; } - public void setConsumerId(String consumerId) { - this.consumerId = consumerId; - } - public long getNextSnapshotId() { return nextSnapshotId; } - public void setNextSnapshotId(long nextSnapshotId) { - this.nextSnapshotId = nextSnapshotId; + public AmoroSnapshotsOfTable getAmoroCurrentSnapshotsOfTable() { + return amoroCurrentSnapshotsOfTable; } }