Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
// TokenMaxDuration specifies the lifetime for each access token.
TokenMaxDuration int64 = 86400 * 1e3 // Milliseconds
bearerPrefix string = "Bearer "
tokenRE string = `^(?P<TimestampHex>[a-fA-F0-9]+)\|(?P<SignedTimestampHex>[a-fA-F0-9]+)\|(?P<PublicKeyHex>[a-fA-F0-9]+)$`
tokenRE string = `^(?P<TimestampHex>[a-fA-F0-9]+)\|(?P<SignedTimestampHex>[a-fA-F0-9]+)\|(?P<PublicKeyHex>[a-fA-F0-9]+)$` //nolint:gosec // G101 false positive: regex pattern, not a credential
)

// Token represents the values we have in access tokens.
Expand Down
7 changes: 6 additions & 1 deletion cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ func (c *Cache) GetInterimCount(ctx context.Context, clientID string, countType

// IncrementInterimCount increments or decrements the amount of entities inserted in
// the DB that were not yet added to the item count
func (c *Cache) IncrementInterimCount(ctx context.Context, clientID string, countType string, subtract bool) (int, error) {
func (c *Cache) IncrementInterimCount(
ctx context.Context,
clientID string,
countType string,
subtract bool,
) (int, error) {
return c.Incr(ctx, GetInterimCountKey(clientID, countType), subtract)
}

Expand Down
97 changes: 80 additions & 17 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (

var (
// Could be modified in tests.
maxGUBatchSize = 500
maxClientObjectQuota = 50000
maxClientHistoryObjectQuota = 30000
maxGUBatchSize int32 = 500
maxClientObjectQuota = 50000
maxClientHistoryObjectQuota = 30000
)

const (
Expand All @@ -34,15 +34,29 @@ const (
// handleGetUpdatesRequest handles GetUpdatesMessage and fills
// GetUpdatesResponse. Target sync entities in the database will be updated or
// deleted based on the client's requests.
func handleGetUpdatesRequest(ctx context.Context, cache *cache.Cache, guMsg *sync_pb.GetUpdatesMessage, guRsp *sync_pb.GetUpdatesResponse, db datastore.Datastore, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
func handleGetUpdatesRequest(
ctx context.Context,
cache *cache.Cache,
guMsg *sync_pb.GetUpdatesMessage,
guRsp *sync_pb.GetUpdatesResponse,
db datastore.Datastore,
clientID string,
) (*sync_pb.SyncEnums_ErrorType, error) {
errCode := sync_pb.SyncEnums_SUCCESS // default value, might be changed later
isNewClient := guMsg.GetUpdatesOrigin != nil && *guMsg.GetUpdatesOrigin == sync_pb.SyncEnums_NEW_CLIENT
isPoll := guMsg.GetUpdatesOrigin != nil && *guMsg.GetUpdatesOrigin == sync_pb.SyncEnums_PERIODIC
if isNewClient {
// Reject the request if client has >= 50 devices in the chain.
activeDevices := 0
for {
hasChangesRemaining, syncEntities, err := db.GetUpdatesForType(ctx, deviceInfoTypeID, 0, false, clientID, int64(maxGUBatchSize))
hasChangesRemaining, syncEntities, err := db.GetUpdatesForType(
ctx,
deviceInfoTypeID,
0,
false,
clientID,
maxGUBatchSize,
)
if err != nil {
log.Error().Err(err).Msgf("db.GetUpdatesForType failed for type %v", deviceInfoTypeID)
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand Down Expand Up @@ -120,7 +134,7 @@ func handleGetUpdatesRequest(ctx context.Context, cache *cache.Cache, guMsg *syn
// of break because we need to prepare NewProgressMarker for all entries in
// FromProgressMarker, where the returned token stays the same as the one
// passed in FromProgressMarker.
if len(guRsp.Entries) >= maxSize {
if len(guRsp.Entries) >= int(maxSize) {
continue
}

Expand All @@ -135,8 +149,15 @@ func handleGetUpdatesRequest(ctx context.Context, cache *cache.Cache, guMsg *syn
continue
}

curMaxSize := int64(maxSize) - int64(len(guRsp.Entries))
hasChangesRemaining, entities, err := db.GetUpdatesForType(ctx, int(*fromProgressMarker.DataTypeId), token, fetchFolders, clientID, curMaxSize)
curMaxSize := int32(int(maxSize) - len(guRsp.Entries)) //nolint:gosec // maxSize is 500 and the >= check above ensures entries fits within it
hasChangesRemaining, entities, err := db.GetUpdatesForType(
ctx,
int(*fromProgressMarker.DataTypeId),
token,
fetchFolders,
clientID,
curMaxSize,
)
if err != nil {
log.Error().Err(err).Msgf("db.GetUpdatesForType failed for type %v", *fromProgressMarker.DataTypeId)
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand Down Expand Up @@ -195,7 +216,12 @@ func handleGetUpdatesRequest(ctx context.Context, cache *cache.Cache, guMsg *syn
return &errCode, nil
}

func getItemCounts(ctx context.Context, cache *cache.Cache, db datastore.Datastore, clientID string) (*datastore.ClientItemCounts, int, int, error) {
func getItemCounts(
ctx context.Context,
cache *cache.Cache,
db datastore.Datastore,
clientID string,
) (*datastore.ClientItemCounts, int, int, error) {
itemCounts, err := db.GetClientItemCount(ctx, clientID)
if err != nil {
return nil, 0, 0, err
Expand Down Expand Up @@ -223,7 +249,14 @@ func getInterimItemCounts(ctx context.Context, cache *cache.Cache, clientID stri
// For each commit entry:
// - new sync entity is created and inserted into the database if version is 0.
// - existed sync entity will be updated if version is greater than 0.
func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *sync_pb.CommitMessage, commitRsp *sync_pb.CommitResponse, db datastore.Datastore, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
func handleCommitRequest(
ctx context.Context,
cache *cache.Cache,
commitMsg *sync_pb.CommitMessage,
commitRsp *sync_pb.CommitResponse,
db datastore.Datastore,
clientID string,
) (*sync_pb.SyncEnums_ErrorType, error) {
if commitMsg == nil {
return nil, errors.New("nil commitMsg is received")
}
Expand All @@ -249,7 +282,10 @@ func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *syn
// "Boost" the quota with the difference between the history quota and count,
// so users can start syncing other entities immediately, instead of waiting for the
// history TTL to get rid of the excess items.
boostedQuotaAddition = min(maxClientObjectQuota-maxClientHistoryObjectQuota, currentHistoryItemCount-maxClientHistoryObjectQuota)
boostedQuotaAddition = min(
maxClientObjectQuota-maxClientHistoryObjectQuota,
currentHistoryItemCount-maxClientHistoryObjectQuota,
)
}

commitRsp.Entryresponse = make([]*sync_pb.CommitResponse_EntryResponse, len(commitMsg.Entries))
Expand All @@ -266,7 +302,9 @@ func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *syn
if err != nil { // Can't unmarshal & marshal the message from PB into DB format
rspType := sync_pb.CommitResponse_INVALID_MESSAGE
entryRsp.ResponseType = &rspType
entryRsp.ErrorMessage = aws.String(fmt.Sprintf("Cannot convert protobuf sync entity to DB format: %v", err.Error()))
entryRsp.ErrorMessage = aws.String(
fmt.Sprintf("Cannot convert protobuf sync entity to DB format: %v", err.Error()),
)
continue
}

Expand All @@ -280,7 +318,8 @@ func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *syn

oldVersion := *entityToCommit.Version
isUpdateOp := oldVersion != 0
isHistoryRelatedItem := *entityToCommit.DataType == datastore.HistoryTypeID || *entityToCommit.DataType == datastore.HistoryDeleteDirectiveTypeID
isHistoryRelatedItem := *entityToCommit.DataType == datastore.HistoryTypeID ||
*entityToCommit.DataType == datastore.HistoryDeleteDirectiveTypeID
*entityToCommit.Version = *entityToCommit.Mtime
if *entityToCommit.DataType == datastore.HistoryTypeID {
// Check if item exists using client_unique_tag
Expand All @@ -299,7 +338,12 @@ func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *syn
if currentNormalItemCount+currentHistoryItemCount+newNormalCount+newHistoryCount >= maxClientObjectQuota+boostedQuotaAddition {
rspType := sync_pb.CommitResponse_OVER_QUOTA
entryRsp.ResponseType = &rspType
entryRsp.ErrorMessage = aws.String(fmt.Sprintf("There are already %v non-deleted objects in store", currentNormalItemCount+currentHistoryItemCount))
entryRsp.ErrorMessage = aws.String(
fmt.Sprintf(
"There are already %v non-deleted objects in store",
currentNormalItemCount+currentHistoryItemCount,
),
)
continue
}

Expand Down Expand Up @@ -397,7 +441,13 @@ func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *syn

// handleClearServerDataRequest handles clearing user data from the datastore and cache
// and fills the response
func handleClearServerDataRequest(ctx context.Context, cache *cache.Cache, db datastore.Datastore, _ *sync_pb.ClearServerDataMessage, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
func handleClearServerDataRequest(
ctx context.Context,
cache *cache.Cache,
db datastore.Datastore,
_ *sync_pb.ClearServerDataMessage,
clientID string,
) (*sync_pb.SyncEnums_ErrorType, error) {
errCode := sync_pb.SyncEnums_SUCCESS
var err error

Expand Down Expand Up @@ -435,7 +485,14 @@ func handleClearServerDataRequest(ctx context.Context, cache *cache.Cache, db da

// HandleClientToServerMessage handles the protobuf ClientToServerMessage and
// fills the protobuf ClientToServerResponse.
func HandleClientToServerMessage(ctx context.Context, cache *cache.Cache, pb *sync_pb.ClientToServerMessage, pbRsp *sync_pb.ClientToServerResponse, db datastore.Datastore, clientID string) error {
func HandleClientToServerMessage(
ctx context.Context,
cache *cache.Cache,
pb *sync_pb.ClientToServerMessage,
pbRsp *sync_pb.ClientToServerResponse,
db datastore.Datastore,
clientID string,
) error {
// Create ClientToServerResponse and fill general fields for both GU and
// Commit.
pbRsp.StoreBirthday = aws.String(storeBirthday)
Expand Down Expand Up @@ -477,7 +534,13 @@ func HandleClientToServerMessage(ctx context.Context, cache *cache.Cache, pb *sy
} else if *pb.MessageContents == sync_pb.ClientToServerMessage_CLEAR_SERVER_DATA {
csdRsp := &sync_pb.ClearServerDataResponse{}
pbRsp.ClearServerData = csdRsp
pbRsp.ErrorCode, err = handleClearServerDataRequest(context.Background(), cache, db, pb.ClearServerData, clientID)
pbRsp.ErrorCode, err = handleClearServerDataRequest(
context.Background(),
cache,
db,
pb.ClearServerData,
clientID,
)
if err != nil {
if pbRsp.ErrorCode != nil {
pbRsp.ErrorMessage = aws.String(err.Error())
Expand Down
23 changes: 20 additions & 3 deletions command/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ func (a PBSyncAttrsByName) Len() int { return len(a) }
func (a PBSyncAttrsByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a PBSyncAttrsByName) Less(i, j int) bool { return *a[i].Name < *a[j].Name }

func NewPBSyncAttrs(name *string, version *int64, deleted *bool, folder *bool, serverTag *string, specifics *sync_pb.EntitySpecifics) *PBSyncAttrs {
func NewPBSyncAttrs(
name *string,
version *int64,
deleted *bool,
folder *bool,
serverTag *string,
specifics *sync_pb.EntitySpecifics,
) *PBSyncAttrs {
return &PBSyncAttrs{
Name: name,
Version: version,
Expand Down Expand Up @@ -421,7 +428,12 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_DeviceLimitExceed
suite.Require().NoError(
command.HandleClientToServerMessage(context.Background(), suite.cache, commitMsg, commitRsp, suite.dynamo, testCase.clientID),
"Commit device info should succeed for device %d", i)
suite.Equal(sync_pb.SyncEnums_SUCCESS, *commitRsp.ErrorCode, "Commit device info should succeed for device %d", i)
suite.Equal(
sync_pb.SyncEnums_SUCCESS,
*commitRsp.ErrorCode,
"Commit device info should succeed for device %d",
i,
)
}

// should get THROTTLED error when device limit is exceeded
Expand Down Expand Up @@ -598,7 +610,12 @@ func (suite *CommandTestSuite) TestHandleClientToServerMessage_ReplaceParentIDTo
child3 := getCommitEntity("id_child3", 0, false, getBookmarkSpecifics())
child3.ParentIdString = aws.String("id_parent2")

updateChild0 := getCommitEntity(*rsp.Commit.Entryresponse[0].IdString, *rsp.Commit.Entryresponse[0].Version, false, getBookmarkSpecifics())
updateChild0 := getCommitEntity(
*rsp.Commit.Entryresponse[0].IdString,
*rsp.Commit.Entryresponse[0].Version,
false,
getBookmarkSpecifics(),
)
updateChild0.ParentIdString = aws.String("id_parent")

entries := []*sync_pb.SyncEntity{parent1, child1, parent2, child2, child3, updateChild0}
Expand Down
8 changes: 7 additions & 1 deletion command/server_defined_unique_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ const (
bookmarkBarTag string = "bookmark_bar"
)

func createServerDefinedUniqueEntity(name string, serverDefinedTag string, clientID string, parentID string, specifics *sync_pb.EntitySpecifics) (*datastore.SyncEntity, error) {
func createServerDefinedUniqueEntity(
name string,
serverDefinedTag string,
clientID string,
parentID string,
specifics *sync_pb.EntitySpecifics,
) (*datastore.SyncEntity, error) {
now := time.Now().UnixMilli()
deleted := false
folder := true
Expand Down
16 changes: 14 additions & 2 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,25 @@ type Datastore interface {
// client token for a given client. Besides the array of sync entities, a
// boolean value indicating whether there are more updates to query in the
// next batch is returned.
GetUpdatesForType(ctx context.Context, dataType int, clientToken int64, fetchFolders bool, clientID string, maxSize int64) (bool, []SyncEntity, error)
GetUpdatesForType(
ctx context.Context,
dataType int,
clientToken int64,
fetchFolders bool,
clientID string,
maxSize int32,
) (bool, []SyncEntity, error)
// Check if a server-defined unique tag is in the datastore.
HasServerDefinedUniqueTag(ctx context.Context, clientID string, tag string) (bool, error)
// Get the count of sync items for a client.
GetClientItemCount(ctx context.Context, clientID string) (*ClientItemCounts, error)
// Update the count of sync items for a client.
UpdateClientItemCount(ctx context.Context, counts *ClientItemCounts, newNormalItemCount int, newHistoryItemCount int) error
UpdateClientItemCount(
ctx context.Context,
counts *ClientItemCounts,
newNormalItemCount int,
newHistoryItemCount int,
) error
// ClearServerData deletes all items for a given clientID
ClearServerData(ctx context.Context, clientID string) ([]SyncEntity, error)
// DisableSyncChain marks a chain as disabled so no further updates or commits can happen
Expand Down
22 changes: 19 additions & 3 deletions datastore/datastoretest/mock_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,24 @@ func (m *MockDatastore) InsertSyncEntitiesWithServerTags(ctx context.Context, en
}

// UpdateSyncEntity mocks calls to UpdateSyncEntity
func (m *MockDatastore) UpdateSyncEntity(ctx context.Context, entity *datastore.SyncEntity, oldVersion int64) (conflict bool, deleted bool, err error) {
func (m *MockDatastore) UpdateSyncEntity(
ctx context.Context,
entity *datastore.SyncEntity,
oldVersion int64,
) (conflict bool, deleted bool, err error) {
args := m.Called(ctx, entity, oldVersion)
return args.Bool(0), args.Bool(1), args.Error(2)
}

// GetUpdatesForType mocks calls to GetUpdatesForType
func (m *MockDatastore) GetUpdatesForType(ctx context.Context, dataType int, clientToken int64, fetchFolders bool, clientID string, maxSize int64) (bool, []datastore.SyncEntity, error) {
func (m *MockDatastore) GetUpdatesForType(
ctx context.Context,
dataType int,
clientToken int64,
fetchFolders bool,
clientID string,
maxSize int32,
) (bool, []datastore.SyncEntity, error) {
args := m.Called(ctx, dataType, clientToken, fetchFolders, clientID, maxSize)
return args.Bool(0), args.Get(1).([]datastore.SyncEntity), args.Error(2)
}
Expand All @@ -55,7 +66,12 @@ func (m *MockDatastore) GetClientItemCount(ctx context.Context, clientID string)
}

// UpdateClientItemCount mocks calls to UpdateClientItemCount
func (m *MockDatastore) UpdateClientItemCount(ctx context.Context, counts *datastore.ClientItemCounts, newNormalItemCount int, newHistoryItemCount int) error {
func (m *MockDatastore) UpdateClientItemCount(
ctx context.Context,
counts *datastore.ClientItemCounts,
newNormalItemCount int,
newHistoryItemCount int,
) error {
args := m.Called(ctx, counts, newNormalItemCount, newHistoryItemCount)
return args.Error(0)
}
Expand Down
2 changes: 1 addition & 1 deletion datastore/instrumented_datastore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions datastore/item_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func (dynamo *Dynamo) initRealCountsAndUpdateHistoryCounts(ctx context.Context,
// Query the DB to get updated counts
pkCond := expression.Key(clientIDDataTypeMtimeIdxPk).Equal(expression.Value(counts.ClientID))
filterCond := expression.And(
expression.Name(dataTypeAttrName).In(expression.Value(HistoryTypeID), expression.Value(HistoryDeleteDirectiveTypeID)),
expression.Name(dataTypeAttrName).
In(expression.Value(HistoryTypeID), expression.Value(HistoryDeleteDirectiveTypeID)),
expression.Name(deletedAttrName).Equal(expression.Value(false)),
)
expr, err := expression.NewBuilder().WithKeyCondition(pkCond).WithFilter(filterCond).Build()
Expand Down Expand Up @@ -166,7 +167,12 @@ func (dynamo *Dynamo) GetClientItemCount(ctx context.Context, clientID string) (

// UpdateClientItemCount updates the count of non-deleted sync items for a
// given client stored in the dynamoDB.
func (dynamo *Dynamo) UpdateClientItemCount(ctx context.Context, counts *ClientItemCounts, newNormalItemCount int, newHistoryItemCount int) error {
func (dynamo *Dynamo) UpdateClientItemCount(
ctx context.Context,
counts *ClientItemCounts,
newNormalItemCount int,
newHistoryItemCount int,
) error {
counts.HistoryItemCountPeriod4 += newHistoryItemCount
counts.ItemCount += newNormalItemCount

Expand Down
6 changes: 5 additions & 1 deletion datastore/item_count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func (suite *ItemCountTestSuite) TestGetClientItemCount() {
{ClientID: "client2", ID: "client2", ItemCount: 10},
}
for _, item := range items {
existing := datastore.ClientItemCounts{ClientID: item.ClientID, ID: item.ID, Version: datastore.CurrentCountVersion}
existing := datastore.ClientItemCounts{
ClientID: item.ClientID,
ID: item.ID,
Version: datastore.CurrentCountVersion,
}
suite.Require().NoError(
suite.dynamo.UpdateClientItemCount(context.Background(), &existing, item.ItemCount, 0))
}
Expand Down
Loading