Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions aws-v1/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/truora/minidyn/core"
"github.com/truora/minidyn/interpreter"
minidynTypes "github.com/truora/minidyn/types"
)

const (
Expand Down Expand Up @@ -741,3 +742,103 @@ func getMissingSubstrs(s string, substrs []string) []string {

return missingSubstrs
}

// StreamRecord represents a stream record for AWS SDK v1
type StreamRecord struct {
EventID string
EventName string
Keys map[string]*dynamodb.AttributeValue
NewImage map[string]*dynamodb.AttributeValue
OldImage map[string]*dynamodb.AttributeValue
SequenceNumber string
StreamViewType string
}

func mapStringToStreamViewType(viewType string) minidynTypes.StreamViewType {
switch viewType {
case "KEYS_ONLY":
return minidynTypes.StreamViewTypeKeysOnly
case "NEW_IMAGE":
return minidynTypes.StreamViewTypeNewImage
case "OLD_IMAGE":
return minidynTypes.StreamViewTypeOldImage
case "NEW_AND_OLD_IMAGES":
return minidynTypes.StreamViewTypeNewAndOldImages
default:
return minidynTypes.StreamViewTypeKeysOnly
}
}

// EnableStreams enables DynamoDB Streams on a table
func (fd *Client) EnableStreams(tableName string, viewType string) error {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return err
}

vt := mapStringToStreamViewType(viewType)
table.EnableStream(vt)

return nil
}

// DisableStreams disables DynamoDB Streams on a table
func (fd *Client) DisableStreams(tableName string) error {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return err
}

table.DisableStream()

return nil
}

// GetStreamRecords returns all stream records for a table
func (fd *Client) GetStreamRecords(tableName string) ([]StreamRecord, error) {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return nil, err
}

records := table.GetStreamRecords()
result := make([]StreamRecord, len(records))

for i, record := range records {
result[i] = StreamRecord{
EventID: record.EventID,
EventName: string(record.EventName),
Keys: mapAttributeValueToDynamodb(record.Keys),
NewImage: mapAttributeValueToDynamodb(record.NewImage),
OldImage: mapAttributeValueToDynamodb(record.OldImage),
SequenceNumber: record.SequenceNumber,
StreamViewType: string(record.StreamViewType),
}
}

return result, nil
}

// ClearStreamRecords clears all stream records for a table
func (fd *Client) ClearStreamRecords(tableName string) error {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return err
}

table.ClearStreamRecords()

return nil
}
101 changes: 101 additions & 0 deletions aws-v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/smithy-go"
"github.com/truora/minidyn/core"
"github.com/truora/minidyn/interpreter"
minidynTypes "github.com/truora/minidyn/types"
)

const (
Expand Down Expand Up @@ -720,3 +721,103 @@ func getMissingSubstrs(s string, substrs []string) []string {

return missingSubstrs
}

// StreamRecord represents a stream record for AWS SDK v2
type StreamRecord struct {
EventID string
EventName string
Keys map[string]types.AttributeValue
NewImage map[string]types.AttributeValue
OldImage map[string]types.AttributeValue
SequenceNumber string
StreamViewType string
}

func mapStringToStreamViewType(viewType string) minidynTypes.StreamViewType {
switch viewType {
case "KEYS_ONLY":
return minidynTypes.StreamViewTypeKeysOnly
case "NEW_IMAGE":
return minidynTypes.StreamViewTypeNewImage
case "OLD_IMAGE":
return minidynTypes.StreamViewTypeOldImage
case "NEW_AND_OLD_IMAGES":
return minidynTypes.StreamViewTypeNewAndOldImages
default:
return minidynTypes.StreamViewTypeKeysOnly
}
}

// EnableStreams enables DynamoDB Streams on a table
func (fd *Client) EnableStreams(tableName string, viewType string) error {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return err
}

vt := mapStringToStreamViewType(viewType)
table.EnableStream(vt)

return nil
}

// DisableStreams disables DynamoDB Streams on a table
func (fd *Client) DisableStreams(tableName string) error {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return err
}

table.DisableStream()

return nil
}

// GetStreamRecords returns all stream records for a table
func (fd *Client) GetStreamRecords(tableName string) ([]StreamRecord, error) {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return nil, err
}

records := table.GetStreamRecords()
result := make([]StreamRecord, len(records))

for i, record := range records {
result[i] = StreamRecord{
EventID: record.EventID,
EventName: string(record.EventName),
Keys: mapTypesToDynamoMapItem(record.Keys),
NewImage: mapTypesToDynamoMapItem(record.NewImage),
OldImage: mapTypesToDynamoMapItem(record.OldImage),
SequenceNumber: record.SequenceNumber,
StreamViewType: string(record.StreamViewType),
}
}

return result, nil
}

// ClearStreamRecords clears all stream records for a table
func (fd *Client) ClearStreamRecords(tableName string) error {
fd.mu.Lock()
defer fd.mu.Unlock()

table, err := fd.getTable(tableName)
if err != nil {
return err
}

table.ClearStreamRecords()

return nil
}
34 changes: 34 additions & 0 deletions core/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package core

