Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions cache/instrumented_redis.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 36 additions & 36 deletions command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ const (
// handleGetUpdatesRequest handles GetUpdatesMessage and fills
// GetUpdatesResponse. Target sync entities in the database will be updated or
// deleted based on the client's requests.
func handleGetUpdatesRequest(cache *cache.Cache, guMsg *sync_pb.GetUpdatesMessage, guRsp *sync_pb.GetUpdatesResponse, db datastore.Datastore, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
func handleGetUpdatesRequest(ctx context.Context, cache *cache.Cache, guMsg *sync_pb.GetUpdatesMessage, guRsp *sync_pb.GetUpdatesResponse, db datastore.Datastore, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
errCode := sync_pb.SyncEnums_SUCCESS // default value, might be changed later
isNewClient := guMsg.GetUpdatesOrigin != nil && *guMsg.GetUpdatesOrigin == sync_pb.SyncEnums_NEW_CLIENT
isPoll := guMsg.GetUpdatesOrigin != nil && *guMsg.GetUpdatesOrigin == sync_pb.SyncEnums_PERIODIC
if isNewClient {
// Reject the request if client has >= 50 devices in the chain.
activeDevices := 0
for {
hasChangesRemaining, syncEntities, err := db.GetUpdatesForType(deviceInfoTypeID, 0, false, clientID, int64(maxGUBatchSize))
hasChangesRemaining, syncEntities, err := db.GetUpdatesForType(ctx, deviceInfoTypeID, 0, false, clientID, int64(maxGUBatchSize))
if err != nil {
log.Error().Err(err).Msgf("db.GetUpdatesForType failed for type %v", deviceInfoTypeID)
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand All @@ -69,7 +69,7 @@ func handleGetUpdatesRequest(cache *cache.Cache, guMsg *sync_pb.GetUpdatesMessag
}

// Insert initial records if needed.
err := InsertServerDefinedUniqueEntities(db, clientID)
err := InsertServerDefinedUniqueEntities(ctx, db, clientID)
if err != nil {
log.Error().Err(err).Msg("Create server defined unique entities failed")
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand Down Expand Up @@ -131,12 +131,12 @@ func handleGetUpdatesRequest(cache *cache.Cache, guMsg *sync_pb.GetUpdatesMessag

// Check cache to short circuit with 0 updates for polling requests.
if isPoll &&
!cache.IsTypeMtimeUpdated(context.Background(), clientID, int(*fromProgressMarker.DataTypeId), token) {
!cache.IsTypeMtimeUpdated(ctx, clientID, int(*fromProgressMarker.DataTypeId), token) {
continue
}

curMaxSize := int64(maxSize) - int64(len(guRsp.Entries))
hasChangesRemaining, entities, err := db.GetUpdatesForType(int(*fromProgressMarker.DataTypeId), token, fetchFolders, clientID, curMaxSize)
hasChangesRemaining, entities, err := db.GetUpdatesForType(ctx, int(*fromProgressMarker.DataTypeId), token, fetchFolders, clientID, curMaxSize)
if err != nil {
log.Error().Err(err).Msgf("db.GetUpdatesForType failed for type %v", *fromProgressMarker.DataTypeId)
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand Down Expand Up @@ -188,31 +188,31 @@ func handleGetUpdatesRequest(cache *cache.Cache, guMsg *sync_pb.GetUpdatesMessag
} else {
mtime = *entities[j-1].Mtime
}
cache.SetTypeMtime(context.Background(), clientID, int(*fromProgressMarker.DataTypeId), mtime)
cache.SetTypeMtime(ctx, clientID, int(*fromProgressMarker.DataTypeId), mtime)
}
}

return &errCode, nil
}

func getItemCounts(cache *cache.Cache, db datastore.Datastore, clientID string) (*datastore.ClientItemCounts, int, int, error) {
itemCounts, err := db.GetClientItemCount(clientID)
func getItemCounts(ctx context.Context, cache *cache.Cache, db datastore.Datastore, clientID string) (*datastore.ClientItemCounts, int, int, error) {
itemCounts, err := db.GetClientItemCount(ctx, clientID)
if err != nil {
return nil, 0, 0, err
}
newNormalCount, newHistoryCount, err := getInterimItemCounts(cache, clientID, false)
newNormalCount, newHistoryCount, err := getInterimItemCounts(ctx, cache, clientID, false)
if err != nil {
return nil, 0, 0, err
}
return itemCounts, newNormalCount, newHistoryCount, nil
}

func getInterimItemCounts(cache *cache.Cache, clientID string, clearCache bool) (int, int, error) {
newNormalCount, err := cache.GetInterimCount(context.Background(), clientID, normalCountTypeStr, clearCache)
func getInterimItemCounts(ctx context.Context, cache *cache.Cache, clientID string, clearCache bool) (int, int, error) {
newNormalCount, err := cache.GetInterimCount(ctx, clientID, normalCountTypeStr, clearCache)
if err != nil {
return 0, 0, err
}
newHistoryCount, err := cache.GetInterimCount(context.Background(), clientID, historyCountTypeStr, clearCache)
newHistoryCount, err := cache.GetInterimCount(ctx, clientID, historyCountTypeStr, clearCache)
if err != nil {
return 0, 0, err
}
Expand All @@ -223,7 +223,7 @@ func getInterimItemCounts(cache *cache.Cache, clientID string, clearCache bool)
// For each commit entry:
// - new sync entity is created and inserted into the database if version is 0.
// - existed sync entity will be updated if version is greater than 0.
func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, commitRsp *sync_pb.CommitResponse, db datastore.Datastore, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
func handleCommitRequest(ctx context.Context, cache *cache.Cache, commitMsg *sync_pb.CommitMessage, commitRsp *sync_pb.CommitResponse, db datastore.Datastore, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
if commitMsg == nil {
return nil, errors.New("nil commitMsg is received")
}
Expand All @@ -233,7 +233,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
return &errCode, nil
}

itemCounts, newNormalCount, newHistoryCount, err := getItemCounts(cache, db, clientID)
itemCounts, newNormalCount, newHistoryCount, err := getItemCounts(ctx, cache, db, clientID)
if err != nil {
log.Error().Err(err).Msg("Get client's item count failed")
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand All @@ -259,7 +259,6 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
// Map to save commit data type ID & mtime
typeMtimeMap := make(map[int]int64)
for i, v := range commitMsg.Entries {
var conflict, deleted bool
entryRsp := &sync_pb.CommitResponse_EntryResponse{}
commitRsp.Entryresponse[i] = entryRsp

Expand All @@ -285,7 +284,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
*entityToCommit.Version = *entityToCommit.Mtime
if *entityToCommit.DataType == datastore.HistoryTypeID {
// Check if item exists using client_unique_tag
isUpdateOp, err = db.HasItem(clientID, *entityToCommit.ClientDefinedUniqueTag)
isUpdateOp, err = db.HasItem(ctx, clientID, *entityToCommit.ClientDefinedUniqueTag)
if err != nil {
log.Error().Err(err).Msg("Insert history sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
Expand All @@ -295,6 +294,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
}
}

var interimErr error
if !isUpdateOp { // Create
if currentNormalItemCount+currentHistoryItemCount+newNormalCount+newHistoryCount >= maxClientObjectQuota+boostedQuotaAddition {
rspType := sync_pb.CommitResponse_OVER_QUOTA
Expand All @@ -307,7 +307,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
// Insert all non-history items. For history items, ignore any items above history quoto
// and lie to the client about the objects being synced instead of returning OVER_QUOTA
// so the client can continue to sync other entities.
conflict, err = db.InsertSyncEntity(entityToCommit)
conflict, err := db.InsertSyncEntity(ctx, entityToCommit)
if err != nil {
log.Error().Err(err).Msg("Insert sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
Expand All @@ -326,13 +326,13 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
}

if isHistoryRelatedItem {
newHistoryCount, err = cache.IncrementInterimCount(context.Background(), clientID, historyCountTypeStr, false)
newHistoryCount, interimErr = cache.IncrementInterimCount(ctx, clientID, historyCountTypeStr, false)
} else {
newNormalCount, err = cache.IncrementInterimCount(context.Background(), clientID, normalCountTypeStr, false)
newNormalCount, interimErr = cache.IncrementInterimCount(ctx, clientID, normalCountTypeStr, false)
}
}
} else { // Update
conflict, deleted, err = db.UpdateSyncEntity(entityToCommit, oldVersion)
conflict, deleted, err := db.UpdateSyncEntity(ctx, entityToCommit, oldVersion)
if err != nil {
log.Error().Err(err).Msg("Update sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
Expand All @@ -347,16 +347,16 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
}
if deleted {
if isHistoryRelatedItem {
newHistoryCount, err = cache.IncrementInterimCount(context.Background(), clientID, historyCountTypeStr, true)
newHistoryCount, interimErr = cache.IncrementInterimCount(ctx, clientID, historyCountTypeStr, true)
} else {
newNormalCount, err = cache.IncrementInterimCount(context.Background(), clientID, normalCountTypeStr, true)
newNormalCount, interimErr = cache.IncrementInterimCount(ctx, clientID, normalCountTypeStr, true)
}
}
}
if err != nil {
log.Error().Err(err).Msg("Interim count update failed")
if interimErr != nil {
log.Error().Err(interimErr).Msg("Interim count update failed")
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
return &errCode, fmt.Errorf("interim count update failed: %w", err)
return &errCode, fmt.Errorf("interim count update failed: %w", interimErr)
}

typeMtimeMap[*entityToCommit.DataType] = *entityToCommit.Mtime
Expand All @@ -368,7 +368,7 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
entryRsp.Mtime = entityToCommit.Mtime
}

newNormalCount, newHistoryCount, err = getInterimItemCounts(cache, clientID, true)
newNormalCount, newHistoryCount, err = getInterimItemCounts(ctx, cache, clientID, true)
if err != nil {
log.Error().Err(err).Msg("Get interim item counts failed")
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand All @@ -377,10 +377,10 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c

// Save (clientID#dataType, mtime) into cache after writing into DB.
for dataType, mtime := range typeMtimeMap {
cache.SetTypeMtime(context.Background(), clientID, dataType, mtime)
cache.SetTypeMtime(ctx, clientID, dataType, mtime)
}

err = db.UpdateClientItemCount(itemCounts, newNormalCount, newHistoryCount)
err = db.UpdateClientItemCount(ctx, itemCounts, newNormalCount, newHistoryCount)
if err != nil {
// We only impose a soft quota limit on the item count for each client, so
// we only log the error without further actions here. The reason of this
Expand All @@ -397,18 +397,18 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c

// handleClearServerDataRequest handles clearing user data from the datastore and cache
// and fills the response
func handleClearServerDataRequest(cache *cache.Cache, db datastore.Datastore, _ *sync_pb.ClearServerDataMessage, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
func handleClearServerDataRequest(ctx context.Context, cache *cache.Cache, db datastore.Datastore, _ *sync_pb.ClearServerDataMessage, clientID string) (*sync_pb.SyncEnums_ErrorType, error) {
errCode := sync_pb.SyncEnums_SUCCESS
var err error

err = db.DisableSyncChain(clientID)
err = db.DisableSyncChain(ctx, clientID)
if err != nil {
log.Error().Err(err).Msg("Failed to disable sync chain")
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
return &errCode, err
}

syncEntities, err := db.ClearServerData(clientID)
syncEntities, err := db.ClearServerData(ctx, clientID)
if err != nil {
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
return &errCode, err
Expand All @@ -422,7 +422,7 @@ func handleClearServerDataRequest(cache *cache.Cache, db datastore.Datastore, _
}

if len(typeMtimeCacheKeys) > 0 {
err = cache.Del(context.Background(), typeMtimeCacheKeys...)
err = cache.Del(ctx, typeMtimeCacheKeys...)
if err != nil {
log.Error().Err(err).Msg("Failed to clear cache")
errCode = sync_pb.SyncEnums_TRANSIENT_ERROR
Expand All @@ -435,7 +435,7 @@ func handleClearServerDataRequest(cache *cache.Cache, db datastore.Datastore, _

// HandleClientToServerMessage handles the protobuf ClientToServerMessage and
// fills the protobuf ClientToServerResponse.
func HandleClientToServerMessage(cache *cache.Cache, pb *sync_pb.ClientToServerMessage, pbRsp *sync_pb.ClientToServerResponse, db datastore.Datastore, clientID string) error {
func HandleClientToServerMessage(ctx context.Context, cache *cache.Cache, pb *sync_pb.ClientToServerMessage, pbRsp *sync_pb.ClientToServerResponse, db datastore.Datastore, clientID string) error {
// Create ClientToServerResponse and fill general fields for both GU and
// Commit.
pbRsp.StoreBirthday = aws.String(storeBirthday)
Expand All @@ -449,7 +449,7 @@ func HandleClientToServerMessage(cache *cache.Cache, pb *sync_pb.ClientToServerM
} else if *pb.MessageContents == sync_pb.ClientToServerMessage_GET_UPDATES {
guRsp := &sync_pb.GetUpdatesResponse{}
pbRsp.GetUpdates = guRsp
pbRsp.ErrorCode, err = handleGetUpdatesRequest(cache, pb.GetUpdates, guRsp, db, clientID)
pbRsp.ErrorCode, err = handleGetUpdatesRequest(ctx, cache, pb.GetUpdates, guRsp, db, clientID)
if err != nil {
if pbRsp.ErrorCode != nil {
pbRsp.ErrorMessage = aws.String(err.Error())
Expand All @@ -463,7 +463,7 @@ func HandleClientToServerMessage(cache *cache.Cache, pb *sync_pb.ClientToServerM
} else if *pb.MessageContents == sync_pb.ClientToServerMessage_COMMIT {
commitRsp := &sync_pb.CommitResponse{}
pbRsp.Commit = commitRsp
pbRsp.ErrorCode, err = handleCommitRequest(cache, pb.Commit, commitRsp, db, clientID)
pbRsp.ErrorCode, err = handleCommitRequest(context.TODO(), cache, pb.Commit, commitRsp, db, clientID)
if err != nil {
if pbRsp.ErrorCode != nil {
pbRsp.ErrorMessage = aws.String(err.Error())
Expand All @@ -477,7 +477,7 @@ func HandleClientToServerMessage(cache *cache.Cache, pb *sync_pb.ClientToServerM
} else if *pb.MessageContents == sync_pb.ClientToServerMessage_CLEAR_SERVER_DATA {
csdRsp := &sync_pb.ClearServerDataResponse{}
pbRsp.ClearServerData = csdRsp
pbRsp.ErrorCode, err = handleClearServerDataRequest(cache, db, pb.ClearServerData, clientID)
pbRsp.ErrorCode, err = handleClearServerDataRequest(context.Background(), cache, db, pb.ClearServerData, clientID)
if err != nil {
if pbRsp.ErrorCode != nil {
pbRsp.ErrorMessage = aws.String(err.Error())
Expand Down
Loading
Loading