Skip to content

Commit

Permalink
Add Pagination to Listing Projects for Housekeeping (yorkie-team#587)
Browse files Browse the repository at this point in the history
* Add Pagination to Listing Projects for Housekeeping

To avoid all project info being loaded from DB, pagination is added for projects and perform housekeeping in smaller portions.

* Add HousekeepingProjectFetchSize in config

Add housekeeping project fetch size option to the CLI flag

* set project fetch size to 100 in config.sample.yml

Move housekeepingProjectPage variable into the Housekeeping.run function

Add keyset pagination to find projects

* update housekeeping test not to set project id to zero value
  • Loading branch information
tedkimdev authored and Wu22e committed Sep 3, 2023
1 parent 367b388 commit e6c24f7
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 59 deletions.
6 changes: 6 additions & 0 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ func init() {
server.DefaultHousekeepingCandidatesLimitPerProject,
"candidates limit per project for a single housekeeping run",
)
cmd.Flags().IntVar(
&conf.Housekeeping.ProjectFetchSize,
"housekeeping-project-fetch-size",
server.DefaultHousekeepingProjectFetchSize,
"housekeeping project fetch size for a single housekeeping run",
)
cmd.Flags().StringVar(
&mongoConnectionURI,
"mongo-connection-uri",
Expand Down
4 changes: 3 additions & 1 deletion server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ type Database interface {
FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
) ([]*ClientInfo, error)
projectFetchSize int,
lastProjectID types.ID,
) (types.ID, []*ClientInfo, error)

// FindDocInfoByKey finds the document of the given key.
FindDocInfoByKey(
Expand Down
46 changes: 33 additions & 13 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,27 +224,38 @@ func (d *DB) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (d *DB) listAllProjectInfos(
// listProjectInfos returns all project infos rotationally.
func (d *DB) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID types.ID,
) ([]*database.ProjectInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()

// TODO(krapie): txn.Get() loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
iter, err := txn.Get(
iter, err := txn.LowerBound(
tblProjects,
"id",
housekeepingLastProjectID.String(),
)
if err != nil {
return nil, fmt.Errorf("fetch all projects: %w", err)
return nil, fmt.Errorf("fetch projects: %w", err)
}

var infos []*database.ProjectInfo
for raw := iter.Next(); raw != nil; raw = iter.Next() {

for i := 0; i < pageSize; i++ {
raw := iter.Next()
if raw == nil {
break
}
info := raw.(*database.ProjectInfo).DeepCopy()

if i == 0 && info.ID == housekeepingLastProjectID {
pageSize++
continue
}

infos = append(infos, info)
}

Expand Down Expand Up @@ -599,23 +610,32 @@ func (d *DB) findDeactivateCandidatesPerProject(
func (d *DB) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
) ([]*database.ClientInfo, error) {
projects, err := d.listAllProjectInfos(ctx)
projectFetchSize int,
lastProjectID types.ID,
) (types.ID, []*database.ClientInfo, error) {
projects, err := d.listProjectInfos(ctx, projectFetchSize, lastProjectID)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

var candidates []*database.ClientInfo
for _, project := range projects {
infos, err := d.findDeactivateCandidatesPerProject(ctx, project, candidatesLimitPerProject)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

candidates = append(candidates, infos...)
}

return candidates, nil
var topProjectID types.ID
if len(projects) < projectFetchSize {
topProjectID = database.DefaultProjectID
} else {
topProjectID = projects[len(projects)-1].ID
}

return topProjectID, candidates, nil
}

// FindDocInfoByKeyAndOwner finds the document of the given key. If the
Expand Down
65 changes: 63 additions & 2 deletions server/backend/database/memory/housekeeping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
monkey "github.com/undefinedlabs/go-mpatch"

"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/database/memory"
)

Expand All @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) {
ctx := context.Background()

clientDeactivateThreshold := "23h"
_, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold)

userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)
project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

yesterday := gotime.Now().Add(-24 * gotime.Hour)
Expand All @@ -59,14 +63,71 @@ func TestHousekeeping(t *testing.T) {
clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name()))
assert.NoError(t, err)

candidates, err := memdb.FindDeactivateCandidates(
_, candidates, err := memdb.FindDeactivateCandidates(
ctx,
10,
10,
database.DefaultProjectID,
)
assert.NoError(t, err)
assert.Len(t, candidates, 2)
assert.Contains(t, candidates, clientA)
assert.Contains(t, candidates, clientB)
assert.NotContains(t, candidates, clientC)
})

t.Run("housekeeping pagination test", func(t *testing.T) {
ctx := context.Background()
memdb, projects := createDBandProjects(t)

fetchSize := 4
lastProjectID, _, err := memdb.FindDeactivateCandidates(
ctx,
0,
fetchSize,
database.DefaultProjectID,
)
assert.NoError(t, err)
assert.Equal(t, projects[fetchSize-1].ID, lastProjectID)

lastProjectID, _, err = memdb.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, projects[fetchSize*2-1].ID, lastProjectID)

lastProjectID, _, err = memdb.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, database.DefaultProjectID, lastProjectID)
})
}

