From 3df46fb43f54166223727f66072a00a39b95cf76 Mon Sep 17 00:00:00 2001 From: Soroush Mirzaei Date: Mon, 6 May 2024 13:14:11 -0400 Subject: [PATCH] feat: add message timestamp to view update context --- options.go | 9 +++++++++ partition_table.go | 5 +++-- partition_table_test.go | 25 +++++++++++++++---------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/options.go b/options.go index d93711e7..1e0771cf 100644 --- a/options.go +++ b/options.go @@ -28,6 +28,9 @@ type UpdateContext interface { // It is recommended to lazily evaluate the headers to reduce overhead per message // when headers are not used. Headers() Headers + + // Timestamp returns the timestamp of the input message. + Timestamp() time.Time } // UpdateCallback is invoked upon arrival of a message for a table partition. @@ -95,6 +98,7 @@ type DefaultUpdateContext struct { partition int32 offset int64 headers []*sarama.RecordHeader + timestamp time.Time } // Topic returns the topic of input message. @@ -117,6 +121,11 @@ func (ctx DefaultUpdateContext) Headers() Headers { return HeadersFromSarama(ctx.headers) } +// Timestamp returns the timestamp of the input message. +func (ctx DefaultUpdateContext) Timestamp() time.Time { + return ctx.timestamp +} + /////////////////////////////////////////////////////////////////////////////// // processor options /////////////////////////////////////////////////////////////////////////////// diff --git a/partition_table.go b/partition_table.go index 48bc7b6b..79baf898 100644 --- a/partition_table.go +++ b/partition_table.go @@ -462,7 +462,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition } lastMessage = time.Now() - if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers); err != nil { + if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers, msg.Timestamp); err != nil { return fmt.Errorf("load: error updating storage: %v", err) } @@ -534,12 +534,13 @@ func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int) { }) } -func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader) error { +func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader, timestamp time.Time) error { err := p.st.Update(&DefaultUpdateContext{ topic: p.st.topic, partition: p.st.partition, offset: offset, headers: headers, + timestamp: timestamp, }, key, value) if err != nil { return fmt.Errorf("Error from the update callback while recovering from the log: %v", err) diff --git a/partition_table_test.go b/partition_table_test.go index 94064480..a0db354f 100644 --- a/partition_table_test.go +++ b/partition_table_test.go @@ -708,16 +708,19 @@ func TestPT_loadMessages(t *testing.T) { func TestPT_storeEvent(t *testing.T) { t.Run("succeed", func(t *testing.T) { var ( - localOffset int64 - partition int32 - topic = "some-topic" - key = "some-key" - value = []byte("some-vale") - actualKey string - actualValue []byte - updateCB UpdateCallback = func(ctx UpdateContext, s storage.Storage, k string, v []byte) error { + localOffset int64 + partition int32 + topic = "some-topic" + key = "some-key" + value = []byte("some-vale") + timestamp = time.Now() + actualKey string + actualValue []byte + actualTimestamp time.Time + updateCB UpdateCallback = func(ctx UpdateContext, s storage.Storage, k string, v []byte) error { actualKey = k actualValue = v + actualTimestamp = ctx.Timestamp() return nil } ) @@ -735,9 +738,10 @@ func TestPT_storeEvent(t *testing.T) { defer cancel() err := pt.setup(ctx) require.NoError(t, err) - err = pt.storeEvent(key, value, localOffset, nil) + err = pt.storeEvent(key, value, localOffset, nil, timestamp) require.Equal(t, actualKey, key) require.Equal(t, actualValue, value) + require.Equal(t, actualTimestamp, timestamp) require.NoError(t, err) }) t.Run("fail", func(t *testing.T) { @@ -747,6 +751,7 @@ func TestPT_storeEvent(t *testing.T) { topic = "some-topic" key = "some-key" value = []byte("some-vale") + timestamp = time.Now() updateCB UpdateCallback = updateCallbackNoop retErr error = fmt.Errorf("storage err") ) @@ -764,7 +769,7 @@ func TestPT_storeEvent(t *testing.T) { defer cancel() err := pt.setup(ctx) require.NoError(t, err) - err = pt.storeEvent(key, value, localOffset, nil) + err = pt.storeEvent(key, value, localOffset, nil, timestamp) require.Error(t, err) }) }