Skip to content

Commit d3aee73

Browse files
authored
codec(ticdc): canal-json message do not end by terminator if using kafka sink (#11708)
close #11707
1 parent d5e798d commit d3aee73

File tree

2 files changed

+5
-1
lines changed

2 files changed

+5
-1
lines changed

cdc/sink/codec/canal/canal_json_encoder.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,8 @@ func (c *JSONBatchEncoder) AppendRowChangedEvent(
349349
if err != nil {
350350
return errors.Trace(err)
351351
}
352-
if len(c.config.Terminator) > 0 {
352+
353+
if c.config.IsStorageScheme && len(c.config.Terminator) > 0 {
353354
value = append(value, c.config.Terminator...)
354355
}
355356

cdc/sink/codec/common/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/pingcap/log"
2222
"github.com/pingcap/tiflow/pkg/config"
2323
cerror "github.com/pingcap/tiflow/pkg/errors"
24+
"github.com/pingcap/tiflow/pkg/sink"
2425
"go.uber.org/zap"
2526
)
2627

@@ -47,6 +48,7 @@ type Config struct {
4748
AvroBigintUnsignedHandlingMode string
4849

4950
// for sinking to cloud storage
51+
IsStorageScheme bool
5052
Delimiter string
5153
Quote string
5254
NullString string
@@ -162,6 +164,7 @@ func (c *Config) Apply(sinkURI *url.URL, config *config.ReplicaConfig) error {
162164

163165
}
164166

167+
c.IsStorageScheme = sink.IsStorageScheme(sinkURI.Scheme)
165168
c.DeleteOnlyHandleKeyColumns = !config.EnableOldValue
166169
return nil
167170
}

0 commit comments

Comments
 (0)