Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Jan 3, 2024
1 parent 1fd47e7 commit 633ec35
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 33 deletions.
4 changes: 2 additions & 2 deletions cdc/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ cmd/cdc/cdc
tiflash-config-preprocessed.toml

# Files generated when running docker-compose
deployments/ticdc/docker-compose/data
deployments/ticdc/docker-compose/logs
deployments/tikv-cdc/docker-compose/data
deployments/tikv-cdc/docker-compose/logs

# Binary file when running intergration test
integration/integration
Expand Down
3 changes: 3 additions & 0 deletions cdc/cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ type MqMessageType int
const (
// MqMessageTypeUnknown is unknown type of message key
MqMessageTypeUnknown MqMessageType = 0

// MqMessageTypeRow is row type of message key
// MqMessageTypeRow MqMessageType = 1

// MqMessageTypeDDL is ddl type of message key
// MqMessageTypeDDL MqMessageType = 2

// MqMessageTypeResolved is resolved type of message key
MqMessageTypeResolved MqMessageType = 3
// MqMessageTypeKv is RawKV entry type of message key
Expand Down
10 changes: 5 additions & 5 deletions cdc/cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type EventBatchEncoder interface {
// EncodeCheckpointEvent appends a checkpoint event into the batch.
// This event will be broadcast to all partitions to signal a global checkpoint.
EncodeCheckpointEvent(ts uint64) (*MQMessage, error)
// AppendChangedEvent appends a row changed event into the batch
// AppendChangedEvent appends a changed event into the batch
AppendChangedEvent(e *model.RawKVEntry) (EncoderResult, error)
// AppendResolvedEvent appends a resolved event into the batch.
// This event is used to tell the encoder that no event prior to ts will be sent.
Expand Down Expand Up @@ -60,7 +60,7 @@ type MQMessage struct {
Ts uint64 // reserved for possible output sorting
Type model.MqMessageType // type
Protocol config.Protocol // protocol
entriesCount int // rows in one MQ Message
entriesCount int // entries in one MQ Message
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
Expand All @@ -80,17 +80,17 @@ func (m *MQMessage) PhysicalTime() time.Time {
return oracle.GetTimeFromTS(m.Ts)
}

// GetEntriesCount returns the number of rows batched in one MQMessage
// GetEntriesCount returns the number of entries batched in one MQMessage
func (m *MQMessage) GetEntriesCount() int {
return m.entriesCount
}

// SetEntriesCount set the number of rows
// SetEntriesCount set the number of entries
func (m *MQMessage) SetEntriesCount(cnt int) {
m.entriesCount = cnt
}

// IncEntriesCount increase the number of rows
// IncEntriesCount increase the number of entries
func (m *MQMessage) IncEntriesCount() {
m.entriesCount++
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/cdc/sink/codec/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func (s *codecInterfaceSuite) TestCreate(c *check.C) {
c.Assert(msg.Type, check.Equals, model.MqMessageTypeKv)
c.Assert(msg.Protocol, check.Equals, config.ProtocolOpen)

msg = newResolvedMQMessage(config.ProtocolOpen, []byte("key1"), nil, 1234)
c.Assert(msg.Key, check.BytesEquals, []byte("key1"))
msg = newResolvedMQMessage(config.ProtocolOpen, nil, nil, 1234)
c.Assert(msg.Key, check.IsNil)
c.Assert(msg.Value, check.IsNil)
c.Assert(msg.Ts, check.Equals, uint64(1234))
c.Assert(msg.Type, check.Equals, model.MqMessageTypeResolved)
Expand Down
34 changes: 17 additions & 17 deletions cdc/cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ func kvEventToMqMessage(e *model.RawKVEntry) (*mqMessageKey, *mqMessageValue) {
}

func mqMessageToKvEvent(key *mqMessageKey, value *mqMessageValue) *model.RawKVEntry {
e := new(model.RawKVEntry)
e.CRTs = key.CRTs
e.Key = key.Key
e.OpType = value.OpType
e.Value = value.Value
e.ExpiredTs = decodeExpiredTs(value.ExpiredTs)
return e
return &model.RawKVEntry{
OpType: value.OpType,
Key: key.Key,
Value: value.Value,
CRTs: key.CRTs,
ExpiredTs: decodeExpiredTs(value.ExpiredTs),
}
}

// JSONEventBatchEncoder encodes the events into the byte of a batch into.
Expand Down Expand Up @@ -413,7 +413,7 @@ func (b *JSONEventBatchMixedDecoder) NextResolvedEvent() (uint64, error) {
return resolvedTs, nil
}

// NextRowChangedEvent implements the EventBatchDecoder interface
// NextChangedEvent implements the EventBatchDecoder interface
func (b *JSONEventBatchMixedDecoder) NextChangedEvent() (*model.RawKVEntry, error) {
if b.nextKey == nil {
if err := b.decodeNextKey(); err != nil {
Expand All @@ -422,18 +422,18 @@ func (b *JSONEventBatchMixedDecoder) NextChangedEvent() (*model.RawKVEntry, erro
}
b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:]
if b.nextKey.Type != model.MqMessageTypeKv {
return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found row event message")
return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found kv event message")
}
valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8])
value := b.mixedBytes[8 : valueLen+8]
b.mixedBytes = b.mixedBytes[valueLen+8:]
rowMsg := new(mqMessageValue)
if err := rowMsg.Decode(value); err != nil {
kvMsg := new(mqMessageValue)
if err := kvMsg.Decode(value); err != nil {
return nil, errors.Trace(err)
}
rowEvent := mqMessageToKvEvent(b.nextKey, rowMsg)
kvEvent := mqMessageToKvEvent(b.nextKey, kvMsg)
b.nextKey = nil
return rowEvent, nil
return kvEvent, nil
}

func (b *JSONEventBatchMixedDecoder) hasNext() bool {
Expand Down Expand Up @@ -505,13 +505,13 @@ func (b *JSONEventBatchDecoder) NextChangedEvent() (*model.RawKVEntry, error) {
valueLen := binary.BigEndian.Uint64(b.valueBytes[:8])
value := b.valueBytes[8 : valueLen+8]
b.valueBytes = b.valueBytes[valueLen+8:]
rowMsg := new(mqMessageValue)
if err := rowMsg.Decode(value); err != nil {
mvMsg := new(mqMessageValue)
if err := mvMsg.Decode(value); err != nil {
return nil, errors.Trace(err)
}
rowEvent := mqMessageToKvEvent(b.nextKey, rowMsg)
kvEvent := mqMessageToKvEvent(b.nextKey, mvMsg)
b.nextKey = nil
return rowEvent, nil
return kvEvent, nil
}

func (b *JSONEventBatchDecoder) hasNext() bool {
Expand Down
10 changes: 5 additions & 5 deletions cdc/cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic

// no need to create the topic, but we would have to log user if they found enter wrong topic name later
if config.AutoCreate {
log.Warn("topic already exist, TiCDC will not create the topic",
log.Warn("topic already exist, TiKV CDC will not create the topic",
zap.String("topic", topic), zap.Any("detail", info))
}

Expand All @@ -388,13 +388,13 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic

brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
log.Warn("TiKV CDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}

// when create the topic, `max.message.bytes` is decided by the broker,
// it would use broker's `message.max.bytes` to set topic's `max.message.bytes`.
// TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than
// TiKV CDC need to make sure that the producer's `MaxMessageBytes` won't larger than
// broker's `message.max.bytes`.
if brokerMessageMaxBytes < config.MaxMessageBytes {
log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+
Expand All @@ -421,7 +421,7 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

log.Info("TiCDC create the topic",
log.Info("TiKV CDC create the topic",
zap.Int32("partition-num", config.PartitionNum),
zap.Int16("replication-factor", config.ReplicationFactor))

Expand All @@ -446,7 +446,7 @@ func getBrokerMessageMaxBytes(admin kafka.ClusterAdminClient) (int, error) {
if len(configEntries) == 0 || configEntries[0].Name != kafka.BrokerMessageMaxBytesConfigName {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since cannot find the `message.max.bytes` from the broker's configuration, " +
"ticdc decline to create the topic and changefeed to prevent potential error")
"tikv cdc decline to create the topic and changefeed to prevent potential error")
}

result, err := strconv.Atoi(configEntries[0].Value)
Expand Down
2 changes: 1 addition & 1 deletion cdc/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ error = '''
json codec invalid data
'''

["CDC:ErrJSONCodecRowTooLarge"]
["CDC:ErrJSONCodecKvTooLarge"]
error = '''
json codec single key-value too large
'''
Expand Down
2 changes: 1 addition & 1 deletion cdc/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var (
ErrMaxwellDecodeFailed = errors.Normalize("maxwell decode failed", errors.RFCCodeText("CDC:ErrMaxwellDecodeFailed"))
ErrMaxwellInvalidData = errors.Normalize("maxwell invalid data", errors.RFCCodeText("CDC:ErrMaxwellInvalidData"))
ErrJSONCodecInvalidData = errors.Normalize("json codec invalid data", errors.RFCCodeText("CDC:ErrJSONCodecInvalidData"))
ErrJSONCodecKvTooLarge = errors.Normalize("json codec single key-value too large", errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge"))
ErrJSONCodecKvTooLarge = errors.Normalize("json codec single key-value too large", errors.RFCCodeText("CDC:ErrJSONCodecKvTooLarge"))
ErrCanalDecodeFailed = errors.Normalize("canal decode failed", errors.RFCCodeText("CDC:ErrCanalDecodeFailed"))
ErrCanalEncodeFailed = errors.Normalize("canal encode failed", errors.RFCCodeText("CDC:ErrCanalEncodeFailed"))
ErrOldValueNotEnabled = errors.Normalize("old value is not enabled", errors.RFCCodeText("CDC:ErrOldValueNotEnabled"))
Expand Down

0 comments on commit 633ec35

Please sign in to comment.