Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 250 additions & 0 deletions aws-v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/aws/smithy-go"
"github.com/truora/minidyn/core"
"github.com/truora/minidyn/interpreter"
"github.com/truora/minidyn/interpreter/partiql"
minidynTypes "github.com/truora/minidyn/types"
)

const (
Expand Down Expand Up @@ -48,6 +50,9 @@ type FakeClient interface {
BatchWriteItem(ctx context.Context, input *dynamodb.BatchWriteItemInput, opts ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
BatchGetItem(ctx context.Context, input *dynamodb.BatchGetItemInput, opts ...func(*dynamodb.Options)) (*dynamodb.BatchGetItemOutput, error)
TransactWriteItems(ctx context.Context, input *dynamodb.TransactWriteItemsInput, opts ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
ExecuteStatement(ctx context.Context, input *dynamodb.ExecuteStatementInput, opts ...func(*dynamodb.Options)) (*dynamodb.ExecuteStatementOutput, error)
BatchExecuteStatement(ctx context.Context, input *dynamodb.BatchExecuteStatementInput, opts ...func(*dynamodb.Options)) (*dynamodb.BatchExecuteStatementOutput, error)
ExecuteTransaction(ctx context.Context, input *dynamodb.ExecuteTransactionInput, opts ...func(*dynamodb.Options)) (*dynamodb.ExecuteTransactionOutput, error)
}

// Client define a mock struct to be used
Expand Down Expand Up @@ -720,3 +725,248 @@ func getMissingSubstrs(s string, substrs []string) []string {

return missingSubstrs
}

// ExecuteStatement executes a PartiQL statement
func (fd *Client) ExecuteStatement(ctx context.Context, input *dynamodb.ExecuteStatementInput, opts ...func(*dynamodb.Options)) (*dynamodb.ExecuteStatementOutput, error) {
fd.mu.Lock()
defer fd.mu.Unlock()

if fd.forceFailureErr != nil {
return nil, fd.forceFailureErr
}

if input.Statement == nil {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: "Statement is required"}
}

// Parse the PartiQL statement
lexer := partiql.NewLexer(*input.Statement)
parser := partiql.NewParser(lexer)
stmt := parser.ParseStatement()

if len(parser.Errors()) > 0 {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: fmt.Sprintf("PartiQL syntax error: %s", strings.Join(parser.Errors(), "; "))}
}

// Convert parameters to a format the evaluator can use
params := make([]interface{}, len(input.Parameters))
for i, param := range input.Parameters {
// Extract the value from the AttributeValue
params[i] = extractAttributeValue(param)
}

evaluator := partiql.NewEvaluator(params)

// Execute based on statement type
switch s := stmt.(type) {
case *partiql.SelectStatement:
return fd.executeSelect(ctx, s, evaluator)
case *partiql.InsertStatement:
return fd.executeInsert(ctx, s, evaluator)
case *partiql.UpdateStatement:
return fd.executeUpdate(ctx, s, evaluator)
case *partiql.DeleteStatement:
return fd.executeDelete(ctx, s, evaluator)
default:
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: "Unsupported statement type"}
}
}

func (fd *Client) executeSelect(ctx context.Context, stmt *partiql.SelectStatement, eval *partiql.Evaluator) (*dynamodb.ExecuteStatementOutput, error) {
query, err := eval.TranslateSelectToQuery(stmt)
if err != nil {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: err.Error()}
}

// Determine if this is a Query or Scan
isQuery, _ := query["IsQuery"].(bool)

if isQuery {
// Execute as Query
queryInput := &dynamodb.QueryInput{
TableName: aws.String(query["TableName"].(string)),
}

if keyCondition, ok := query["KeyConditionExpression"].(string); ok {
queryInput.KeyConditionExpression = aws.String(keyCondition)
}
if filterExpr, ok := query["FilterExpression"].(string); ok {
queryInput.FilterExpression = aws.String(filterExpr)
}
if limit, ok := query["Limit"].(int64); ok {
queryInput.Limit = aws.Int32(int32(limit))
}
if exprValues, ok := query["ExpressionAttributeValues"].(map[string]*minidynTypes.Item); ok {
queryInput.ExpressionAttributeValues = mapTypesToDynamoMapItem(exprValues)
}
if exprNames, ok := query["ExpressionAttributeNames"].(map[string]string); ok {
queryInput.ExpressionAttributeNames = exprNames
}

result, err := fd.Query(ctx, queryInput)
if err != nil {
return nil, err
}

return &dynamodb.ExecuteStatementOutput{
Items: result.Items,
LastEvaluatedKey: result.LastEvaluatedKey,
}, nil
}

// Execute as Scan
scanInput := &dynamodb.ScanInput{
TableName: aws.String(query["TableName"].(string)),
}

if filterExpr, ok := query["FilterExpression"].(string); ok {
scanInput.FilterExpression = aws.String(filterExpr)
}
if limit, ok := query["Limit"].(int64); ok {
scanInput.Limit = aws.Int32(int32(limit))
}

