From cd45977dcf7100476d04f177b09095331b1841f2 Mon Sep 17 00:00:00 2001 From: omkar-dev Date: Mon, 20 Nov 2023 11:00:08 +1100 Subject: [PATCH 1/4] feat: add a support for transact items in dynago to do put and delete operations synchronously Signed-off-by: omkar-till --- interface.go | 1 + tests/transact_items_test.go | 122 +++++++++++++++++++++++++++++++++++ transaction_items.go | 54 ++++++++++++++++ 3 files changed, 177 insertions(+) create mode 100644 tests/transact_items_test.go create mode 100644 transaction_items.go diff --git a/interface.go b/interface.go index 3f8b556..341752c 100644 --- a/interface.go +++ b/interface.go @@ -29,6 +29,7 @@ type WriteAPI interface { type TransactionAPI interface { TransactPutItems(ctx context.Context, items []*TransactPutItemsInput) error + TransactItems(ctx context.Context, items []*TransactPutItemsInput) error } type ReadAPI interface { diff --git a/tests/transact_items_test.go b/tests/transact_items_test.go new file mode 100644 index 0000000..4ddfc4b --- /dev/null +++ b/tests/transact_items_test.go @@ -0,0 +1,122 @@ +package tests + +import ( + "context" + "reflect" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/oolio-group/dynago" +) + +type Terminal struct { + Id string + Pk string + Sk string +} + +func TestTransactItems(t *testing.T) { + endoint, purge := startLocalDatabase(t) + defer purge() + + table := prepareTable(t, endoint, "transcact_item_test") + + testCases := []struct { + title string + condition string + keys map[string]types.AttributeValue + opts []dynago.QueryOptions + source1 []Terminal + source2 []Terminal + expected []Terminal + expectedErr error + }{{ + title: "assign terminal", + condition: "pk = :pk", + keys: map[string]types.AttributeValue{ + ":pk": &types.AttributeValueMemberS{Value: "terminal1"}, + }, + source1: []Terminal{ + { + Id: "1", + Pk: "terminal1", + Sk: "merchant1", + }, + }, + source2: []Terminal{ + { + Id: "1", + Pk: "terminal1", + Sk: "merchant2", + }, + }, + expected: []Terminal{ + { + Id: "1", + Pk: "terminal1", + Sk: "merchant2", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.title, func(t *testing.T) { + t.Helper() + ctx := context.TODO() + // Create Item + if len(tc.source1) > 0 { + items := make([]*dynago.TransactPutItemsInput, 0, len(tc.source1)) + for _, item := range tc.source1 { + items = append(items, &dynago.TransactPutItemsInput{ + dynago.StringValue(item.Pk), dynago.StringValue(item.Sk), item, + }) + } + err := table.TransactPutItems(ctx, items) + if err != nil { + t.Fatalf("prepare table failed; got %s", err) + } + } + // Update Item + items := make([]types.TransactWriteItem, 0, len(tc.source1)+len(tc.source2)) + if len(tc.source1) > 0 { + for _, item := range tc.source1 { + items = append(items, table.WithDeleteItem(ctx, item.Pk, + item.Sk)) + } + } + if len(tc.source2) > 0 { + for _, item := range tc.source2 { + items = append(items, table.WithPutItem(ctx, item.Pk, + item.Sk, + item)) + } + } + err := table.TransactItems(ctx, items) + if err != nil { + t.Fatalf("error occurred %s", err) + } + + var out []Terminal + _, err = table.Query(ctx, tc.condition, tc.keys, &out) + if tc.expectedErr != nil { + if err == nil { + t.Fatalf("expected query to fail with %s", tc.expectedErr) + } + if !strings.Contains(err.Error(), tc.expectedErr.Error()) { + t.Fatalf("expected query to fail with %s; got %s", tc.expectedErr, err) + } + return + } + if err != nil { + t.Fatalf("expected query to succeed; got %s", err) + } + if !reflect.DeepEqual(tc.expected, out) { + t.Errorf("expected query to return %v; got %v", tc.expected, out) + } + + }) + + } + +} diff --git a/transaction_items.go b/transaction_items.go new file mode 100644 index 0000000..307de24 --- /dev/null +++ b/transaction_items.go @@ -0,0 +1,54 @@ +package dynago + +import ( + "context" + "log" + + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +func (t *Client) WithDeleteItem(ctx context.Context, pk string, sk string) types.TransactWriteItem { + return types.TransactWriteItem{ + Delete: &types.Delete{ + TableName: &t.TableName, + Key: map[string]types.AttributeValue{ + "pk": &types.AttributeValueMemberS{Value: pk}, + "sk": &types.AttributeValueMemberS{Value: sk}, + }, + }, + } + +} + +func (t *Client) WithPutItem(ctx context.Context, pk string, sk string, item interface{}) types.TransactWriteItem { + av, err := attributevalue.MarshalMap(item) + if err != nil { + log.Println("Failed to Marshal item" + err.Error()) + return types.TransactWriteItem{} + } + keys := map[string]types.AttributeValue{ + "pk": &types.AttributeValueMemberS{Value: pk}, + "sk": &types.AttributeValueMemberS{Value: sk}, + } + for k, v := range keys { + av[k] = v + } + return types.TransactWriteItem{ + Put: &types.Put{ + TableName: &t.TableName, + Item: av, + }, + } + +} + +// TransactItems is a synchronous for writing or deletion operation performed in dynamodb grouped together + +func (t *Client) TransactItems(ctx context.Context, input []types.TransactWriteItem) error { + _, err := t.client.TransactWriteItems(ctx, &dynamodb.TransactWriteItemsInput{ + TransactItems: input, + }) + return err +} From d518e3847d0e52335d38076c7ac7f55c0e02e1e4 Mon Sep 17 00:00:00 2001 From: omkar-dev Date: Mon, 20 Nov 2023 12:11:50 +1100 Subject: [PATCH 2/4] fix: address comment update test cases Signed-off-by: omkar-till --- tests/transact_items_test.go | 86 ++++++++++++++++++++---------------- transaction_items.go | 4 +- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/tests/transact_items_test.go b/tests/transact_items_test.go index 4ddfc4b..7d1c46c 100644 --- a/tests/transact_items_test.go +++ b/tests/transact_items_test.go @@ -23,82 +23,94 @@ func TestTransactItems(t *testing.T) { table := prepareTable(t, endoint, "transcact_item_test") testCases := []struct { - title string - condition string - keys map[string]types.AttributeValue - opts []dynago.QueryOptions - source1 []Terminal - source2 []Terminal + title string + condition string + keys map[string]types.AttributeValue + opts []dynago.QueryOptions + //items to be added + newItems []Terminal + operations []types.TransactWriteItem + //items expected to exist in table after transaction operation expected []Terminal expectedErr error }{{ - title: "assign terminal", + title: "assign terminal - only add a terminal", condition: "pk = :pk", keys: map[string]types.AttributeValue{ ":pk": &types.AttributeValueMemberS{Value: "terminal1"}, }, - source1: []Terminal{ - { + newItems: []Terminal{}, + operations: []types.TransactWriteItem{ + table.WithPutItem("terminal1", "merchant2", Terminal{ Id: "1", Pk: "terminal1", Sk: "merchant1", - }, + }), }, - source2: []Terminal{ + expected: []Terminal{ { Id: "1", Pk: "terminal1", - Sk: "merchant2", + Sk: "merchant1", }, }, - expected: []Terminal{ - { + }, + { + title: "assign terminal - delete existing and update with new", + condition: "pk = :pk", + keys: map[string]types.AttributeValue{ + ":pk": &types.AttributeValueMemberS{Value: "terminal1"}, + }, + newItems: []Terminal{{ Id: "1", Pk: "terminal1", Sk: "merchant2", + }}, + operations: []types.TransactWriteItem{ + table.WithDeleteItem("terminal1", "merchant1"), + table.WithPutItem("terminal1", "merchant2", Terminal{ + Id: "1", + Pk: "terminal1", + Sk: "merchant2", + }), + }, + expected: []Terminal{ + { + Id: "1", + Pk: "terminal1", + Sk: "merchant2", + }, }, }, - }, } for _, tc := range testCases { t.Run(tc.title, func(t *testing.T) { t.Helper() ctx := context.TODO() // Create Item - if len(tc.source1) > 0 { - items := make([]*dynago.TransactPutItemsInput, 0, len(tc.source1)) - for _, item := range tc.source1 { + if len(tc.newItems) > 0 { + items := make([]*dynago.TransactPutItemsInput, 0, len(tc.newItems)) + for _, item := range tc.newItems { items = append(items, &dynago.TransactPutItemsInput{ dynago.StringValue(item.Pk), dynago.StringValue(item.Sk), item, }) } err := table.TransactPutItems(ctx, items) if err != nil { - t.Fatalf("prepare table failed; got %s", err) - } - } - // Update Item - items := make([]types.TransactWriteItem, 0, len(tc.source1)+len(tc.source2)) - if len(tc.source1) > 0 { - for _, item := range tc.source1 { - items = append(items, table.WithDeleteItem(ctx, item.Pk, - item.Sk)) + t.Fatalf("transaction put items failed; got %s", err) } } - if len(tc.source2) > 0 { - for _, item := range tc.source2 { - items = append(items, table.WithPutItem(ctx, item.Pk, - item.Sk, - item)) + //perform operations + if len(tc.operations) > 0 { + err := table.TransactItems(ctx, tc.operations) + if err != nil { + t.Fatalf("error occurred %s", err) } - } - err := table.TransactItems(ctx, items) - if err != nil { - t.Fatalf("error occurred %s", err) + } var out []Terminal - _, err = table.Query(ctx, tc.condition, tc.keys, &out) + _, err := table.Query(ctx, tc.condition, tc.keys, &out) if tc.expectedErr != nil { if err == nil { t.Fatalf("expected query to fail with %s", tc.expectedErr) diff --git a/transaction_items.go b/transaction_items.go index 307de24..79ded31 100644 --- a/transaction_items.go +++ b/transaction_items.go @@ -9,7 +9,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) -func (t *Client) WithDeleteItem(ctx context.Context, pk string, sk string) types.TransactWriteItem { +func (t *Client) WithDeleteItem(pk string, sk string) types.TransactWriteItem { return types.TransactWriteItem{ Delete: &types.Delete{ TableName: &t.TableName, @@ -22,7 +22,7 @@ func (t *Client) WithDeleteItem(ctx context.Context, pk string, sk string) types } -func (t *Client) WithPutItem(ctx context.Context, pk string, sk string, item interface{}) types.TransactWriteItem { +func (t *Client) WithPutItem(pk string, sk string, item interface{}) types.TransactWriteItem { av, err := attributevalue.MarshalMap(item) if err != nil { log.Println("Failed to Marshal item" + err.Error()) From e0005f24b3471bf84f4a08d614743ad71966f5a1 Mon Sep 17 00:00:00 2001 From: omkar-dev Date: Mon, 20 Nov 2023 12:29:09 +1100 Subject: [PATCH 3/4] fix: update test Signed-off-by: omkar-till --- tests/transact_items_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/transact_items_test.go b/tests/transact_items_test.go index 7d1c46c..5292c5d 100644 --- a/tests/transact_items_test.go +++ b/tests/transact_items_test.go @@ -41,7 +41,7 @@ func TestTransactItems(t *testing.T) { }, newItems: []Terminal{}, operations: []types.TransactWriteItem{ - table.WithPutItem("terminal1", "merchant2", Terminal{ + table.WithPutItem("terminal1", "merchant1", Terminal{ Id: "1", Pk: "terminal1", Sk: "merchant1", From fc1a8c66320bd0f73ee097c5482ecaff6d5a5c21 Mon Sep 17 00:00:00 2001 From: omkar-dev Date: Mon, 20 Nov 2023 13:58:50 +1100 Subject: [PATCH 4/4] fix: update interface value Signed-off-by: omkar-till --- interface.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interface.go b/interface.go index 341752c..ebbcf39 100644 --- a/interface.go +++ b/interface.go @@ -29,7 +29,7 @@ type WriteAPI interface { type TransactionAPI interface { TransactPutItems(ctx context.Context, items []*TransactPutItemsInput) error - TransactItems(ctx context.Context, items []*TransactPutItemsInput) error + TransactItems(ctx context.Context, input []types.TransactWriteItem) error } type ReadAPI interface {