Skip to content

Commit abef6eb

Browse files
authored
Merge branch 'main' into feature/connector-failed-modal
2 parents 8c9ecf8 + bbffa70 commit abef6eb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1376
-159
lines changed

.github/workflows/e2e-playwright-run.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ jobs:
3232
uses: actions/setup-node@v3
3333
with:
3434
node-version: 18
35+
cache-dependency-path: ./e2e-playwright/package-lock.json
3536
cache: 'npm'
3637

3738
- name: Install NPM dependencies

api/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ dependencies {
5454
antlr libs.antlr
5555
implementation libs.antlr.runtime
5656

57+
implementation libs.lucene
58+
implementation libs.lucene.queryparser
59+
implementation libs.lucene.analysis.common
60+
5761
implementation libs.opendatadiscovery.oddrn
5862
implementation(libs.opendatadiscovery.client) {
5963
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
@@ -74,6 +78,7 @@ dependencies {
7478
// END Fixes https://www.cve.org/CVERecord?id=CVE-2025-58056 and https://www.cve.org/CVERecord?id=CVE-2025-58057
7579
// CVE Fixes End
7680

81+
7782
implementation libs.modelcontextprotocol.spring.webflux
7883
implementation libs.victools.jsonschema.generator
7984

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ClustersProperties {
4141
MetricsStorage defaultMetricsStorage = new MetricsStorage();
4242

4343
CacheProperties cache = new CacheProperties();
44+
ClusterFtsProperties fts = new ClusterFtsProperties();
4445

4546
@Data
4647
public static class Cluster {
@@ -217,6 +218,25 @@ public static class CacheProperties {
217218
Duration connectClusterCacheExpiry = Duration.ofHours(24);
218219
}
219220

221+
@Data
222+
@NoArgsConstructor
223+
@AllArgsConstructor
224+
public static class NgramProperties {
225+
int ngramMin = 1;
226+
int ngramMax = 4;
227+
}
228+
229+
@Data
230+
@NoArgsConstructor
231+
@AllArgsConstructor
232+
public static class ClusterFtsProperties {
233+
boolean enabled = false;
234+
NgramProperties schemas = new NgramProperties(1, 4);
235+
NgramProperties consumers = new NgramProperties(1, 4);
236+
NgramProperties connect = new NgramProperties(1, 4);
237+
NgramProperties acl = new NgramProperties(1, 4);
238+
}
239+
220240
@PostConstruct
221241
public void validateAndSetDefaults() {
222242
if (clusters != null) {

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
package io.kafbat.ui.controller;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import io.kafbat.ui.api.SchemasApi;
4+
import io.kafbat.ui.api.model.SchemaColumnsToSort;
5+
import io.kafbat.ui.config.ClustersProperties;
66
import io.kafbat.ui.exception.ValidationException;
77
import io.kafbat.ui.mapper.KafkaSrMapper;
88
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
99
import io.kafbat.ui.model.CompatibilityCheckResponseDTO;
1010
import io.kafbat.ui.model.CompatibilityLevelDTO;
11+
import io.kafbat.ui.model.InternalTopic;
1112
import io.kafbat.ui.model.KafkaCluster;
1213
import io.kafbat.ui.model.NewSchemaSubjectDTO;
14+
import io.kafbat.ui.model.SchemaColumnsToSortDTO;
1315
import io.kafbat.ui.model.SchemaSubjectDTO;
1416
import io.kafbat.ui.model.SchemaSubjectsResponseDTO;
17+
import io.kafbat.ui.model.SortOrderDTO;
1518
import io.kafbat.ui.model.rbac.AccessContext;
1619
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1720
import io.kafbat.ui.service.SchemaRegistryService;
21+
import io.kafbat.ui.service.SchemaRegistryService.SubjectWithCompatibilityLevel;
22+
import io.kafbat.ui.service.index.SchemasFilter;
1823
import io.kafbat.ui.service.mcp.McpTool;
24+
import java.util.Comparator;
1925
import java.util.List;
2026
import java.util.Map;
2127
import javax.validation.Valid;
2228
import lombok.RequiredArgsConstructor;
2329
import lombok.extern.slf4j.Slf4j;
24-
import org.apache.commons.lang3.StringUtils;
2530
import org.springframework.http.ResponseEntity;
2631
import org.springframework.web.bind.annotation.RestController;
2732
import org.springframework.web.server.ServerWebExchange;
@@ -38,6 +43,7 @@ public class SchemasController extends AbstractController implements SchemasApi,
3843
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
3944

4045
private final SchemaRegistryService schemaRegistryService;
46+
private final ClustersProperties clustersProperties;
4147

4248
@Override
4349
protected KafkaCluster getCluster(String clusterName) {
@@ -208,12 +214,16 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
208214
@Valid Integer pageNum,
209215
@Valid Integer perPage,
210216
@Valid String search,
217+
SchemaColumnsToSortDTO orderBy,
218+
SortOrderDTO sortOrder,
211219
ServerWebExchange serverWebExchange) {
212220
var context = AccessContext.builder()
213221
.cluster(clusterName)
214222
.operationName("getSchemas")
215223
.build();
216224

225+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
226+
217227
return schemaRegistryService
218228
.getAllSubjectNames(getCluster(clusterName))
219229
.flatMapIterable(l -> l)
@@ -222,23 +232,72 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
222232
.flatMap(subjects -> {
223233
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
224234
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
225-
List<String> filteredSubjects = subjects
226-
.stream()
227-
.filter(subj -> search == null || CI.contains(subj, search))
228-
.sorted().toList();
235+
236+
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
237+
List<String> filteredSubjects = filter.find(search);
238+
229239
var totalPages = (filteredSubjects.size() / pageSize)
230240
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
231-
List<String> subjectsToRender = filteredSubjects.stream()
232-
.skip(subjectToSkip)
233-
.limit(pageSize)
234-
.toList();
235-
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
236-
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
241+
242+
List<String> subjectsToRetrieve;
243+
boolean paginate = true;
244+
var schemaComparator = getComparatorForSchema(orderBy);
245+
final Comparator<SubjectWithCompatibilityLevel> comparator =
246+
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
247+
? schemaComparator : schemaComparator.reversed();
248+
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
249+
if (SortOrderDTO.DESC.equals(sortOrder)) {
250+
filteredSubjects.sort(Comparator.reverseOrder());
251+
}
252+
subjectsToRetrieve = filteredSubjects.stream()
253+
.skip(subjectToSkip)
254+
.limit(pageSize)
255+
.toList();
256+
paginate = false;
257+
} else {
258+
subjectsToRetrieve = filteredSubjects;
259+
}
260+
261+
final boolean shouldPaginate = paginate;
262+
263+
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRetrieve, pageSize)
264+
.map(subjs ->
265+
paginateSchemas(subjs, comparator, shouldPaginate, pageSize, subjectToSkip)
266+
).map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
237267
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
238268
}).map(ResponseEntity::ok)
239269
.doOnEach(sig -> audit(context, sig));
240270
}
241271

272+
private List<SubjectWithCompatibilityLevel> paginateSchemas(
273+
List<SubjectWithCompatibilityLevel> subjects,
274+
Comparator<SubjectWithCompatibilityLevel> comparator,
275+
boolean paginate,
276+
int pageSize,
277+
int subjectToSkip) {
278+
subjects.sort(comparator);
279+
if (paginate) {
280+
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
281+
} else {
282+
return subjects;
283+
}
284+
}
285+
286+
private Comparator<SubjectWithCompatibilityLevel> getComparatorForSchema(
287+
@Valid SchemaColumnsToSortDTO orderBy) {
288+
var defaultComparator = Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
289+
if (orderBy == null) {
290+
return defaultComparator;
291+
}
292+
return switch (orderBy) {
293+
case SUBJECT -> Comparator.comparing(SubjectWithCompatibilityLevel::getSubject);
294+
case ID -> Comparator.comparing(SubjectWithCompatibilityLevel::getId);
295+
case TYPE -> Comparator.comparing(SubjectWithCompatibilityLevel::getSchemaType);
296+
case COMPATIBILITY -> Comparator.comparing(SubjectWithCompatibilityLevel::getCompatibility);
297+
default -> defaultComparator;
298+
};
299+
}
300+
242301
@Override
243302
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
244303
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
import static io.kafbat.ui.model.rbac.permission.TopicAction.EDIT;
88
import static io.kafbat.ui.model.rbac.permission.TopicAction.VIEW;
99
import static java.util.stream.Collectors.toList;
10-
import static org.apache.commons.lang3.Strings.CI;
1110

1211
import io.kafbat.ui.api.TopicsApi;
12+
import io.kafbat.ui.config.ClustersProperties;
1313
import io.kafbat.ui.mapper.ClusterMapper;
1414
import io.kafbat.ui.model.InternalTopic;
1515
import io.kafbat.ui.model.InternalTopicConfig;
@@ -37,7 +37,6 @@
3737
import javax.validation.Valid;
3838
import lombok.RequiredArgsConstructor;
3939
import lombok.extern.slf4j.Slf4j;
40-
import org.apache.commons.lang3.StringUtils;
4140
import org.springframework.http.HttpStatus;
4241
import org.springframework.http.ResponseEntity;
4342
import org.springframework.web.bind.annotation.RestController;
@@ -55,6 +54,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5554
private final TopicsService topicsService;
5655
private final TopicAnalysisService topicAnalysisService;
5756
private final ClusterMapper clusterMapper;
57+
private final ClustersProperties clustersProperties;
5858

5959
@Override
6060
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -181,23 +181,23 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
181181
.operationName("getTopics")
182182
.build();
183183

184-
return topicsService.getTopicsForPagination(getCluster(clusterName))
184+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185185
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186186
.flatMap(topics -> {
187187
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188188
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
189+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
190+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
189191
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
190-
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
191-
List<InternalTopic> filtered = topics.stream()
192-
.filter(topic -> !topic.isInternal()
193-
|| showInternal != null && showInternal)
194-
.filter(topic -> search == null || CI.contains(topic.getName(), search))
195-
.sorted(comparator)
196-
.toList();
192+
? comparatorForTopic : comparatorForTopic.reversed();
193+
194+
List<InternalTopic> filtered = topics.stream().sorted(comparator).toList();
195+
197196
var totalPages = (filtered.size() / pageSize)
198197
+ (filtered.size() % pageSize == 0 ? 0 : 1);
199198

200199
List<String> topicsPage = filtered.stream()
200+
.filter(t -> !t.isInternal() || showInternal != null && showInternal)
201201
.skip(topicsToSkip)
202202
.limit(pageSize)
203203
.map(InternalTopic::getName)
@@ -348,16 +348,23 @@ public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates
348348
}
349349

350350
private Comparator<InternalTopic> getComparatorForTopic(
351-
TopicColumnsToSortDTO orderBy) {
351+
TopicColumnsToSortDTO orderBy,
352+
boolean ftsEnabled) {
352353
var defaultComparator = Comparator.comparing(InternalTopic::getName);
353-
if (orderBy == null) {
354+
if (orderBy == null && ftsEnabled) {
355+
return (o1, o2) -> 0;
356+
} else if (orderBy == null) {
354357
return defaultComparator;
355358
}
356359
return switch (orderBy) {
357360
case TOTAL_PARTITIONS -> Comparator.comparing(InternalTopic::getPartitionCount);
358361
case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
359362
case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor);
360363
case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize);
364+
case MESSAGES_COUNT -> Comparator.comparing(
365+
InternalTopic::getMessagesCount,
366+
Comparator.nullsLast(Long::compareTo)
367+
);
361368
default -> defaultComparator;
362369
};
363370
}

api/src/main/java/io/kafbat/ui/mapper/ClusterMapper.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.kafka.common.resource.ResourceType;
5252
import org.mapstruct.Mapper;
5353
import org.mapstruct.Mapping;
54+
import org.openapitools.jackson.nullable.JsonNullable;
5455

5556
@Mapper(componentModel = "spring")
5657
public interface ClusterMapper {
@@ -104,6 +105,14 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
104105

105106
TopicDTO toTopic(InternalTopic topic);
106107

108+
default <T> JsonNullable<T> toJsonNullable(T value) {
109+
if (value == null) {
110+
return JsonNullable.undefined();
111+
} else {
112+
return JsonNullable.of(value);
113+
}
114+
}
115+
107116
PartitionDTO toPartition(InternalPartition topic);
108117

109118
BrokerDTO toBrokerDto(InternalBroker broker);

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import lombok.Data;
1111
import org.apache.kafka.clients.admin.ConfigEntry;
1212
import org.apache.kafka.clients.admin.TopicDescription;
13-
import org.apache.kafka.common.TopicPartition;
1413

1514
@Data
1615
@Builder(toBuilder = true)
@@ -38,6 +37,16 @@ public class InternalTopic {
3837
private final long segmentSize;
3938
private final long segmentCount;
4039

40+
41+
public InternalTopic withMetrics(Metrics metrics) {
42+
var builder = toBuilder();
43+
if (metrics != null) {
44+
builder.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(this.name));
45+
builder.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(this.name));
46+
}
47+
return builder.build();
48+
}
49+
4150
public static InternalTopic from(TopicDescription topicDescription,
4251
List<ConfigEntry> configs,
4352
InternalPartitionsOffsets partitionsOffsets,
@@ -113,8 +122,10 @@ public static InternalTopic from(TopicDescription topicDescription,
113122
topic.segmentSize(stats.getSegmentSize());
114123
});
115124

116-
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
117-
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
125+
if (metrics != null) {
126+
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
127+
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
128+
}
118129

119130
topic.topicConfigs(
120131
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
@@ -131,4 +142,16 @@ public static InternalTopic from(TopicDescription topicDescription,
131142
return topic.build();
132143
}
133144

145+
public @Nullable Long getMessagesCount() {
146+
Long result = null;
147+
if (cleanUpPolicy.equals(CleanupPolicy.DELETE)) {
148+
result = 0L;
149+
if (partitions != null && !partitions.isEmpty()) {
150+
for (InternalPartition partition : partitions.values()) {
151+
result += (partition.getOffsetMax() - partition.getOffsetMin());
152+
}
153+
}
154+
}
155+
return result;
156+
}
134157
}

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
@Value
1313
@Builder(toBuilder = true)
14-
public class Statistics {
14+
public class Statistics implements AutoCloseable {
1515
ServerStatusDTO status;
1616
Throwable lastKafkaException;
1717
String version;
@@ -46,4 +46,11 @@ public Stream<TopicDescription> topicDescriptions() {
4646
public Statistics withClusterState(UnaryOperator<ScrapedClusterState> stateUpdate) {
4747
return toBuilder().clusterState(stateUpdate.apply(clusterState)).build();
4848
}
49+
50+
@Override
51+
public void close() throws Exception {
52+
if (clusterState != null) {
53+
clusterState.close();
54+
}
55+
}
4956
}

0 commit comments

Comments
 (0)