Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pagination to Listing Projects for Housekeeping #587

Merged
merged 5 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/yorkie/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ func init() {
server.DefaultHousekeepingCandidatesLimitPerProject,
"candidates limit per project for a single housekeeping run",
)
cmd.Flags().IntVar(
&conf.Housekeeping.HousekeepingProjectFetchSize,
"housekeeping-project-fetch-size",
server.DefaultHousekeepingProjectFetchSize,
"housekeeping project fetch size for a single housekeeping run",
)
cmd.Flags().StringVar(
&mongoConnectionURI,
"mongo-connection-uri",
Expand Down
2 changes: 2 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type Database interface {
FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
housekeepingLastProjectID *types.ID,
) ([]*ClientInfo, error)

// FindDocInfoByKey finds the document of the given key.
Expand Down
43 changes: 34 additions & 9 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package memory

import (
"bytes"
"context"
"fmt"
gotime "time"
Expand Down Expand Up @@ -224,16 +225,15 @@ func (d *DB) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (d *DB) listAllProjectInfos(
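// listProjectInfos returns up to pageSize project infos whose IDs come after
// housekeepingLastProjectID. It advances housekeepingLastProjectID to the last
// returned project, or resets it to the default project ID once the end of the
// projects is reached, so that successive calls rotate through all projects.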
func (d *DB) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ProjectInfo, error) {
txn := d.db.Txn(false)
defer txn.Abort()

// TODO(krapie): txn.Get() loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
iter, err := txn.Get(
tblProjects,
"id",
Expand All @@ -242,12 +242,35 @@ func (d *DB) listAllProjectInfos(
return nil, fmt.Errorf("fetch all projects: %w", err)
}

lastIDBytes, err := housekeepingLastProjectID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode last project id: %w", err)
}

var infos []*database.ProjectInfo
for raw := iter.Next(); raw != nil; raw = iter.Next() {
for i := 0; i < pageSize; {
raw := iter.Next()
if raw == nil {
*housekeepingLastProjectID = database.DefaultProjectID
break
}

info := raw.(*database.ProjectInfo).DeepCopy()
infos = append(infos, info)
}

idBytes, err := info.ID.Bytes()
if err != nil {
return nil, fmt.Errorf("decode project id: %w", err)
}

if bytes.Compare(idBytes, lastIDBytes) > 0 {
krapie marked this conversation as resolved.
Show resolved Hide resolved
infos = append(infos, info)
i++
}

if i == pageSize {
*housekeepingLastProjectID = infos[len(infos)-1].ID
}
}
return infos, nil
}

Expand Down Expand Up @@ -599,8 +622,10 @@ func (d *DB) findDeactivateCandidatesPerProject(
func (d *DB) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ClientInfo, error) {
projects, err := d.listAllProjectInfos(ctx)
projects, err := d.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion server/backend/database/memory/housekeeping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
monkey "github.com/undefinedlabs/go-mpatch"

"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/database/memory"
)

Expand All @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) {
ctx := context.Background()

clientDeactivateThreshold := "23h"
_, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold)

userInfo, err := memdb.CreateUserInfo(ctx, "test", "test")
assert.NoError(t, err)
project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold)
assert.NoError(t, err)

yesterday := gotime.Now().Add(-24 * gotime.Hour)
Expand All @@ -59,9 +63,12 @@ func TestHousekeeping(t *testing.T) {
clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name()))
assert.NoError(t, err)

housekeepingLastProjectID := database.DefaultProjectID
candidates, err := memdb.FindDeactivateCandidates(
ctx,
10,
10,
&housekeepingLastProjectID,
)
assert.NoError(t, err)
assert.Len(t, candidates, 2)
Expand Down
37 changes: 28 additions & 9 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,38 @@ func (c *Client) CreateProjectInfo(
return info, nil
}

// ListAllProjectInfos returns all project infos.
func (c *Client) listAllProjectInfos(
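// listProjectInfos returns up to pageSize project infos whose IDs are greater
// than housekeepingLastProjectID. It advances housekeepingLastProjectID to the
// last returned project, or resets it to the default project ID when fewer than
// pageSize projects are returned, so that successive calls rotate through all projects.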
func (c *Client) listProjectInfos(
ctx context.Context,
pageSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ProjectInfo, error) {
// TODO(krapie): Find(ctx, bson.D{{}}) loads all projects in memory,
// which will cause performance issue as number of projects in DB grows.
// Therefore, pagination of projects is needed to avoid this issue.
cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}})
encodedID, err := encodeID(*housekeepingLastProjectID)
if err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, err
}

opts := options.Find()
opts.SetLimit(int64(pageSize))

cursor, err := c.collection(colProjects).Find(ctx, bson.M{
"_id": bson.M{
"$gt": encodedID,
},
}, opts)
if err != nil {
return nil, fmt.Errorf("find project infos: %w", err)
}

var infos []*database.ProjectInfo
if err := cursor.All(ctx, &infos); err != nil {
return nil, fmt.Errorf("fetch all project infos: %w", err)
return nil, fmt.Errorf("fetch project infos: %w", err)
}

if len(infos) < pageSize {
*housekeepingLastProjectID = database.DefaultProjectID
} else if len(infos) > 0 {
*housekeepingLastProjectID = infos[len(infos)-1].ID
}

return infos, nil
Expand Down Expand Up @@ -657,8 +674,10 @@ func (c *Client) findDeactivateCandidatesPerProject(
func (c *Client) FindDeactivateCandidates(
ctx context.Context,
candidatesLimitPerProject int,
projectFetchSize int,
housekeepingLastProjectID *types.ID,
) ([]*database.ClientInfo, error) {
projects, err := c.listAllProjectInfos(ctx)
projects, err := c.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID)
if err != nil {
return nil, err
}
Expand Down
61 changes: 61 additions & 0 deletions server/backend/housekeeping/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package housekeeping

import (
"fmt"
"time"
)

// Config is the configuration for the housekeeping service.
// Validate reports whether the values held here are usable.
type Config struct {
	// Interval is the time between housekeeping runs, written as a duration
	// string that time.ParseDuration accepts (for example "1m").
	Interval string `yaml:"Interval"`

	// CandidatesLimitPerProject is the maximum number of candidates to be returned per project.
	// It must be greater than zero.
	CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"`

	// HousekeepingProjectFetchSize is the maximum number of projects fetched in a single
	// housekeeping run when looking for candidates to deactivate. It must be greater than zero.
	HousekeepingProjectFetchSize int `yaml:"HousekeepingProjectFetchSize"`
}

// Validate validates the configuration.
//
// It returns an error naming the offending command-line flag when Interval
// cannot be parsed by time.ParseDuration, or when CandidatesLimitPerProject or
// HousekeepingProjectFetchSize is not a positive number. It returns nil when
// every field is valid.
func (c *Config) Validate() error {
	if _, err := time.ParseDuration(c.Interval); err != nil {
		return fmt.Errorf(
			`invalid argument %s for "--housekeeping-interval" flag: %w`,
			c.Interval,
			err,
		)
	}

	if c.CandidatesLimitPerProject <= 0 {
		// Report the value that actually failed the check, not the fetch size.
		return fmt.Errorf(
			`invalid argument %d for "--housekeeping-candidates-limit-per-project" flag`,
			c.CandidatesLimitPerProject,
		)
	}

	if c.HousekeepingProjectFetchSize <= 0 {
		// The flag name must match the one registered on the command line.
		return fmt.Errorf(
			`invalid argument %d for "--housekeeping-project-fetch-size" flag`,
			c.HousekeepingProjectFetchSize,
		)
	}

	return nil
}
48 changes: 48 additions & 0 deletions server/backend/housekeeping/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package housekeeping_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/yorkie-team/yorkie/server/backend/housekeeping"
)

// TestConfig checks that Validate accepts a well-formed configuration and
// rejects a copy of it in which a single field has been made invalid.
func TestConfig(t *testing.T) {
	t.Run("validate test", func(t *testing.T) {
		base := housekeeping.Config{
			Interval:                     "1m",
			CandidatesLimitPerProject:    100,
			HousekeepingProjectFetchSize: 100,
		}
		assert.NoError(t, base.Validate())

		// Each mutation breaks exactly one field of an otherwise valid copy.
		mutations := []func(c *housekeeping.Config){
			func(c *housekeeping.Config) { c.Interval = "hour" },
			func(c *housekeeping.Config) { c.CandidatesLimitPerProject = 0 },
			func(c *housekeeping.Config) { c.HousekeepingProjectFetchSize = -1 },
		}
		for _, mutate := range mutations {
			broken := base
			mutate(&broken)
			assert.Error(t, broken.Validate())
		}
	})
}
33 changes: 9 additions & 24 deletions server/backend/housekeeping/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"time"

"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/sync"
"github.com/yorkie-team/yorkie/server/clients"
Expand All @@ -34,28 +35,6 @@ const (
deactivateCandidatesKey = "housekeeping/deactivateCandidates"
)

// Config is the configuration for the housekeeping service.
type Config struct {
// Interval is the time between housekeeping runs.
Interval string `yaml:"Interval"`

// CandidatesLimitPerProject is the maximum number of candidates to be returned per project.
CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"`
}

// Validate validates the configuration.
func (c *Config) Validate() error {
if _, err := time.ParseDuration(c.Interval); err != nil {
return fmt.Errorf(
`invalid argument %s for "--housekeeping-interval" flag: %w`,
c.Interval,
err,
)
}

return nil
}

// Housekeeping is the housekeeping service. It periodically runs housekeeping
// tasks. It is responsible for deactivating clients that have not been active
// for a long time.
Expand All @@ -65,6 +44,7 @@ type Housekeeping struct {

interval time.Duration
candidatesLimitPerProject int
projectFetchSize int

ctx context.Context
cancelFunc context.CancelFunc
Expand Down Expand Up @@ -106,6 +86,7 @@ func New(

interval: interval,
candidatesLimitPerProject: conf.CandidatesLimitPerProject,
projectFetchSize: conf.HousekeepingProjectFetchSize,

ctx: ctx,
cancelFunc: cancelFunc,
Expand All @@ -127,9 +108,11 @@ func (h *Housekeeping) Stop() error {

// run is the housekeeping loop.
func (h *Housekeeping) run() {
housekeepingLastProjectID := database.DefaultProjectID
hackerwins marked this conversation as resolved.
Show resolved Hide resolved

for {
ctx := context.Background()
if err := h.deactivateCandidates(ctx); err != nil {
if err := h.deactivateCandidates(ctx, &housekeepingLastProjectID); err != nil {
continue
}

Expand All @@ -142,7 +125,7 @@ func (h *Housekeeping) run() {
}

// deactivateCandidates deactivates candidates.
func (h *Housekeeping) deactivateCandidates(ctx context.Context) error {
func (h *Housekeeping) deactivateCandidates(ctx context.Context, housekeepingLastProjectID *types.ID) error {
start := time.Now()
locker, err := h.coordinator.NewLocker(ctx, deactivateCandidatesKey)
if err != nil {
Expand All @@ -162,6 +145,8 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context) error {
candidates, err := h.database.FindDeactivateCandidates(
ctx,
h.candidatesLimitPerProject,
h.projectFetchSize,
housekeepingLastProjectID,
)
if err != nil {
return err
Expand Down
Loading
Loading