Skip to content

Commit

Permalink
Add keyset pagination to find projects
Browse files Browse the repository at this point in the history
* update housekeeping test not to set project id to zero value
  • Loading branch information
tedkimdev committed Aug 2, 2023
1 parent 5a768c2 commit 2b58225
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 32 deletions.
2 changes: 1 addition & 1 deletion server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ type Database interface {
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
projectPage *int,
housekeepingLastProjectID *types.ID,
) ([]*ClientInfo, error)

// FindDocInfoByKey finds the document of the given key.
Expand Down
38 changes: 25 additions & 13 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package memory

import (
"bytes"
"context"
"fmt"
gotime "time"
Expand Down Expand Up @@ -228,12 +229,11 @@ func (d *DB) CreateProjectInfo(
func (d *DB) listProjectInfos(
ctx context.Context,
pageSize int,
page *int,
housekeepingLastProjectID *types.ID,
) ([]*database.ProjectInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()

offset := (*page) * pageSize
iter, err := txn.Get(
tblProjects,
"id",
Expand All @@ -242,23 +242,35 @@ func (d *DB) listProjectInfos(
return nil, fmt.Errorf("fetch all projects: %w", err)
}

var infos []*database.ProjectInfo

for i := 0; i < offset; i++ {
iter.Next()
lastIDBytes, err := housekeepingLastProjectID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode last project id: %w", err)
}

for i := 0; i < pageSize; i++ {
var infos []*database.ProjectInfo
for i := 0; i < pageSize; {
raw := iter.Next()
if raw == nil {
*page = 0
*housekeepingLastProjectID = database.DefaultProjectID
break
}

info := raw.(*database.ProjectInfo).DeepCopy()
infos = append(infos, info)
}

*page++
idBytes, err := info.ID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode project id: %w", err)
}

if bytes.Compare(idBytes, lastIDBytes) > 0 {
infos = append(infos, info)
i++
}

if i == pageSize {
*housekeepingLastProjectID = infos[len(infos)-1].ID
}
}
return infos, nil
}

Expand Down Expand Up @@ -611,9 +623,9 @@ func (d *DB) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
projectPage *int,
housekeepingLastProjectID *types.ID,
) ([]*database.ClientInfo, error) {
projects, err := d.listProjectInfos(ctx, projectFetchSize, projectPage)
projects, err := d.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
}
Expand Down
10 changes: 7 additions & 3 deletions server/backend/database/memory/housekeeping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
monkey "github.com/undefinedlabs/go-mpatch"

"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/database/memory"
)

Expand All @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) {
ctx := context.Background()

clientDeactivateThreshold := "23h"
_, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold)

userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)
project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

yesterday := gotime.Now().Add(-24 * gotime.Hour)
Expand All @@ -59,12 +63,12 @@ func TestHousekeeping(t *testing.T) {
clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name()))
assert.NoError(t, err)

page := 0
housekeepingLastProjectID := database.DefaultProjectID
candidates, err := memdb.FindDeactivateCandidates(
ctx,
10,
10,
&page,
&housekeepingLastProjectID,
)
assert.NoError(t, err)
assert.Len(t, candidates, 2)
Expand Down
26 changes: 17 additions & 9 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,21 @@ func (c *Client) CreateProjectInfo(
func (c *Client) listProjectInfos(
ctx context.Context,
pageSize int,
page *int,
housekeepingLastProjectID *types.ID,
) ([]*database.ProjectInfo, error) {
encodedID, err := encodeID(*housekeepingLastProjectID)
if err != nil {
return nil, err
}

opts := options.Find()
opts.SetSkip(int64((*page) * pageSize))
opts.SetLimit(int64(pageSize))

cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}}, opts)
cursor, err := c.collection(colProjects).Find(ctx, bson.M{
"_id": bson.M{
"$gt": encodedID,
},
}, opts)
if err != nil {
return nil, fmt.Errorf("find project infos: %w", err)
}
Expand All @@ -255,12 +263,12 @@ func (c *Client) listProjectInfos(
return nil, fmt.Errorf("fetch project infos: %w", err)
}

isLastPage := len(infos) < pageSize
if isLastPage {
*page = 0
if len(infos) < pageSize {
*housekeepingLastProjectID = database.DefaultProjectID
} else if len(infos) > 0 {
*housekeepingLastProjectID = infos[len(infos)-1].ID
}

*page++
return infos, nil
}

Expand Down Expand Up @@ -667,9 +675,9 @@ func (c *Client) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
projectPage *int,
housekeepingLastProjectID *types.ID,
) ([]*database.ClientInfo, error) {
projects, err := c.listProjectInfos(ctx, projectFetchSize, projectPage)
projects, err := c.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions server/backend/housekeeping/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/clients"
Expand Down Expand Up @@ -107,11 +108,11 @@ func (h *Housekeeping) Stop() error {

// run is the housekeeping loop.
func (h *Housekeeping) run() {
housekeepingProjectPage := 0
housekeepingLastProjectID := database.DefaultProjectID

for {
ctx := context.Background()
if err := h.deactivateCandidates(ctx, &housekeepingProjectPage); err != nil {
if err := h.deactivateCandidates(ctx, &housekeepingLastProjectID); err != nil {
continue
}

Expand All @@ -124,7 +125,7 @@ func (h *Housekeeping) run() {
}

// deactivateCandidates deactivates candidates.
func (h *Housekeeping) deactivateCandidates(ctx context.Context, page *int) error {
func (h *Housekeeping) deactivateCandidates(ctx context.Context, housekeepingLastProjectID *types.ID) error {
start := time.Now()
locker, err := h.coordinator.NewLocker(ctx, deactivateCandidatesKey)
if err != nil {
Expand All @@ -145,7 +146,7 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context, page *int) erro
ctx,
h.candidatesLimitPerProject,
h.projectFetchSize,
page,
housekeepingLastProjectID,
)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions server/rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func TestMain(m *testing.M) {
ConnectionTimeout: helper.MongoConnectionTimeout,
PingTimeout: helper.MongoPingTimeout,
}, &housekeeping.Config{
Interval: helper.HousekeepingInterval.String(),
CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject,
Interval: helper.HousekeepingInterval.String(),
CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject,
HousekeepingProjectFetchSize: helper.HousekeepingProjectFetchSize,
}, met)
if err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 2b58225

Please sign in to comment.