diff --git a/aws-v1/client/client.go b/aws-v1/client/client.go index c0ea1d4..d19fa93 100644 --- a/aws-v1/client/client.go +++ b/aws-v1/client/client.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" "github.com/truora/minidyn/core" "github.com/truora/minidyn/interpreter" + minidynTypes "github.com/truora/minidyn/types" ) const ( @@ -741,3 +742,103 @@ func getMissingSubstrs(s string, substrs []string) []string { return missingSubstrs } + +// StreamRecord represents a stream record for AWS SDK v1 +type StreamRecord struct { + EventID string + EventName string + Keys map[string]*dynamodb.AttributeValue + NewImage map[string]*dynamodb.AttributeValue + OldImage map[string]*dynamodb.AttributeValue + SequenceNumber string + StreamViewType string +} + +func mapStringToStreamViewType(viewType string) minidynTypes.StreamViewType { + switch viewType { + case "KEYS_ONLY": + return minidynTypes.StreamViewTypeKeysOnly + case "NEW_IMAGE": + return minidynTypes.StreamViewTypeNewImage + case "OLD_IMAGE": + return minidynTypes.StreamViewTypeOldImage + case "NEW_AND_OLD_IMAGES": + return minidynTypes.StreamViewTypeNewAndOldImages + default: + return minidynTypes.StreamViewTypeKeysOnly + } +} + +// EnableStreams enables DynamoDB Streams on a table +func (fd *Client) EnableStreams(tableName string, viewType string) error { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return err + } + + vt := mapStringToStreamViewType(viewType) + table.EnableStream(vt) + + return nil +} + +// DisableStreams disables DynamoDB Streams on a table +func (fd *Client) DisableStreams(tableName string) error { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return err + } + + table.DisableStream() + + return nil +} + +// GetStreamRecords returns all stream records for a table +func (fd *Client) GetStreamRecords(tableName string) ([]StreamRecord, error) { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return nil, err + } + + records := table.GetStreamRecords() + result := make([]StreamRecord, len(records)) + + for i, record := range records { + result[i] = StreamRecord{ + EventID: record.EventID, + EventName: string(record.EventName), + Keys: mapAttributeValueToDynamodb(record.Keys), + NewImage: mapAttributeValueToDynamodb(record.NewImage), + OldImage: mapAttributeValueToDynamodb(record.OldImage), + SequenceNumber: record.SequenceNumber, + StreamViewType: string(record.StreamViewType), + } + } + + return result, nil +} + +// ClearStreamRecords clears all stream records for a table +func (fd *Client) ClearStreamRecords(tableName string) error { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return err + } + + table.ClearStreamRecords() + + return nil +} diff --git a/aws-v2/client/client.go b/aws-v2/client/client.go index 05f3740..feb1806 100644 --- a/aws-v2/client/client.go +++ b/aws-v2/client/client.go @@ -14,6 +14,7 @@ import ( "github.com/aws/smithy-go" "github.com/truora/minidyn/core" "github.com/truora/minidyn/interpreter" + minidynTypes "github.com/truora/minidyn/types" ) const ( @@ -720,3 +721,103 @@ func getMissingSubstrs(s string, substrs []string) []string { return missingSubstrs } + +// StreamRecord represents a stream record for AWS SDK v2 +type StreamRecord struct { + EventID string + EventName string + Keys map[string]types.AttributeValue + NewImage map[string]types.AttributeValue + OldImage map[string]types.AttributeValue + SequenceNumber string + StreamViewType string +} + +func mapStringToStreamViewType(viewType string) minidynTypes.StreamViewType { + switch viewType { + case "KEYS_ONLY": + return minidynTypes.StreamViewTypeKeysOnly + case "NEW_IMAGE": + return minidynTypes.StreamViewTypeNewImage + case "OLD_IMAGE": + return minidynTypes.StreamViewTypeOldImage + case "NEW_AND_OLD_IMAGES": + return minidynTypes.StreamViewTypeNewAndOldImages + default: + return minidynTypes.StreamViewTypeKeysOnly + } +} + +// EnableStreams enables DynamoDB Streams on a table +func (fd *Client) EnableStreams(tableName string, viewType string) error { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return err + } + + vt := mapStringToStreamViewType(viewType) + table.EnableStream(vt) + + return nil +} + +// DisableStreams disables DynamoDB Streams on a table +func (fd *Client) DisableStreams(tableName string) error { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return err + } + + table.DisableStream() + + return nil +} + +// GetStreamRecords returns all stream records for a table +func (fd *Client) GetStreamRecords(tableName string) ([]StreamRecord, error) { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return nil, err + } + + records := table.GetStreamRecords() + result := make([]StreamRecord, len(records)) + + for i, record := range records { + result[i] = StreamRecord{ + EventID: record.EventID, + EventName: string(record.EventName), + Keys: mapTypesToDynamoMapItem(record.Keys), + NewImage: mapTypesToDynamoMapItem(record.NewImage), + OldImage: mapTypesToDynamoMapItem(record.OldImage), + SequenceNumber: record.SequenceNumber, + StreamViewType: string(record.StreamViewType), + } + } + + return result, nil +} + +// ClearStreamRecords clears all stream records for a table +func (fd *Client) ClearStreamRecords(tableName string) error { + fd.mu.Lock() + defer fd.mu.Unlock() + + table, err := fd.getTable(tableName) + if err != nil { + return err + } + + table.ClearStreamRecords() + + return nil +} diff --git a/core/streams.go b/core/streams.go new file mode 100644 index 0000000..d64a30c --- /dev/null +++ b/core/streams.go @@ -0,0 +1,34 @@ +package core + +import "github.com/truora/minidyn/types" + +// EnableStream enables DynamoDB Streams on the table +func (t *Table) EnableStream(viewType types.StreamViewType) { + t.StreamEnabled = true + t.StreamViewType = viewType +} + +// DisableStream disables DynamoDB Streams on the table +func (t *Table) DisableStream() { + t.StreamEnabled = false +} + +// GetStreamRecords returns all stream records +func (t *Table) GetStreamRecords() []types.StreamRecord { + return t.StreamRecords +} + +// ClearStreamRecords clears all stream records +func (t *Table) ClearStreamRecords() { + t.StreamRecords = []types.StreamRecord{} +} + +// IsStreamEnabled returns whether streams are enabled +func (t *Table) IsStreamEnabled() bool { + return t.StreamEnabled +} + +// GetStreamViewType returns the current stream view type +func (t *Table) GetStreamViewType() types.StreamViewType { + return t.StreamViewType +} diff --git a/core/table.go b/core/table.go index fd21230..b3c2d92 100644 --- a/core/table.go +++ b/core/table.go @@ -35,6 +35,10 @@ type Table struct { UseNativeInterpreter bool NativeInterpreter interpreter.Native LangInterpreter interpreter.Language + StreamEnabled bool + StreamViewType types.StreamViewType + StreamRecords []types.StreamRecord + streamSequence int64 } // NewTable creates a new Table @@ -45,6 +49,8 @@ func NewTable(name string) *Table { AttributesDef: map[string]string{}, SortedKeys: []string{}, Data: map[string]map[string]*types.Item{}, + StreamRecords: []types.StreamRecord{}, + streamSequence: 0, } } @@ -479,6 +485,10 @@ func (t *Table) Put(input *types.PutItemInput) (map[string]*types.Item, error) { return item, types.NewError("ValidationException", err.Error(), nil) } + // Get old item if exists for streams + oldItem := t.getItem(key) + itemExists := len(oldItem) > 0 + // support conditional writes if input.ConditionExpression != nil { _, matched := t.matchKey(QueryInput{ @@ -487,7 +497,7 @@ func (t *Table) Put(input *types.PutItemInput) (map[string]*types.Item, error) { Aliases: input.ExpressionAttributeNames, Limit: 1, ConditionExpression: input.ConditionExpression, - }, t.getItem(key)) + }, oldItem) if !matched { return item, types.NewError("ConditionalCheckFailedException", ErrConditionalRequestFailed.Error(), nil) @@ -503,6 +513,14 @@ func (t *Table) Put(input *types.PutItemInput) (map[string]*types.Item, error) { } } + // Emit stream record + keyItem := t.KeySchema.getKeyItem(item) + if itemExists { + t.emitStreamRecord(types.StreamEventModify, keyItem, oldItem, item) + } else { + t.emitStreamRecord(types.StreamEventInsert, keyItem, nil, item) + } + return item, nil } @@ -578,6 +596,10 @@ func (t *Table) Update(input *types.UpdateItemInput) (map[string]*types.Item, er } } + // Emit stream record for MODIFY + keyItem := t.KeySchema.getKeyItem(item) + t.emitStreamRecord(types.StreamEventModify, keyItem, oldItem, item) + return copyItem(item), nil } @@ -616,6 +638,10 @@ func (t *Table) Delete(input *types.DeleteItemInput) (map[string]*types.Item, er } } + // Emit stream record for REMOVE + keyItem := t.KeySchema.getKeyItem(item) + t.emitStreamRecord(types.StreamEventRemove, keyItem, item, nil) + return item, nil } @@ -672,3 +698,42 @@ func handleConditionalCheckError(input *types.UpdateItemInput, checkErr *types.C checkErr.Item = item } } + +// emitStreamRecord generates and stores a stream record if streams are enabled +func (t *Table) emitStreamRecord(eventType types.StreamEventType, keys map[string]*types.Item, oldImage, newImage map[string]*types.Item) { + if !t.StreamEnabled { + return + } + + t.streamSequence++ + record := types.StreamRecord{ + EventID: fmt.Sprintf("stream-%d", t.streamSequence), + EventName: eventType, + SequenceNumber: fmt.Sprintf("%d", t.streamSequence), + StreamViewType: t.StreamViewType, + Keys: copyItem(keys), + } + + // Set images based on view type + switch t.StreamViewType { + case types.StreamViewTypeKeysOnly: + // Keys already set, nothing else needed + case types.StreamViewTypeNewImage: + if newImage != nil { + record.NewImage = copyItem(newImage) + } + case types.StreamViewTypeOldImage: + if oldImage != nil { + record.OldImage = copyItem(oldImage) + } + case types.StreamViewTypeNewAndOldImages: + if newImage != nil { + record.NewImage = copyItem(newImage) + } + if oldImage != nil { + record.OldImage = copyItem(oldImage) + } + } + + t.StreamRecords = append(t.StreamRecords, record) +} diff --git a/types/types.go b/types/types.go index 87dd100..0b6bb2f 100644 --- a/types/types.go +++ b/types/types.go @@ -322,3 +322,49 @@ func StringValue(str *string) string { return *str } + +// StreamViewType defines the type of data from the table included in the stream +type StreamViewType string + +const ( + // StreamViewTypeKeysOnly only the key attributes of the modified item + StreamViewTypeKeysOnly StreamViewType = "KEYS_ONLY" + // StreamViewTypeNewImage the entire item, as it appears after it was modified + StreamViewTypeNewImage StreamViewType = "NEW_IMAGE" + // StreamViewTypeOldImage the entire item, as it appeared before it was modified + StreamViewTypeOldImage StreamViewType = "OLD_IMAGE" + // StreamViewTypeNewAndOldImages both the new and the old images of the item + StreamViewTypeNewAndOldImages StreamViewType = "NEW_AND_OLD_IMAGES" +) + +// StreamEventType defines the type of event +type StreamEventType string + +const ( + // StreamEventInsert a new item was added to the table + StreamEventInsert StreamEventType = "INSERT" + // StreamEventModify an item was modified + StreamEventModify StreamEventType = "MODIFY" + // StreamEventRemove an item was deleted from the table + StreamEventRemove StreamEventType = "REMOVE" +) + +// StreamRecord contains details about a change to a DynamoDB table +type StreamRecord struct { + _ struct{} `type:"structure"` + EventID string `type:"string"` + EventName StreamEventType `type:"string"` + Keys map[string]*Item `type:"map"` + NewImage map[string]*Item `type:"map"` + OldImage map[string]*Item `type:"map"` + SequenceNumber string `type:"string"` + SizeBytes int64 `type:"long"` + StreamViewType StreamViewType `type:"string"` +} + +// StreamConfiguration represents the DynamoDB Streams configuration for a table +type StreamConfiguration struct { + _ struct{} `type:"structure"` + StreamEnabled bool `type:"boolean"` + StreamViewType StreamViewType `type:"string"` +}