Skip to content

Commit

Permalink
fix: fix db keyword
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo committed Oct 13, 2023
1 parent ef4c873 commit 4e7d68a
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class AggOffsetV1DO {
private Long id;
private Date gmtCreate;
private Date gmtModified;
@TableField("`partition`")
private String partition;
@TableField("partition_name")
private String partitionName;
private String consumerGroup;
private long version;
private byte[] data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class AggStateV1DO {
private Long id;
private Date gmtCreate;
private Date gmtModified;
@TableField("`partition`")
private String partition;
@TableField("partition_name")
private String partitionName;
private String consumerGroup;
private byte[] state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void saveState(TopicPartition partition, byte[] state) throws Exception {
d = new AggStateV1DO();
d.setGmtCreate(now);
d.setGmtModified(now);
d.setPartition(partition.toString());
d.setPartitionName(partition.toString());
d.setConsumerGroup(aggProperties.getConsumerGroupId());
d.setState(state);
aggStateV1DOMapper.insert(d);
Expand All @@ -86,7 +86,7 @@ public void saveOffset(TopicPartition partition, OffsetInfo oi) throws Exception
AggOffsetV1DO d = new AggOffsetV1DO();
d.setGmtCreate(now);
d.setGmtModified(now);
d.setPartition(partition.toString());
d.setPartitionName(partition.toString());
d.setConsumerGroup(aggProperties.getConsumerGroupId());
d.setVersion(now.getTime());
d.setData(data);
Expand All @@ -96,7 +96,7 @@ public void saveOffset(TopicPartition partition, OffsetInfo oi) throws Exception
Long count = aggOffsetV1DOMapper.selectCount(wrapper);
if (count >= OFFSET_LIMIT2) {
UpdateWrapper<AggOffsetV1DO> wrapper2 = new UpdateWrapper<>();
wrapper2.eq("`partition`", partition.toString());
wrapper2.eq("partition_name", partition.toString());
wrapper2.eq("consumer_group", aggProperties.getConsumerGroupId());
wrapper2.orderByAsc("id");
wrapper2.last("limit " + (count.intValue() - OFFSET_LIMIT));
Expand Down Expand Up @@ -133,7 +133,7 @@ private AggStateV1DO selectState(TopicPartition partition) throws SQLException {

private <T> QueryWrapper<T> partitionWhere(TopicPartition partition) {
QueryWrapper<T> w = new QueryWrapper<>();
w.eq("`partition`", partition.toString());
w.eq("partition_name", partition.toString());
w.eq("consumer_group", aggProperties.getConsumerGroupId());
return w;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
CREATE TABLE IF NOT EXISTS `agg_offset_v1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`gmt_create` datetime NOT NULL,
`gmt_modified` datetime NOT NULL,
`partition` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`consumer_group` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`version` bigint NOT NULL,
`data` blob,
PRIMARY KEY (`id`),
KEY `k_partition_version` (`partition`,`consumer_group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT comment '主键',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间',
`partition_name` varchar(255) NOT NULL comment '分区',
`consumer_group` varchar(255) NOT NULL comment '消费者组',
`version` bigint(20) NOT NULL comment '版本号',
`data` blob DEFAULT NULL comment '位点信息',
PRIMARY KEY(`id`),
KEY `k_partition_name_consumer_group`(`partition_name`, `consumer_group`) LOCAL
) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = '消费位点表v1';

CREATE TABLE IF NOT EXISTS `agg_task_v1` (
`id` bigint NOT NULL AUTO_INCREMENT,
Expand All @@ -22,13 +22,12 @@ CREATE TABLE IF NOT EXISTS `agg_task_v1` (
UNIQUE KEY `uk_agg_id_version` (`agg_id`,`version`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

CREATE TABLE IF NOT EXISTS `agg_state_v1` (
`id` bigint NOT NULL AUTO_INCREMENT,
`gmt_create` datetime NOT NULL,
`gmt_modified` datetime NOT NULL,
`partition` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`consumer_group` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
`state` longblob,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_partition` (`partition`, `consumer_group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE `agg_state_v1` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT comment '主键',
`gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment '创建时间',
`gmt_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment '修改时间',
`partition_name` varchar(1) NOT NULL comment '分区',
`consumer_group` varchar(1) NOT NULL comment '消费者组',
`state` longblob NOT NULL comment '状态',
PRIMARY KEY(`id`)
) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = '聚合状态存储表v1';

0 comments on commit 4e7d68a

Please sign in to comment.