Skip to content

Commit

Permalink
feat: add server time to pq message
Browse files Browse the repository at this point in the history
  • Loading branch information
3n0ugh committed Jun 28, 2024
1 parent 008b124 commit 198501d
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 15 deletions.
8 changes: 6 additions & 2 deletions pq/message/format/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package format

import (
"encoding/binary"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/tuple"
"github.com/go-playground/errors"
)

type Delete struct {
MessageTime time.Time
OldTupleData *tuple.Data
OldDecoded map[string]any
TableNamespace string
Expand All @@ -17,8 +19,10 @@ type Delete struct {
OldTupleType uint8
}

func NewDelete(data []byte, streamedTransaction bool, relation map[uint32]*Relation) (*Delete, error) {
msg := &Delete{}
func NewDelete(data []byte, streamedTransaction bool, relation map[uint32]*Relation, serverTime time.Time) (*Delete, error) {
msg := &Delete{
MessageTime: serverTime,
}
if err := msg.decode(data, streamedTransaction); err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pq/message/format/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package format

import (
"testing"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/tuple"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -35,7 +36,8 @@ func TestDelete_New(t *testing.T) {
},
}

msg, err := NewDelete(data, false, rel)
now := time.Now()
msg, err := NewDelete(data, false, rel, now)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -66,6 +68,7 @@ func TestDelete_New(t *testing.T) {
},
TableNamespace: "public",
TableName: "t",
MessageTime: now,
}

assert.Equal(t, expected, msg)
Expand Down
8 changes: 6 additions & 2 deletions pq/message/format/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package format

import (
"encoding/binary"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/tuple"
"github.com/go-playground/errors"
Expand All @@ -12,6 +13,7 @@ const (
)

type Insert struct {
MessageTime time.Time
TupleData *tuple.Data
Decoded map[string]any
TableNamespace string
Expand All @@ -20,8 +22,10 @@ type Insert struct {
XID uint32
}

func NewInsert(data []byte, streamedTransaction bool, relation map[uint32]*Relation) (*Insert, error) {
msg := &Insert{}
func NewInsert(data []byte, streamedTransaction bool, relation map[uint32]*Relation, serverTime time.Time) (*Insert, error) {
msg := &Insert{
MessageTime: serverTime,
}
if err := msg.decode(data, streamedTransaction); err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pq/message/format/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package format

import (
"testing"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/tuple"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -35,7 +36,8 @@ func TestInsert_New(t *testing.T) {
},
}

msg, err := NewInsert(data, false, rel)
now := time.Now()
msg, err := NewInsert(data, false, rel, now)
if err != nil {
t.Fatal(err)
}
Expand All @@ -54,6 +56,7 @@ func TestInsert_New(t *testing.T) {
Decoded: map[string]any{"id": int32(605), "name": "foo"},
TableNamespace: "public",
TableName: "t",
MessageTime: now,
}

assert.EqualValues(t, expected, msg)
Expand Down
8 changes: 6 additions & 2 deletions pq/message/format/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package format

import (
"encoding/binary"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/tuple"
"github.com/go-playground/errors"
Expand All @@ -14,6 +15,7 @@ const (
)

type Update struct {
MessageTime time.Time
NewTupleData *tuple.Data
NewDecoded map[string]any
OldTupleData *tuple.Data
Expand All @@ -25,8 +27,10 @@ type Update struct {
OldTupleType uint8
}

func NewUpdate(data []byte, streamedTransaction bool, relation map[uint32]*Relation) (*Update, error) {
msg := &Update{}
func NewUpdate(data []byte, streamedTransaction bool, relation map[uint32]*Relation, serverTime time.Time) (*Update, error) {
msg := &Update{
MessageTime: serverTime,
}
if err := msg.decode(data, streamedTransaction); err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pq/message/format/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package format

import (
"testing"
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/tuple"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -35,7 +36,8 @@ func TestUpdate_New(t *testing.T) {
},
}

msg, err := NewUpdate(data, false, rel)
now := time.Now()
msg, err := NewUpdate(data, false, rel, now)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -86,6 +88,7 @@ func TestUpdate_New(t *testing.T) {
},
TableNamespace: "public",
TableName: "t",
MessageTime: now,
}

assert.Equal(t, expected, msg)
Expand Down
10 changes: 6 additions & 4 deletions pq/message/message.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package message

import (
"time"

"github.com/Trendyol/go-pq-cdc/pq/message/format"
"github.com/go-playground/errors"
)
Expand Down Expand Up @@ -33,14 +35,14 @@ type Type uint8

var streamedTransaction bool

func New(data []byte, relation map[uint32]*format.Relation) (any, error) {
func New(data []byte, serverTime time.Time, relation map[uint32]*format.Relation) (any, error) {
switch Type(data[0]) {
case InsertByte:
return format.NewInsert(data, streamedTransaction, relation)
return format.NewInsert(data, streamedTransaction, relation, serverTime)
case UpdateByte:
return format.NewUpdate(data, streamedTransaction, relation)
return format.NewUpdate(data, streamedTransaction, relation, serverTime)
case DeleteByte:
return format.NewDelete(data, streamedTransaction, relation)
return format.NewDelete(data, streamedTransaction, relation, serverTime)
case StreamStopByte, StreamAbortByte, StreamCommitByte:
streamedTransaction = false
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion pq/replication/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (s *stream) sink(ctx context.Context) {
s.metric.SetCDCLatency(time.Now().UTC().Sub(xld.ServerTime).Nanoseconds())

var decodedMsg any
decodedMsg, err = message.New(xld.WALData, s.relation)
decodedMsg, err = message.New(xld.WALData, xld.ServerTime, s.relation)
if err != nil || decodedMsg == nil {
logger.Debug("wal data message parsing error", "error", err)
continue
Expand Down
3 changes: 2 additions & 1 deletion pq/replication/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package replication

import (
"encoding/binary"
"github.com/go-playground/errors"
"time"

"github.com/go-playground/errors"

"github.com/Trendyol/go-pq-cdc/pq"
)

Expand Down

0 comments on commit 198501d

Please sign in to comment.