diff --git a/pq/message/format/delete.go b/pq/message/format/delete.go index 8d6072a..4abf6ef 100644 --- a/pq/message/format/delete.go +++ b/pq/message/format/delete.go @@ -2,12 +2,14 @@ package format import ( "encoding/binary" + "time" "github.com/Trendyol/go-pq-cdc/pq/message/tuple" "github.com/go-playground/errors" ) type Delete struct { + MessageTime time.Time OldTupleData *tuple.Data OldDecoded map[string]any TableNamespace string @@ -17,8 +19,10 @@ type Delete struct { OldTupleType uint8 } -func NewDelete(data []byte, streamedTransaction bool, relation map[uint32]*Relation) (*Delete, error) { - msg := &Delete{} +func NewDelete(data []byte, streamedTransaction bool, relation map[uint32]*Relation, serverTime time.Time) (*Delete, error) { + msg := &Delete{ + MessageTime: serverTime, + } if err := msg.decode(data, streamedTransaction); err != nil { return nil, err } diff --git a/pq/message/format/delete_test.go b/pq/message/format/delete_test.go index 2c33d8f..007a4e3 100644 --- a/pq/message/format/delete_test.go +++ b/pq/message/format/delete_test.go @@ -2,6 +2,7 @@ package format import ( "testing" + "time" "github.com/Trendyol/go-pq-cdc/pq/message/tuple" "github.com/stretchr/testify/assert" @@ -35,7 +36,8 @@ func TestDelete_New(t *testing.T) { }, } - msg, err := NewDelete(data, false, rel) + now := time.Now() + msg, err := NewDelete(data, false, rel, now) if err != nil { t.Fatal(err) } @@ -66,6 +68,7 @@ func TestDelete_New(t *testing.T) { }, TableNamespace: "public", TableName: "t", + MessageTime: now, } assert.Equal(t, expected, msg) diff --git a/pq/message/format/insert.go b/pq/message/format/insert.go index b823dde..715054a 100644 --- a/pq/message/format/insert.go +++ b/pq/message/format/insert.go @@ -2,6 +2,7 @@ package format import ( "encoding/binary" + "time" "github.com/Trendyol/go-pq-cdc/pq/message/tuple" "github.com/go-playground/errors" @@ -12,6 +13,7 @@ const ( ) type Insert struct { + MessageTime time.Time TupleData *tuple.Data Decoded map[string]any TableNamespace string @@ -20,8 +22,10 @@ type Insert struct { XID uint32 } -func NewInsert(data []byte, streamedTransaction bool, relation map[uint32]*Relation) (*Insert, error) { - msg := &Insert{} +func NewInsert(data []byte, streamedTransaction bool, relation map[uint32]*Relation, serverTime time.Time) (*Insert, error) { + msg := &Insert{ + MessageTime: serverTime, + } if err := msg.decode(data, streamedTransaction); err != nil { return nil, err } diff --git a/pq/message/format/insert_test.go b/pq/message/format/insert_test.go index a93d321..60a34a4 100644 --- a/pq/message/format/insert_test.go +++ b/pq/message/format/insert_test.go @@ -2,6 +2,7 @@ package format import ( "testing" + "time" "github.com/Trendyol/go-pq-cdc/pq/message/tuple" "github.com/stretchr/testify/assert" @@ -35,7 +36,8 @@ func TestInsert_New(t *testing.T) { }, } - msg, err := NewInsert(data, false, rel) + now := time.Now() + msg, err := NewInsert(data, false, rel, now) if err != nil { t.Fatal(err) } @@ -54,6 +56,7 @@ func TestInsert_New(t *testing.T) { Decoded: map[string]any{"id": int32(605), "name": "foo"}, TableNamespace: "public", TableName: "t", + MessageTime: now, } assert.EqualValues(t, expected, msg) diff --git a/pq/message/format/update.go b/pq/message/format/update.go index 6d10cfb..e56e7a9 100644 --- a/pq/message/format/update.go +++ b/pq/message/format/update.go @@ -2,6 +2,7 @@ package format import ( "encoding/binary" + "time" "github.com/Trendyol/go-pq-cdc/pq/message/tuple" "github.com/go-playground/errors" @@ -14,6 +15,7 @@ const ( ) type Update struct { + MessageTime time.Time NewTupleData *tuple.Data NewDecoded map[string]any OldTupleData *tuple.Data @@ -25,8 +27,10 @@ type Update struct { OldTupleType uint8 } -func NewUpdate(data []byte, streamedTransaction bool, relation map[uint32]*Relation) (*Update, error) { - msg := &Update{} +func NewUpdate(data []byte, streamedTransaction bool, relation map[uint32]*Relation, serverTime time.Time) (*Update, error) { + msg := &Update{ + MessageTime: serverTime, + } if err := msg.decode(data, streamedTransaction); err != nil { return nil, err } diff --git a/pq/message/format/update_test.go b/pq/message/format/update_test.go index 66d25e8..e1aee05 100644 --- a/pq/message/format/update_test.go +++ b/pq/message/format/update_test.go @@ -2,6 +2,7 @@ package format import ( "testing" + "time" "github.com/Trendyol/go-pq-cdc/pq/message/tuple" "github.com/stretchr/testify/assert" @@ -35,7 +36,8 @@ func TestUpdate_New(t *testing.T) { }, } - msg, err := NewUpdate(data, false, rel) + now := time.Now() + msg, err := NewUpdate(data, false, rel, now) if err != nil { t.Fatal(err) } @@ -86,6 +88,7 @@ func TestUpdate_New(t *testing.T) { }, TableNamespace: "public", TableName: "t", + MessageTime: now, } assert.Equal(t, expected, msg) diff --git a/pq/message/message.go b/pq/message/message.go index 5cf6e54..ed68a8a 100644 --- a/pq/message/message.go +++ b/pq/message/message.go @@ -1,6 +1,8 @@ package message import ( + "time" + "github.com/Trendyol/go-pq-cdc/pq/message/format" "github.com/go-playground/errors" ) @@ -33,14 +35,14 @@ type Type uint8 var streamedTransaction bool -func New(data []byte, relation map[uint32]*format.Relation) (any, error) { +func New(data []byte, serverTime time.Time, relation map[uint32]*format.Relation) (any, error) { switch Type(data[0]) { case InsertByte: - return format.NewInsert(data, streamedTransaction, relation) + return format.NewInsert(data, streamedTransaction, relation, serverTime) case UpdateByte: - return format.NewUpdate(data, streamedTransaction, relation) + return format.NewUpdate(data, streamedTransaction, relation, serverTime) case DeleteByte: - return format.NewDelete(data, streamedTransaction, relation) + return format.NewDelete(data, streamedTransaction, relation, serverTime) case StreamStopByte, StreamAbortByte, StreamCommitByte: streamedTransaction = false return nil, nil diff --git a/pq/replication/stream.go b/pq/replication/stream.go index 2344f40..118bfdb 100644 --- a/pq/replication/stream.go +++ b/pq/replication/stream.go @@ -164,7 +164,7 @@ func (s *stream) sink(ctx context.Context) { s.metric.SetCDCLatency(time.Now().UTC().Sub(xld.ServerTime).Nanoseconds()) var decodedMsg any - decodedMsg, err = message.New(xld.WALData, s.relation) + decodedMsg, err = message.New(xld.WALData, xld.ServerTime, s.relation) if err != nil || decodedMsg == nil { logger.Debug("wal data message parsing error", "error", err) continue diff --git a/pq/replication/wal.go b/pq/replication/wal.go index 2795bc1..3399da5 100644 --- a/pq/replication/wal.go +++ b/pq/replication/wal.go @@ -2,9 +2,10 @@ package replication import ( "encoding/binary" - "github.com/go-playground/errors" "time" + "github.com/go-playground/errors" + "github.com/Trendyol/go-pq-cdc/pq" )