Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Dec 31, 2023
1 parent 6912e0a commit 63fc1b3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cdc/cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package model

// go:generate msgp
//go:generate msgp

// MqMessageType is the type of message
type MqMessageType int
Expand Down
59 changes: 59 additions & 0 deletions cdc/cdc/model/sink_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cdc/cdc/model/sink_gen_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 29 additions & 31 deletions cdc/cdc/sink/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package codec

import (
"bytes"
"compress/zlib"
"testing"

"github.com/pingcap/check"
Expand Down Expand Up @@ -97,26 +95,26 @@ var _ = check.Suite(&codecTestSuite{})

type codecTestSuite struct{}

func (s *codecTestSuite) checkCompressedSize(messages []*MQMessage) (int, int) {
var buff bytes.Buffer
writer := zlib.NewWriter(&buff)
originalSize := 0
for _, message := range messages {
originalSize += len(message.Key) + len(message.Value)
if len(message.Key) > 0 {
_, _ = writer.Write(message.Key)
}
_, _ = writer.Write(message.Value)
}
writer.Close()
return originalSize, buff.Len()
}
// func (s *codecTestSuite) checkCompressedSize(messages []*MQMessage) (int, int) {
// var buff bytes.Buffer
// writer := zlib.NewWriter(&buff)
// originalSize := 0
// for _, message := range messages {
// originalSize += len(message.Key) + len(message.Value)
// if len(message.Key) > 0 {
// _, _ = writer.Write(message.Key)
// }
// _, _ = writer.Write(message.Value)
// }
// writer.Close()
// return originalSize, buff.Len()
// }

func (s *codecTestSuite) encodeKvCase(c *check.C, encoder EventBatchEncoder, events []*model.RawKVEntry) []*MQMessage {
msg, err := codecEncodeKvCase(encoder, events)
c.Assert(err, check.IsNil)
return msg
}
// func (s *codecTestSuite) encodeKvCase(c *check.C, encoder EventBatchEncoder, events []*model.RawKVEntry) []*MQMessage {
// msg, err := codecEncodeKvCase(encoder, events)
// c.Assert(err, check.IsNil)
// return msg
// }

// func (s *codecTestSuite) TestJsonVsCraftVsPB(c *check.C) {
// defer testleak.AfterTest(c)()
Expand Down Expand Up @@ -335,17 +333,17 @@ func BenchmarkJsonEncoding(b *testing.B) {
func BenchmarkJsonDecoding(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, message := range codecJSONEncodedKvChanges {
if decoder, err := NewJSONEventBatchDecoder(message.Key, message.Value); err != nil {
decoder, err := NewJSONEventBatchDecoder(message.Key, message.Value)
if err != nil {
panic(err)
} else {
for {
if _, hasNext, err := decoder.HasNext(); err != nil {
panic(err)
} else if hasNext {
_, _ = decoder.NextChangedEvent()
} else {
break
}
}
for {
if _, hasNext, err := decoder.HasNext(); err != nil {
panic(err)
} else if hasNext {
_, _ = decoder.NextChangedEvent()
} else {
break
}
}
}
Expand Down

0 comments on commit 63fc1b3

Please sign in to comment.