import "github.com/truora/minidyn/types"

// EnableStream enables DynamoDB Streams on the table
func (t *Table) EnableStream(viewType types.StreamViewType) {
t.StreamEnabled = true
t.StreamViewType = viewType
}

// DisableStream disables DynamoDB Streams on the table
func (t *Table) DisableStream() {
t.StreamEnabled = false
}

// GetStreamRecords returns all stream records
func (t *Table) GetStreamRecords() []types.StreamRecord {
return t.StreamRecords
}

// ClearStreamRecords clears all stream records
func (t *Table) ClearStreamRecords() {
t.StreamRecords = []types.StreamRecord{}
}

// IsStreamEnabled returns whether streams are enabled
func (t *Table) IsStreamEnabled() bool {
return t.StreamEnabled
}

// GetStreamViewType returns the current stream view type
func (t *Table) GetStreamViewType() types.StreamViewType {
return t.StreamViewType
}
67 changes: 66 additions & 1 deletion core/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type Table struct {
UseNativeInterpreter bool
NativeInterpreter interpreter.Native
LangInterpreter interpreter.Language
StreamEnabled bool
StreamViewType types.StreamViewType
StreamRecords []types.StreamRecord
streamSequence int64
}

// NewTable creates a new Table
Expand All @@ -45,6 +49,8 @@ func NewTable(name string) *Table {
AttributesDef: map[string]string{},
SortedKeys: []string{},
Data: map[string]map[string]*types.Item{},
StreamRecords: []types.StreamRecord{},
streamSequence: 0,
}
}

Expand Down Expand Up @@ -479,6 +485,10 @@ func (t *Table) Put(input *types.PutItemInput) (map[string]*types.Item, error) {
return item, types.NewError("ValidationException", err.Error(), nil)
}

// Get old item if exists for streams
oldItem := t.getItem(key)
itemExists := len(oldItem) > 0

// support conditional writes
if input.ConditionExpression != nil {
_, matched := t.matchKey(QueryInput{
Expand All @@ -487,7 +497,7 @@ func (t *Table) Put(input *types.PutItemInput) (map[string]*types.Item, error) {
Aliases: input.ExpressionAttributeNames,
Limit: 1,
ConditionExpression: input.ConditionExpression,
}, t.getItem(key))
}, oldItem)

if !matched {
return item, types.NewError("ConditionalCheckFailedException", ErrConditionalRequestFailed.Error(), nil)
Expand All @@ -503,6 +513,14 @@ func (t *Table) Put(input *types.PutItemInput) (map[string]*types.Item, error) {
}
}

// Emit stream record
keyItem := t.KeySchema.getKeyItem(item)
if itemExists {
t.emitStreamRecord(types.StreamEventModify, keyItem, oldItem, item)
} else {
t.emitStreamRecord(types.StreamEventInsert, keyItem, nil, item)
}

return item, nil
}

Expand Down Expand Up @@ -578,6 +596,10 @@ func (t *Table) Update(input *types.UpdateItemInput) (map[string]*types.Item, er
}
}

// Emit stream record for MODIFY
keyItem := t.KeySchema.getKeyItem(item)
t.emitStreamRecord(types.StreamEventModify, keyItem, oldItem, item)

return copyItem(item), nil
}

Expand Down Expand Up @@ -616,6 +638,10 @@ func (t *Table) Delete(input *types.DeleteItemInput) (map[string]*types.Item, er
}
}

// Emit stream record for REMOVE
keyItem := t.KeySchema.getKeyItem(item)
t.emitStreamRecord(types.StreamEventRemove, keyItem, item, nil)

return item, nil
}

Expand Down Expand Up @@ -672,3 +698,42 @@ func handleConditionalCheckError(input *types.UpdateItemInput, checkErr *types.C
checkErr.Item = item
}
}

// emitStreamRecord generates and stores a stream record if streams are enabled
func (t *Table) emitStreamRecord(eventType types.StreamEventType, keys map[string]*types.Item, oldImage, newImage map[string]*types.Item) {
if !t.StreamEnabled {
return
}

t.streamSequence++
record := types.StreamRecord{
EventID: fmt.Sprintf("stream-%d", t.streamSequence),
EventName: eventType,
SequenceNumber: fmt.Sprintf("%d", t.streamSequence),
StreamViewType: t.StreamViewType,
Keys: copyItem(keys),
}

// Set images based on view type
switch t.StreamViewType {
case types.StreamViewTypeKeysOnly:
// Keys already set, nothing else needed
case types.StreamViewTypeNewImage:
if newImage != nil {
record.NewImage = copyItem(newImage)
}
case types.StreamViewTypeOldImage:
if oldImage != nil {
record.OldImage = copyItem(oldImage)
}
case types.StreamViewTypeNewAndOldImages:
if newImage != nil {
record.NewImage = copyItem(newImage)
}
if oldImage != nil {
record.OldImage = copyItem(oldImage)
}
}

t.StreamRecords = append(t.StreamRecords, record)
}
Loading
Loading