Skip to content

Commit

Permalink
fix dynamoevents iterator compat (#50820)
Browse files Browse the repository at this point in the history
  • Loading branch information
fspmarshall authored Jan 7, 2025
1 parent a0a1b87 commit 885de7b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
58 changes: 58 additions & 0 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
autoscalingtypes "github.com/aws/aws-sdk-go-v2/service/applicationautoscaling/types"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
dynamodbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
legacydynamo "github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/smithy-go"
"github.com/google/uuid"
"github.com/gravitational/trace"
Expand Down Expand Up @@ -689,6 +691,24 @@ type checkpointKey struct {
EventKey string `json:"event_key,omitempty"`
}

// legacyCheckpointKey is the old checkpoint key returned by older auth versions. Used to decode
// checkpoints originating from old auths. Commonly we don't bother supporting pagination/cursors
// across teleport versions since the benefit of doing so is usually minimal, but this value is used
// as on-disk state by long running event export operations, and so must be supported.
//
// DELETE IN: 19.0.0
type legacyCheckpointKey struct {
// The date that the Dynamo iterator corresponds to.
Date string `json:"date,omitempty"`

// A DynamoDB query iterator. Allows us to resume a partial query.
Iterator map[string]*legacydynamo.AttributeValue `json:"iterator,omitempty"`

// EventKey is a derived identifier for an event used for resuming
// sub-page breaks due to size constraints.
EventKey string `json:"event_key,omitempty"`
}

// SearchEvents is a flexible way to find events.
//
// Event types to filter can be specified and pagination is handled by an iterator key that allows
Expand Down Expand Up @@ -936,11 +956,49 @@ func getCheckpointFromStartKey(startKey string) (checkpointKey, error) {
}
// If a checkpoint key is provided, unmarshal it so we can work with it's parts.
if err := json.Unmarshal([]byte(startKey), &checkpoint); err != nil {
// attempt to decode as legacy format.
if checkpoint, err = getCheckpointFromLegacyStartKey(startKey); err == nil {
return checkpoint, nil
}
return checkpointKey{}, trace.Wrap(err)
}
return checkpoint, nil
}

// getCheckpointFromLegacyStartKey is a helper function that decodes a legacy checkpoint key
// into the new format. The old format used raw dynamo attribute values for the iterator, where
// the new format uses a json-serialized map with bare values.
//
// DELETE IN: 19.0.0
func getCheckpointFromLegacyStartKey(startKey string) (checkpointKey, error) {
var checkpoint legacyCheckpointKey
if startKey == "" {
return checkpointKey{}, nil
}
// If a checkpoint key is provided, unmarshal it so we can work with its parts.
if err := json.Unmarshal([]byte(startKey), &checkpoint); err != nil {
return checkpointKey{}, trace.Wrap(err)
}

// decode the dynamo attrs into the go map repr common to the old and new formats.
m := make(map[string]any)
if err := dynamodbattribute.UnmarshalMap(checkpoint.Iterator, &m); err != nil {
return checkpointKey{}, trace.Wrap(err)
}

// encode the map into json, making it equivalent to the new format.
iterator, err := json.Marshal(m)
if err != nil {
return checkpointKey{}, trace.Wrap(err)
}

return checkpointKey{
Date: checkpoint.Date,
Iterator: string(iterator),
EventKey: checkpoint.EventKey,
}, nil
}

func getExprFilter(filter searchEventsFilter) *string {
var filterConds []string
if len(filter.eventTypes) > 0 {
Expand Down
16 changes: 16 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -666,3 +667,18 @@ func TestEndpoints(t *testing.T) {
})
}
}

func TestStartKeyBackCompat(t *testing.T) {
const (
oldStartKey = `{"date":"2023-04-27","iterator":{"CreatedAt":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"1682583778","NS":null,"NULL":null,"S":null,"SS":null},"CreatedAtDate":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"2023-04-27","SS":null},"EventIndex":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":"0","NS":null,"NULL":null,"S":null,"SS":null},"SessionID":{"B":null,"BOOL":null,"BS":null,"L":null,"M":null,"N":null,"NS":null,"NULL":null,"S":"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb","SS":null}}}`
newStartKey = `{"date":"2023-04-27","iterator":"{\"CreatedAt\":1682583778,\"CreatedAtDate\":\"2023-04-27\",\"EventIndex\":0,\"SessionID\":\"4bc51fd7-4f0c-47ee-b9a5-da621fbdbabb\"}"}`
)

oldCP, err := getCheckpointFromStartKey(oldStartKey)
require.NoError(t, err)

newCP, err := getCheckpointFromStartKey(newStartKey)
require.NoError(t, err)

require.Empty(t, cmp.Diff(oldCP, newCP))
}

0 comments on commit 885de7b

Please sign in to comment.