Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pagination to Listing Projects for Housekeeping #587

Merged
merged 5 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func init() {
server.DefaultHousekeepingCandidatesLimitPerProject,
"candidates limit per project for a single housekeeping run",
)
cmd.Flags().IntVar(
&conf.Housekeeping.ProjectFetchSize,
"housekeeping-project-fetch-size",
server.DefaultHousekeepingProjectFetchSize,
"housekeeping project fetch size for a single housekeeping run",
)
cmd.Flags().StringVar(
&mongoConnectionURI,
"mongo-connection-uri",
Expand Down
4 changes: 3 additions & 1 deletion server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ type Database interface {
FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
) ([]*ClientInfo, error)
projectFetchSize int,
housekeepingLastProjectID types.ID,
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
) (types.ID, []*ClientInfo, error)

// FindDocInfoByKey finds the document of the given key.
FindDocInfoByKey(
Expand Down
67 changes: 56 additions & 11 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package memory

import (
"bytes"
"context"
"fmt"
"sort"
gotime "time"

"github.com/hashicorp/go-memdb"
Expand Down Expand Up @@ -224,16 +226,15 @@ func (d *DB) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (d *DB) listAllProjectInfos(
// listProjectInfos returns all project infos rotationally.
func (d *DB) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID types.ID,
) ([]*database.ProjectInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()

// TODO(krapie): txn.Get() loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
iter, err := txn.Get(
tblProjects,
"id",
Expand All @@ -243,12 +244,47 @@ func (d *DB) listAllProjectInfos(
}

var infos []*database.ProjectInfo

for raw := iter.Next(); raw != nil; raw = iter.Next() {
info := raw.(*database.ProjectInfo).DeepCopy()
infos = append(infos, info)
}

return infos, nil
sort.SliceStable(infos, func(i, j int) bool {
firstIDBytes, err := infos[i].ID.Bytes()
if err != nil {
return true
}
secondIDBytes, err := infos[j].ID.Bytes()
if err != nil {
return true
}

return bytes.Compare(firstIDBytes, secondIDBytes) < 0
})

beginIndex := 0
for i := 0; i < len(infos); i++ {
proejctIDBytes, err := infos[i].ID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode project id: %w", err)
}
lastProjectIDBytes, err := housekeepingLastProjectID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode last project id: %w", err)
}
if bytes.Compare(proejctIDBytes, lastProjectIDBytes) > 0 {
beginIndex = i
break
}
}

lastIndex := beginIndex + pageSize - 1
if lastIndex >= len(infos) {
lastIndex = len(infos) - 1
}

return infos[beginIndex : lastIndex+1], nil
hackerwins marked this conversation as resolved.
Show resolved Hide resolved
}

// ListProjectInfos returns all project infos owned by owner.
Expand Down Expand Up @@ -599,23 +635,32 @@ func (d *DB) findDeactivateCandidatesPerProject(
func (d *DB) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
) ([]*database.ClientInfo, error) {
projects, err := d.listAllProjectInfos(ctx)
projectFetchSize int,
housekeepingLastProjectID types.ID,
) (types.ID, []*database.ClientInfo, error) {
projects, err := d.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

var candidates []*database.ClientInfo
for _, project := range projects {
infos, err := d.findDeactivateCandidatesPerProject(ctx, project, candidatesLimitPerProject)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

candidates = append(candidates, infos...)
}

return candidates, nil
var lastProjectID types.ID
if len(projects) < projectFetchSize {
lastProjectID = database.DefaultProjectID
} else {
lastProjectID = projects[len(projects)-1].ID
}

return lastProjectID, candidates, nil
}

// FindDocInfoByKeyAndOwner finds the document of the given key. If the
Expand Down
65 changes: 63 additions & 2 deletions server/backend/database/memory/housekeeping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
monkey "github.com/undefinedlabs/go-mpatch"

"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/database/memory"
)

Expand All @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) {
ctx := context.Background()

clientDeactivateThreshold := "23h"
_, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold)

userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)
project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

yesterday := gotime.Now().Add(-24 * gotime.Hour)
Expand All @@ -59,14 +63,71 @@ func TestHousekeeping(t *testing.T) {
clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name()))
assert.NoError(t, err)

candidates, err := memdb.FindDeactivateCandidates(
_, candidates, err := memdb.FindDeactivateCandidates(
ctx,
10,
10,
database.DefaultProjectID,
)
assert.NoError(t, err)
assert.Len(t, candidates, 2)
assert.Contains(t, candidates, clientA)
assert.Contains(t, candidates, clientB)
assert.NotContains(t, candidates, clientC)
})

t.Run("housekeeping pagination test", func(t *testing.T) {
ctx := context.Background()
memdb, projects := createDBandProjects(t)

fetchSize := 4
lastProjectID, _, err := memdb.FindDeactivateCandidates(
ctx,
0,
fetchSize,
database.DefaultProjectID,
)
assert.NoError(t, err)
assert.Equal(t, projects[fetchSize-1].ID, lastProjectID)

lastProjectID, _, err = memdb.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, projects[fetchSize*2-1].ID, lastProjectID)

lastProjectID, _, err = memdb.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, database.DefaultProjectID, lastProjectID)
})
}