func createDBandProjects(t *testing.T) (*memory.DB, []*database.ProjectInfo) {
t.Helper()

ctx := context.Background()
memdb, err := memory.New()
assert.NoError(t, err)

clientDeactivateThreshold := "23h"
userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)

projects := make([]*database.ProjectInfo, 0)
for i := 0; i < 10; i++ {
p, err := memdb.CreateProjectInfo(ctx, fmt.Sprintf("%d project", i), userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

projects = append(projects, p)
}

return memdb, projects
}
45 changes: 32 additions & 13 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,32 @@ func (c *Client) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (c *Client) listAllProjectInfos(
// listProjectInfos returns all project infos rotationally.
func (c *Client) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID types.ID,
) ([]*database.ProjectInfo, error) {
// TODO(krapie): Find(ctx, bson.D{{}}) loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}})
encodedID, err := encodeID(housekeepingLastProjectID)
if err != nil {
return nil, err
}

opts := options.Find()
opts.SetLimit(int64(pageSize))

cursor, err := c.collection(colProjects).Find(ctx, bson.M{
"_id": bson.M{
"$gt": encodedID,
},
}, opts)
if err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, fmt.Errorf("find project infos: %w", err)
}

var infos []*database.ProjectInfo
if err := cursor.All(ctx, &infos); err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, fmt.Errorf("fetch project infos: %w", err)
}

return infos, nil
Expand Down Expand Up @@ -657,23 +668,31 @@ func (c *Client) findDeactivateCandidatesPerProject(
func (c *Client) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
) ([]*database.ClientInfo, error) {
projects, err := c.listAllProjectInfos(ctx)
projectFetchSize int,
lastProjectID types.ID,
) (types.ID, []*database.ClientInfo, error) {
projects, err := c.listProjectInfos(ctx, projectFetchSize, lastProjectID)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

var candidates []*database.ClientInfo
for _, project := range projects {
clientInfos, err := c.findDeactivateCandidatesPerProject(ctx, project, candidatesLimitPerProject)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

candidates = append(candidates, clientInfos...)
}

return candidates, nil
var topProjectID types.ID
if len(projects) < projectFetchSize {
topProjectID = database.DefaultProjectID
} else {
topProjectID = projects[len(projects)-1].ID
}
return topProjectID, candidates, nil
}

// FindDocInfoByKeyAndOwner finds the document of the given key. If the
Expand Down
4 changes: 4 additions & 0 deletions server/backend/database/mongo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,8 @@ func TestClient(t *testing.T) {
t.Run("IsDocumentAttached test", func(t *testing.T) {
testcases.RunIsDocumentAttachedTest(t, cli, dummyProjectID)
})

t.Run("FindDeactivateCandidates test", func(t *testing.T) {
testcases.RunFindDeactivateCandidates(t, cli)
})
}
48 changes: 48 additions & 0 deletions server/backend/database/testcases/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package testcases

import (
"bytes"
"context"
"fmt"
"sort"
"strconv"
"testing"
gotime "time"
Expand Down Expand Up @@ -622,6 +624,52 @@ func RunFindDocInfosByPagingTest(t *testing.T, db database.Database, projectID t
})
}

// RunFindDeactivateCandidates runs the FindDeactivateCandidates tests for the given db.
func RunFindDeactivateCandidates(t *testing.T, db database.Database) {
t.Run("housekeeping pagination test", func(t *testing.T) {
ctx := context.Background()

// Lists all projects of the dummyOwnerID and otherOwnerID.
projects, err := db.ListProjectInfos(ctx, dummyOwnerID)
assert.NoError(t, err)
otherProjects, err := db.ListProjectInfos(ctx, otherOwnerID)
assert.NoError(t, err)

projects = append(projects, otherProjects...)

sort.Slice(projects, func(i, j int) bool {
iBytes, err := projects[i].ID.Bytes()
assert.NoError(t, err)
jBytes, err := projects[j].ID.Bytes()
assert.NoError(t, err)
return bytes.Compare(iBytes, jBytes) < 0
})

fetchSize := 3
lastProjectID := database.DefaultProjectID

for i := 0; i < len(projects)/fetchSize; i++ {
lastProjectID, _, err = db.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, projects[((i+1)*fetchSize)-1].ID, lastProjectID)
}

lastProjectID, _, err = db.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, database.DefaultProjectID, lastProjectID)
})
}

// RunCreateChangeInfosTest runs the CreateChangeInfos tests for the given db.
func RunCreateChangeInfosTest(t *testing.T, db database.Database, projectID types.ID) {
t.Run("set RemovedAt in docInfo test", func(t *testing.T) {
Expand Down
Loading

0 comments on commit e6c24f7

Please sign in to comment.