Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,16 @@ import (

var (
// Could be modified in tests.
maxGUBatchSize = 500
maxClientObjectQuota = 85000
// Original:
// maxGUBatchSize = 500
// maxClientObjectQuota = 85000

// Test with small values
maxGUBatchSize = 5
maxClientObjectQuota = 30
// new
clientObjectsToStartCleanup = 20
historyObjectsToCleanup = 5
)

const (
Expand Down Expand Up @@ -214,6 +222,20 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
return &errCode, fmt.Errorf("error getting client's item count: %w", err)
}

deletedHistoryEntriesCount := 0

if itemCount >= clientObjectsToStartCleanup {
_, historySyncEntities, err3 :=
db.GetUpdatesForType(historyTypeID, 0/*clientToken*/, false/*fetchFolders*/, clientID, int64(historyObjectsToCleanup)/*maxSize int64*/)
if err3 == nil && len(historySyncEntities) > 0 {
err4 := db.DeleteThese(historySyncEntities)
if err4 == nil {
deletedHistoryEntriesCount = len(historySyncEntities);
itemCount -= deletedHistoryEntriesCount
}
}
}

commitRsp.Entryresponse = make([]*sync_pb.CommitResponse_EntryResponse, len(commitMsg.Entries))

// Map client-generated ID to its server-generated ID.
Expand Down Expand Up @@ -315,7 +337,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
cache.SetTypeMtime(context.Background(), clientID, dataType, mtime)
}

err = db.UpdateClientItemCount(clientID, count)
err = db.UpdateClientItemCount(clientID, count - deletedHistoryEntriesCount)
if err != nil {
// We only impose a soft quota limit on the item count for each client, so
// we only log the error without further actions here. The reason of this
Expand Down
2 changes: 2 additions & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Datastore interface {
GetUpdatesForType(dataType int, clientToken int64, fetchFolders bool, clientID string, maxSize int64) (bool, []SyncEntity, error)
// Check if a server-defined unique tag is in the datastore.
HasServerDefinedUniqueTag(clientID string, tag string) (bool, error)
// Deletes some entries
DeleteThese(entities []SyncEntity) error;
// Get the count of sync items for a client.
GetClientItemCount(clientID string) (int, error)
// Update the count of sync items for a client.
Expand Down
6 changes: 6 additions & 0 deletions datastore/datastoretest/mock_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ func (m *MockDatastore) HasItem(clientID string, ID string) (bool, error) {
return args.Bool(0), args.Error(1)
}

// DeleteThese mocks calls to DeleteThese
func (m *MockDatastore) DeleteThese(entities []datastore.SyncEntity) error {
args := m.Called(entities)
return args.Error(0)
}

// GetClientItemCount mocks calls to GetClientItemCount
func (m *MockDatastore) GetClientItemCount(clientID string) (int, error) {
args := m.Called(clientID)
Expand Down
14 changes: 14 additions & 0 deletions datastore/instrumented_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ func (_d DatastoreWithPrometheus) DisableSyncChain(clientID string) (err error)
return _d.base.DisableSyncChain(clientID)
}

// DeleteThese implements Datastore
func (_d DatastoreWithPrometheus) DeleteThese(entities []SyncEntity) (err error) {
_since := time.Now()
defer func() {
result := "ok"
if err != nil {
result = "error"
}

datastoreDurationSummaryVec.WithLabelValues(_d.instanceName, "DeleteThese", result).Observe(time.Since(_since).Seconds())
}()
return _d.base.DeleteThese(entities)
}

// GetClientItemCount implements Datastore
func (_d DatastoreWithPrometheus) GetClientItemCount(clientID string) (i1 int, err error) {
_since := time.Now()
Expand Down
63 changes: 63 additions & 0 deletions datastore/sync_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,69 @@ func (dynamo *Dynamo) ClearServerData(clientID string) ([]SyncEntity, error) {
return syncEntities, nil
}

// Code is taken mostly from ClearServerData function
func (dynamo *Dynamo) DeleteThese(entities []SyncEntity) error {
if len(entities) == 0 {
return fmt.Errorf("error deleting sync entities - none requested")
}

items := []*dynamodb.TransactWriteItem{}

for _, item := range entities {
// Fail delete if race condition detected (modified time has changed).
if item.Version != nil {
cond := expression.Name("Mtime").Equal(expression.Value(*item.Mtime))
expr, err := expression.NewBuilder().WithCondition(cond).Build()
if err != nil {
return fmt.Errorf("error deleting sync entities for client <???>: %w", err)
}

writeItem := dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
ConditionExpression: expr.Condition(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
TableName: aws.String(Table),
Key: map[string]*dynamodb.AttributeValue{
pk: {
S: aws.String(item.ClientID),
},
sk: {
S: aws.String(item.ID),
},
},
},
}

items = append(items, &writeItem)
} else {
// If row doesn't hold Mtime, delete as usual.
writeItem := dynamodb.TransactWriteItem{
Delete: &dynamodb.Delete{
TableName: aws.String(Table),
Key: map[string]*dynamodb.AttributeValue{
pk: {
S: aws.String(item.ClientID),
},
sk: {
S: aws.String(item.ID),
},
},
},
}

items = append(items, &writeItem)
}
}
_, err := dynamo.TransactWriteItems(&dynamodb.TransactWriteItemsInput{TransactItems: items})

if err != nil {
return fmt.Errorf("error deleting sync entities for client <???>: %w", err)
}

return nil
}

// IsSyncChainDisabled checks whether a given sync chain has been deleted
func (dynamo *Dynamo) IsSyncChainDisabled(clientID string) (bool, error) {
key, err := dynamodbattribute.MarshalMap(DisabledMarkerItemQuery{
Expand Down