diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggOffsetV1DO.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggOffsetV1DO.java index c2c47472f..fc8f4f6eb 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggOffsetV1DO.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggOffsetV1DO.java @@ -25,8 +25,8 @@ public class AggOffsetV1DO { private Long id; private Date gmtCreate; private Date gmtModified; - @TableField("`partition`") - private String partition; + @TableField("partition_name") + private String partitionName; private String consumerGroup; private long version; private byte[] data; diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggStateV1DO.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggStateV1DO.java index 7b681d5ec..a851682e0 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggStateV1DO.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/mapper/AggStateV1DO.java @@ -25,8 +25,8 @@ public class AggStateV1DO { private Long id; private Date gmtCreate; private Date gmtModified; - @TableField("`partition`") - private String partition; + @TableField("partition_name") + private String partitionName; private String consumerGroup; private byte[] state; } diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/state/JdbcPartitionStateStore.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/state/JdbcPartitionStateStore.java index 053e5a690..720140956 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/state/JdbcPartitionStateStore.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/state/JdbcPartitionStateStore.java @@ -65,7 +65,7 @@ public void saveState(TopicPartition partition, byte[] state) throws Exception { d = new AggStateV1DO(); d.setGmtCreate(now); d.setGmtModified(now); - d.setPartition(partition.toString()); + d.setPartitionName(partition.toString()); d.setConsumerGroup(aggProperties.getConsumerGroupId()); d.setState(state); aggStateV1DOMapper.insert(d); @@ -86,7 +86,7 @@ public void saveOffset(TopicPartition partition, OffsetInfo oi) throws Exception AggOffsetV1DO d = new AggOffsetV1DO(); d.setGmtCreate(now); d.setGmtModified(now); - d.setPartition(partition.toString()); + d.setPartitionName(partition.toString()); d.setConsumerGroup(aggProperties.getConsumerGroupId()); d.setVersion(now.getTime()); d.setData(data); @@ -96,7 +96,7 @@ public void saveOffset(TopicPartition partition, OffsetInfo oi) throws Exception Long count = aggOffsetV1DOMapper.selectCount(wrapper); if (count >= OFFSET_LIMIT2) { UpdateWrapper wrapper2 = new UpdateWrapper<>(); - wrapper2.eq("`partition`", partition.toString()); + wrapper2.eq("partition_name", partition.toString()); wrapper2.eq("consumer_group", aggProperties.getConsumerGroupId()); wrapper2.orderByAsc("id"); wrapper2.last("limit " + (count.intValue() - OFFSET_LIMIT)); @@ -133,7 +133,7 @@ private AggStateV1DO selectState(TopicPartition partition) throws SQLException { private QueryWrapper partitionWhere(TopicPartition partition) { QueryWrapper w = new QueryWrapper<>(); - w.eq("`partition`", partition.toString()); + w.eq("partition_name", partition.toString()); w.eq("consumer_group", aggProperties.getConsumerGroupId()); return w; } diff --git a/server/extension/extension-common-flyway/src/main/resources/db/migration/V22__231011_CREATE_agg_TABLE.sql b/server/extension/extension-common-flyway/src/main/resources/db/migration/V22__231011_CREATE_agg_TABLE.sql index a7f225a0d..bef70b583 100644 --- a/server/extension/extension-common-flyway/src/main/resources/db/migration/V22__231011_CREATE_agg_TABLE.sql +++ b/server/extension/extension-common-flyway/src/main/resources/db/migration/V22__231011_CREATE_agg_TABLE.sql @@ -1,14 +1,14 @@ CREATE TABLE IF NOT EXISTS `agg_offset_v1` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `gmt_create` datetime NOT NULL, - `gmt_modified` datetime NOT NULL, - `partition` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - `consumer_group` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - `version` bigint NOT NULL, - `data` blob, - PRIMARY KEY (`id`), - KEY `k_partition_version` (`partition`,`consumer_group`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT comment '主键', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '创建时间', + `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间', + `partition_name` varchar(255) NOT NULL comment '分区', + `consumer_group` varchar(255) NOT NULL comment '消费者组', + `version` bigint(20) NOT NULL comment '版本号', + `data` blob DEFAULT NULL comment '位点信息', + PRIMARY KEY(`id`), + KEY `k_partition_name_consumer_group`(`partition_name`, `consumer_group`) LOCAL +) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = '消费位点表v1' CREATE TABLE IF NOT EXISTS `agg_task_v1` ( `id` bigint NOT NULL AUTO_INCREMENT, @@ -22,13 +22,14 @@ CREATE TABLE IF NOT EXISTS `agg_task_v1` ( UNIQUE KEY `uk_agg_id_version` (`agg_id`,`version`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; -CREATE TABLE IF NOT EXISTS `agg_state_v1` ( - `id` bigint NOT NULL AUTO_INCREMENT, - `gmt_create` datetime NOT NULL, - `gmt_modified` datetime NOT NULL, - `partition` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - `consumer_group` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, - `state` longblob, - PRIMARY KEY (`id`), - UNIQUE KEY `uk_partition` (`partition`, `consumer_group`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +CREATE TABLE IF NOT EXISTS `agg_offset_v1` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT comment '主键', + `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '创建时间', + `gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间', + `partition_name` varchar(255) NOT NULL comment '分区', + `consumer_group` varchar(255) NOT NULL comment '消费者组', + `version` bigint(20) NOT NULL comment '版本号', + `data` blob DEFAULT NULL comment '位点信息', + PRIMARY KEY(`id`), + KEY `k_partition_name_consumer_group`(`partition_name`, `consumer_group`) LOCAL +) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = '消费位点表v1'