func createDBandProjects(t *testing.T) (*memory.DB, []*database.ProjectInfo) {
t.Helper()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the first time to see t.Helper. When is this useful? Would it be good to add t.Helper to other helper functions too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.Helper() indicates to the Go test runner that createDBandProjects() helper function is a test helper. It makes it easy to report test failures at the call site of the parent function rather than the call site of t.Error() and t.Errorf(). When t.Error() is called from createDBandProjects() function, the Go test runner will report the filename and line number of the code which called the createDBandProjects() function in the output.


ctx := context.Background()
memdb, err := memory.New()
assert.NoError(t, err)

clientDeactivateThreshold := "23h"
userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)

projects := make([]*database.ProjectInfo, 0)
for i := 0; i < 10; i++ {
p, err := memdb.CreateProjectInfo(ctx, fmt.Sprintf("%d project", i), userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

projects = append(projects, p)
}

return memdb, projects
}
45 changes: 32 additions & 13 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,32 @@ func (c *Client) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (c *Client) listAllProjectInfos(
// listProjectInfos returns all project infos rotationally.
func (c *Client) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID types.ID,
) ([]*database.ProjectInfo, error) {
// TODO(krapie): Find(ctx, bson.D{{}}) loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}})
encodedID, err := encodeID(housekeepingLastProjectID)
if err != nil {
return nil, err
}

opts := options.Find()
opts.SetLimit(int64(pageSize))

cursor, err := c.collection(colProjects).Find(ctx, bson.M{
"_id": bson.M{
"$gt": encodedID,
},
}, opts)
if err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, fmt.Errorf("find project infos: %w", err)
}

var infos []*database.ProjectInfo
if err := cursor.All(ctx, &infos); err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, fmt.Errorf("fetch project infos: %w", err)
}

return infos, nil
Expand Down Expand Up @@ -657,23 +668,31 @@ func (c *Client) findDeactivateCandidatesPerProject(
func (c *Client) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
) ([]*database.ClientInfo, error) {
projects, err := c.listAllProjectInfos(ctx)
projectFetchSize int,
housekeepingLastProjectID types.ID,
) (types.ID, []*database.ClientInfo, error) {
projects, err := c.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

var candidates []*database.ClientInfo
for _, project := range projects {
clientInfos, err := c.findDeactivateCandidatesPerProject(ctx, project, candidatesLimitPerProject)
if err != nil {
return nil, err
return database.DefaultProjectID, nil, err
}

candidates = append(candidates, clientInfos...)
}

return candidates, nil
var lastProjectID types.ID
if len(projects) < projectFetchSize {
lastProjectID = database.DefaultProjectID
} else {
lastProjectID = projects[len(projects)-1].ID
}
return lastProjectID, candidates, nil
}

// FindDocInfoByKeyAndOwner finds the document of the given key. If the
Expand Down
4 changes: 4 additions & 0 deletions server/backend/database/mongo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,8 @@ func TestClient(t *testing.T) {
t.Run("IsDocumentAttached test", func(t *testing.T) {
testcases.RunIsDocumentAttachedTest(t, cli, dummyProjectID)
})

t.Run("FindDeactivateCandidates test", func(t *testing.T) {
testcases.RunFindDeactivateCandidates(t, cli)
})
}
48 changes: 48 additions & 0 deletions server/backend/database/testcases/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package testcases

import (
"bytes"
"context"
"fmt"
"sort"
"strconv"
"testing"
gotime "time"
Expand Down Expand Up @@ -622,6 +624,52 @@ func RunFindDocInfosByPagingTest(t *testing.T, db database.Database, projectID t
})
}

// RunFindDeactivateCandidates runs the FindDeactivateCandidates tests for the given db.
func RunFindDeactivateCandidates(t *testing.T, db database.Database) {
t.Run("housekeeping pagination test", func(t *testing.T) {
ctx := context.Background()

// Lists all projects of the dummyOwnerID and otherOwnerID.
projects, err := db.ListProjectInfos(ctx, dummyOwnerID)
assert.NoError(t, err)
otherProjects, err := db.ListProjectInfos(ctx, otherOwnerID)
assert.NoError(t, err)

projects = append(projects, otherProjects...)

sort.Slice(projects, func(i, j int) bool {
iBytes, err := projects[i].ID.Bytes()
assert.NoError(t, err)
jBytes, err := projects[j].ID.Bytes()
assert.NoError(t, err)
return bytes.Compare(iBytes, jBytes) < 0
})

fetchSize := 3
lastProjectID := database.DefaultProjectID

for i := 0; i < len(projects)/fetchSize; i++ {
lastProjectID, _, err = db.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, projects[((i+1)*fetchSize)-1].ID, lastProjectID)
}

lastProjectID, _, err = db.FindDeactivateCandidates(
ctx,
0,
fetchSize,
lastProjectID,
)
assert.NoError(t, err)
assert.Equal(t, database.DefaultProjectID, lastProjectID)
})
}

// RunCreateChangeInfosTest runs the CreateChangeInfos tests for the given db.
func RunCreateChangeInfosTest(t *testing.T, db database.Database, projectID types.ID) {
t.Run("set RemovedAt in docInfo test", func(t *testing.T) {
Expand Down
Loading
Loading