result, err := fd.Scan(ctx, scanInput)
if err != nil {
return nil, err
}

return &dynamodb.ExecuteStatementOutput{
Items: result.Items,
LastEvaluatedKey: result.LastEvaluatedKey,
}, nil
}

func (fd *Client) executeInsert(ctx context.Context, stmt *partiql.InsertStatement, eval *partiql.Evaluator) (*dynamodb.ExecuteStatementOutput, error) {
item, err := eval.TranslateInsertToPutItem(stmt)
if err != nil {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: err.Error()}
}

putInput := &dynamodb.PutItemInput{
TableName: aws.String(stmt.TableName),
Item: mapTypesToDynamoMapItem(item),
}

_, err = fd.PutItem(ctx, putInput)
if err != nil {
return nil, err
}

return &dynamodb.ExecuteStatementOutput{}, nil
}

func (fd *Client) executeUpdate(ctx context.Context, stmt *partiql.UpdateStatement, eval *partiql.Evaluator) (*dynamodb.ExecuteStatementOutput, error) {
_, err := eval.TranslateUpdateToUpdateItem(stmt)
if err != nil {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: err.Error()}
}

// Extract key from WHERE clause (simplified - in practice would need proper parsing)
// For now, return an error indicating partial implementation
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: "UPDATE via PartiQL requires further implementation for key extraction"}
}

func (fd *Client) executeDelete(ctx context.Context, stmt *partiql.DeleteStatement, eval *partiql.Evaluator) (*dynamodb.ExecuteStatementOutput, error) {
_, err := eval.TranslateDeleteToDeleteItem(stmt)
if err != nil {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: err.Error()}
}

// Extract key from WHERE clause (simplified - in practice would need proper parsing)
// For now, return an error indicating partial implementation
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: "DELETE via PartiQL requires further implementation for key extraction"}
}

func extractAttributeValue(attr types.AttributeValue) interface{} {
switch v := attr.(type) {
case *types.AttributeValueMemberS:
return v.Value
case *types.AttributeValueMemberN:
return v.Value
case *types.AttributeValueMemberBOOL:
return v.Value
case *types.AttributeValueMemberNULL:
return nil
default:
return nil
}
}

// BatchExecuteStatement executes a batch of PartiQL statements
func (fd *Client) BatchExecuteStatement(ctx context.Context, input *dynamodb.BatchExecuteStatementInput, opts ...func(*dynamodb.Options)) (*dynamodb.BatchExecuteStatementOutput, error) {
if fd.forceFailureErr != nil {
return nil, fd.forceFailureErr
}

if len(input.Statements) > batchRequestsLimit {
return nil, &smithy.GenericAPIError{Code: "ValidationException", Message: "Too many statements for batch execution"}
}

responses := make([]types.BatchStatementResponse, len(input.Statements))

for i, stmtRequest := range input.Statements {
// Execute each statement
executeInput := &dynamodb.ExecuteStatementInput{
Statement: stmtRequest.Statement,
Parameters: stmtRequest.Parameters,
}

result, err := fd.ExecuteStatement(ctx, executeInput)
if err != nil {
responses[i] = types.BatchStatementResponse{
Error: &types.BatchStatementError{
Code: types.BatchStatementErrorCodeEnumValidationError,
Message: aws.String(err.Error()),
},
}
} else {
responses[i] = types.BatchStatementResponse{
Item: getFirstItemOrNil(result.Items),
}
}
}

return &dynamodb.BatchExecuteStatementOutput{
Responses: responses,
}, nil
}

func getFirstItemOrNil(items []map[string]types.AttributeValue) map[string]types.AttributeValue {
if len(items) > 0 {
return items[0]
}
return nil
}

// ExecuteTransaction executes a transactional PartiQL statement
func (fd *Client) ExecuteTransaction(ctx context.Context, input *dynamodb.ExecuteTransactionInput, opts ...func(*dynamodb.Options)) (*dynamodb.ExecuteTransactionOutput, error) {
if fd.forceFailureErr != nil {
return nil, fd.forceFailureErr
}

// Basic implementation - execute all statements atomically
// In a full implementation, this would need proper transaction support
responses := make([]types.ItemResponse, len(input.TransactStatements))

for i, stmtRequest := range input.TransactStatements {
executeInput := &dynamodb.ExecuteStatementInput{
Statement: stmtRequest.Statement,
Parameters: stmtRequest.Parameters,
}

result, err := fd.ExecuteStatement(ctx, executeInput)
if err != nil {
// In a real transaction, we'd rollback all changes
return nil, &smithy.GenericAPIError{Code: "TransactionCanceledException", Message: fmt.Sprintf("Transaction cancelled due to statement error: %s", err.Error())}
}

responses[i] = types.ItemResponse{
Item: getFirstItemOrNil(result.Items),
}
}

return &dynamodb.ExecuteTransactionOutput{
Responses: responses,
}, nil
}
Loading
Loading