diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index 3e54fdf62..ecfb3d2f1 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -223,6 +223,12 @@ func init() { server.DefaultHousekeepingCandidatesLimitPerProject, "candidates limit per project for a single housekeeping run", ) + cmd.Flags().IntVar( + &conf.Housekeeping.ProjectFetchSize, + "housekeeping-project-fetch-size", + server.DefaultHousekeepingProjectFetchSize, + "housekeeping project fetch size for a single housekeeping run", + ) cmd.Flags().StringVar( &mongoConnectionURI, "mongo-connection-uri", diff --git a/server/backend/database/database.go b/server/backend/database/database.go index aa35327b2..be0118e2f 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -138,7 +138,9 @@ type Database interface { FindDeactivateCandidates( ctx context.Context, candidatesLimitPerProject int, - ) ([]*ClientInfo, error) + projectFetchSize int, + lastProjectID types.ID, + ) (types.ID, []*ClientInfo, error) // FindDocInfoByKey finds the document of the given key. FindDocInfoByKey( diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 1b5c8582f..aa46dcfef 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -224,27 +224,38 @@ func (d *DB) CreateProjectInfo( return info, nil } -// ListAllProjectInfos returns all project infos. -func (d *DB) listAllProjectInfos( +// listProjectInfos returns up to pageSize project infos whose IDs come after housekeepingLastProjectID. +func (d *DB) listProjectInfos( ctx context.Context, + pageSize int, + housekeepingLastProjectID types.ID, ) ([]*database.ProjectInfo, error) { txn := d.db.Txn(false) defer txn.Abort() - // TODO(krapie): txn.Get() loads all projects in memory, - // which will cause performance issue as number of projects in DB grows. - // Therefore, pagination of projects is needed to avoid this issue. 
- iter, err := txn.Get( + iter, err := txn.LowerBound( tblProjects, "id", + housekeepingLastProjectID.String(), ) if err != nil { - return nil, fmt.Errorf("fetch all projects: %w", err) + return nil, fmt.Errorf("fetch projects: %w", err) } var infos []*database.ProjectInfo - for raw := iter.Next(); raw != nil; raw = iter.Next() { + + for i := 0; i < pageSize; i++ { + raw := iter.Next() + if raw == nil { + break + } info := raw.(*database.ProjectInfo).DeepCopy() + // The first row may be the project the previous page ended on; skip it and extend the page by one. + if i == 0 && info.ID == housekeepingLastProjectID { + pageSize++ + continue + } + infos = append(infos, info) } @@ -599,23 +610,32 @@ func (d *DB) findDeactivateCandidatesPerProject( func (d *DB) FindDeactivateCandidates( ctx context.Context, candidatesLimitPerProject int, -) ([]*database.ClientInfo, error) { - projects, err := d.listAllProjectInfos(ctx) + projectFetchSize int, + lastProjectID types.ID, +) (types.ID, []*database.ClientInfo, error) { + projects, err := d.listProjectInfos(ctx, projectFetchSize, lastProjectID) if err != nil { - return nil, err + return database.DefaultProjectID, nil, err } var candidates []*database.ClientInfo for _, project := range projects { infos, err := d.findDeactivateCandidatesPerProject(ctx, project, candidatesLimitPerProject) if err != nil { - return nil, err + return database.DefaultProjectID, nil, err } candidates = append(candidates, infos...) } - return candidates, nil + var topProjectID types.ID + if len(projects) < projectFetchSize { + topProjectID = database.DefaultProjectID + } else { + topProjectID = projects[len(projects)-1].ID + } + + return topProjectID, candidates, nil } // FindDocInfoByKeyAndOwner finds the document of the given key. 
If the diff --git a/server/backend/database/memory/housekeeping_test.go b/server/backend/database/memory/housekeeping_test.go index f71a91943..5c1aeaeb0 100644 --- a/server/backend/database/memory/housekeeping_test.go +++ b/server/backend/database/memory/housekeeping_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" monkey "github.com/undefinedlabs/go-mpatch" + "github.com/yorkie-team/yorkie/server/backend/database" "github.com/yorkie-team/yorkie/server/backend/database/memory" ) @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) { ctx := context.Background() clientDeactivateThreshold := "23h" - _, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold) + + userInfo, err := memdb.CreateUserInfo(ctx, "test", "test") + assert.NoError(t, err) + project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold) assert.NoError(t, err) yesterday := gotime.Now().Add(-24 * gotime.Hour) @@ -59,9 +63,11 @@ func TestHousekeeping(t *testing.T) { clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name())) assert.NoError(t, err) - candidates, err := memdb.FindDeactivateCandidates( + _, candidates, err := memdb.FindDeactivateCandidates( ctx, 10, + 10, + database.DefaultProjectID, ) assert.NoError(t, err) assert.Len(t, candidates, 2) @@ -69,4 +75,59 @@ func TestHousekeeping(t *testing.T) { assert.Contains(t, candidates, clientB) assert.NotContains(t, candidates, clientC) }) + + t.Run("housekeeping pagination test", func(t *testing.T) { + ctx := context.Background() + memdb, projects := createDBandProjects(t) + + fetchSize := 4 + lastProjectID, _, err := memdb.FindDeactivateCandidates( + ctx, + 0, + fetchSize, + database.DefaultProjectID, + ) + assert.NoError(t, err) + assert.Equal(t, projects[fetchSize-1].ID, lastProjectID) + + lastProjectID, _, err = memdb.FindDeactivateCandidates( + ctx, + 0, + fetchSize, + lastProjectID, + ) + 
assert.NoError(t, err) + assert.Equal(t, projects[fetchSize*2-1].ID, lastProjectID) + // Only two of the ten projects remain, so the returned cursor resets to DefaultProjectID. + lastProjectID, _, err = memdb.FindDeactivateCandidates( + ctx, + 0, + fetchSize, + lastProjectID, + ) + assert.NoError(t, err) + assert.Equal(t, database.DefaultProjectID, lastProjectID) + }) +} + +func createDBandProjects(t *testing.T) (*memory.DB, []*database.ProjectInfo) { + t.Helper() + + ctx := context.Background() + memdb, err := memory.New() + assert.NoError(t, err) + + clientDeactivateThreshold := "23h" + userInfo, err := memdb.CreateUserInfo(ctx, "test", "test") + assert.NoError(t, err) + + projects := make([]*database.ProjectInfo, 0) + for i := 0; i < 10; i++ { + p, err := memdb.CreateProjectInfo(ctx, fmt.Sprintf("%d project", i), userInfo.ID, clientDeactivateThreshold) + assert.NoError(t, err) + + projects = append(projects, p) + } + + return memdb, projects } diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index bbf1278bc..65c319ef8 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -235,21 +235,32 @@ func (c *Client) CreateProjectInfo( return info, nil } -// ListAllProjectInfos returns all project infos. -func (c *Client) listAllProjectInfos( +// listProjectInfos returns up to pageSize project infos whose _id is greater than housekeepingLastProjectID. +func (c *Client) listProjectInfos( ctx context.Context, + pageSize int, + housekeepingLastProjectID types.ID, ) ([]*database.ProjectInfo, error) { - // TODO(krapie): Find(ctx, bson.D{{}}) loads all projects in memory, - // which will cause performance issue as number of projects in DB grows. - // Therefore, pagination of projects is needed to avoid this issue. 
- cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}}) + encodedID, err := encodeID(housekeepingLastProjectID) + if err != nil { + return nil, err + } + + // Sort by _id so the $gt cursor below walks the projects in a stable, ascending order. + opts := options.Find().SetSort(bson.D{{Key: "_id", Value: 1}}).SetLimit(int64(pageSize)) + + cursor, err := c.collection(colProjects).Find(ctx, bson.M{ + "_id": bson.M{ + "$gt": encodedID, + }, + }, opts) if err != nil { - return nil, fmt.Errorf("fetch all project infos: %w", err) + return nil, fmt.Errorf("find project infos: %w", err) } var infos []*database.ProjectInfo if err := cursor.All(ctx, &infos); err != nil { - return nil, fmt.Errorf("fetch all project infos: %w", err) + return nil, fmt.Errorf("fetch project infos: %w", err) } return infos, nil @@ -657,23 +668,31 @@ func (c *Client) findDeactivateCandidatesPerProject( func (c *Client) FindDeactivateCandidates( ctx context.Context, candidatesLimitPerProject int, -) ([]*database.ClientInfo, error) { - projects, err := c.listAllProjectInfos(ctx) + projectFetchSize int, + lastProjectID types.ID, +) (types.ID, []*database.ClientInfo, error) { + projects, err := c.listProjectInfos(ctx, projectFetchSize, lastProjectID) if err != nil { - return nil, err + return database.DefaultProjectID, nil, err } var candidates []*database.ClientInfo for _, project := range projects { clientInfos, err := c.findDeactivateCandidatesPerProject(ctx, project, candidatesLimitPerProject) if err != nil { - return nil, err + return database.DefaultProjectID, nil, err } candidates = append(candidates, clientInfos...) } - return candidates, nil + var topProjectID types.ID + if len(projects) < projectFetchSize { + topProjectID = database.DefaultProjectID + } else { + topProjectID = projects[len(projects)-1].ID + } + return topProjectID, candidates, nil } // FindDocInfoByKeyAndOwner finds the document of the given key. 
If the diff --git a/server/backend/database/mongo/client_test.go b/server/backend/database/mongo/client_test.go index ab1719d8d..81d70d8cf 100644 --- a/server/backend/database/mongo/client_test.go +++ b/server/backend/database/mongo/client_test.go @@ -100,4 +100,8 @@ func TestClient(t *testing.T) { t.Run("IsDocumentAttached test", func(t *testing.T) { testcases.RunIsDocumentAttachedTest(t, cli, dummyProjectID) }) + + t.Run("FindDeactivateCandidates test", func(t *testing.T) { + testcases.RunFindDeactivateCandidates(t, cli) + }) } diff --git a/server/backend/database/testcases/testcases.go b/server/backend/database/testcases/testcases.go index ec629c212..3cc19121c 100644 --- a/server/backend/database/testcases/testcases.go +++ b/server/backend/database/testcases/testcases.go @@ -19,8 +19,10 @@ package testcases import ( + "bytes" "context" "fmt" + "sort" "strconv" "testing" gotime "time" @@ -622,6 +624,52 @@ func RunFindDocInfosByPagingTest(t *testing.T, db database.Database, projectID t }) } +// RunFindDeactivateCandidates checks that FindDeactivateCandidates pages through projects in ID order on the given db. +func RunFindDeactivateCandidates(t *testing.T, db database.Database) { + t.Run("housekeeping pagination test", func(t *testing.T) { + ctx := context.Background() + + // Collect the projects of dummyOwnerID and otherOwnerID as the expected set. + projects, err := db.ListProjectInfos(ctx, dummyOwnerID) + assert.NoError(t, err) + otherProjects, err := db.ListProjectInfos(ctx, otherOwnerID) + assert.NoError(t, err) + + projects = append(projects, otherProjects...) 
+ // Order by raw ID bytes to match the order FindDeactivateCandidates is expected to page in. + sort.Slice(projects, func(i, j int) bool { + iBytes, err := projects[i].ID.Bytes() + assert.NoError(t, err) + jBytes, err := projects[j].ID.Bytes() + assert.NoError(t, err) + return bytes.Compare(iBytes, jBytes) < 0 + }) + + fetchSize := 3 + lastProjectID := database.DefaultProjectID + + for i := 0; i < len(projects)/fetchSize; i++ { + lastProjectID, _, err = db.FindDeactivateCandidates( + ctx, + 0, + fetchSize, + lastProjectID, + ) + assert.NoError(t, err) + assert.Equal(t, projects[((i+1)*fetchSize)-1].ID, lastProjectID) + } + + lastProjectID, _, err = db.FindDeactivateCandidates( + ctx, + 0, + fetchSize, + lastProjectID, + ) + assert.NoError(t, err) + assert.Equal(t, database.DefaultProjectID, lastProjectID) + }) +} + // RunCreateChangeInfosTest runs the CreateChangeInfos tests for the given db. func RunCreateChangeInfosTest(t *testing.T, db database.Database, projectID types.ID) { t.Run("set RemovedAt in docInfo test", func(t *testing.T) { diff --git a/server/backend/housekeeping/config.go b/server/backend/housekeeping/config.go new file mode 100644 index 000000000..513bbd7a5 --- /dev/null +++ b/server/backend/housekeeping/config.go @@ -0,0 +1,61 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package housekeeping + +import ( + "fmt" + "time" +) + +// Config is the configuration for the housekeeping service. 
+type Config struct { + // Interval is the time between housekeeping runs. + Interval string `yaml:"Interval"` + + // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. + CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` + + // ProjectFetchSize is the maximum number of projects to be fetched per housekeeping run. + ProjectFetchSize int `yaml:"ProjectFetchSize"` +} + +// Validate returns an error if Interval is not a parsable duration or if either limit is not positive. +func (c *Config) Validate() error { + if _, err := time.ParseDuration(c.Interval); err != nil { + return fmt.Errorf( + `invalid argument %s for "--housekeeping-interval" flag: %w`, + c.Interval, + err, + ) + } + + if c.CandidatesLimitPerProject <= 0 { + return fmt.Errorf( + `invalid argument %d for "--housekeeping-candidates-limit-per-project" flag`, + c.CandidatesLimitPerProject, + ) + } + + if c.ProjectFetchSize <= 0 { + return fmt.Errorf( + `invalid argument %d for "--housekeeping-project-fetch-size" flag`, + c.ProjectFetchSize, + ) + } + + return nil +} diff --git a/server/backend/housekeeping/config_test.go b/server/backend/housekeeping/config_test.go new file mode 100644 index 000000000..876904135 --- /dev/null +++ b/server/backend/housekeeping/config_test.go @@ -0,0 +1,48 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package housekeeping_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/server/backend/housekeeping" +) + +func TestConfig(t *testing.T) { + t.Run("validate test", func(t *testing.T) { + validConf := housekeeping.Config{ + Interval: "1m", + CandidatesLimitPerProject: 100, + ProjectFetchSize: 100, + } + assert.NoError(t, validConf.Validate()) + + conf1 := validConf + conf1.Interval = "hour" + assert.Error(t, conf1.Validate()) + + conf2 := validConf + conf2.CandidatesLimitPerProject = 0 + assert.Error(t, conf2.Validate()) + + conf3 := validConf + conf3.ProjectFetchSize = -1 + assert.Error(t, conf3.Validate()) + }) +} diff --git a/server/backend/housekeeping/housekeeping.go b/server/backend/housekeeping/housekeeping.go index d08abcbdc..2672ab182 100644 --- a/server/backend/housekeeping/housekeeping.go +++ b/server/backend/housekeeping/housekeeping.go @@ -24,6 +24,7 @@ import ( "fmt" "time" + "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/server/backend/database" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/clients" @@ -34,28 +35,6 @@ const ( deactivateCandidatesKey = "housekeeping/deactivateCandidates" ) -// Config is the configuration for the housekeeping service. -type Config struct { - // Interval is the time between housekeeping runs. - Interval string `yaml:"Interval"` - - // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. - CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` -} - -// Validate validates the configuration. -func (c *Config) Validate() error { - if _, err := time.ParseDuration(c.Interval); err != nil { - return fmt.Errorf( - `invalid argument %s for "--housekeeping-interval" flag: %w`, - c.Interval, - err, - ) - } - - return nil -} - // Housekeeping is the housekeeping service. It periodically runs housekeeping // tasks. 
It is responsible for deactivating clients that have not been active // for a long time. @@ -65,6 +44,7 @@ type Housekeeping struct { interval time.Duration candidatesLimitPerProject int + projectFetchSize int ctx context.Context cancelFunc context.CancelFunc @@ -106,6 +86,7 @@ func New( interval: interval, candidatesLimitPerProject: conf.CandidatesLimitPerProject, + projectFetchSize: conf.ProjectFetchSize, ctx: ctx, cancelFunc: cancelFunc, @@ -127,11 +108,16 @@ func (h *Housekeeping) Stop() error { // run is the housekeeping loop. func (h *Housekeeping) run() { + housekeepingLastProjectID := database.DefaultProjectID + // A failed run keeps the cursor and still waits for the interval below, so errors never spin the loop. for { ctx := context.Background() - if err := h.deactivateCandidates(ctx); err != nil { + lastProjectID, err := h.deactivateCandidates(ctx, housekeepingLastProjectID) + if err != nil { + logging.From(ctx).Error(err) - continue + } else { + housekeepingLastProjectID = lastProjectID } select { case <-time.After(h.interval): @@ -142,15 +128,18 @@ func (h *Housekeeping) run() { } // deactivateCandidates deactivates candidates. 
-func (h *Housekeeping) deactivateCandidates(ctx context.Context) error { +func (h *Housekeeping) deactivateCandidates( + ctx context.Context, + housekeepingLastProjectID types.ID, +) (types.ID, error) { start := time.Now() locker, err := h.coordinator.NewLocker(ctx, deactivateCandidatesKey) if err != nil { - return err + return database.DefaultProjectID, err } if err := locker.Lock(ctx); err != nil { - return err + return database.DefaultProjectID, err } defer func() { @@ -159,12 +148,14 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context) error { } }() - candidates, err := h.database.FindDeactivateCandidates( + lastProjectID, candidates, err := h.database.FindDeactivateCandidates( ctx, h.candidatesLimitPerProject, + h.projectFetchSize, + housekeepingLastProjectID, ) if err != nil { - return err + return database.DefaultProjectID, err } deactivatedCount := 0 @@ -175,7 +166,7 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context) error { clientInfo.ProjectID, clientInfo.ID, ); err != nil { - return err + return database.DefaultProjectID, err } deactivatedCount++ @@ -190,5 +181,5 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context) error { ) } - return nil + return lastProjectID, nil } diff --git a/server/config.go b/server/config.go index e0b1b9dd7..63d51af41 100644 --- a/server/config.go +++ b/server/config.go @@ -42,6 +42,7 @@ const ( DefaultHousekeepingInterval = 30 * time.Second DefaultHousekeepingCandidatesLimitPerProject = 500 + DefaultHousekeepingProjectFetchSize = 100 DefaultMongoConnectionURI = "mongodb://localhost:27017" DefaultMongoConnectionTimeout = 5 * time.Second @@ -241,6 +242,7 @@ func newConfig(port int, profilingPort int) *Config { Housekeeping: &housekeeping.Config{ Interval: DefaultHousekeepingInterval.String(), CandidatesLimitPerProject: DefaultHousekeepingCandidatesLimitPerProject, + ProjectFetchSize: DefaultHousekeepingProjectFetchSize, }, Backend: &backend.Config{ ClientDeactivateThreshold: 
DefaultClientDeactivateThreshold, diff --git a/server/config.sample.yml b/server/config.sample.yml index 22fc97d78..e682b3fc3 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -36,6 +36,9 @@ Housekeeping: # CandidatesLimitPerProject is the maximum number of candidates to be returned per project (default: 100). CandidatesLimitPerProject: 100 + # ProjectFetchSize is the maximum number of projects to be fetched in a single housekeeping run (default: 100). + ProjectFetchSize: 100 + # Backend is the configuration for the backend of Yorkie. Backend: # UseDefaultProject is whether to use the default project (default: true). diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index ae41aed59..41eb63f75 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -87,6 +87,7 @@ func TestMain(m *testing.M) { }, &housekeeping.Config{ Interval: helper.HousekeepingInterval.String(), CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + ProjectFetchSize: helper.HousekeepingProjectFetchSize, }, met) if err != nil { log.Fatal(err) diff --git a/test/helper/helper.go b/test/helper/helper.go index eec425fd5..914757af1 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -60,6 +60,7 @@ var ( AdminPassword = server.DefaultAdminPassword HousekeepingInterval = 10 * gotime.Second HousekeepingCandidatesLimitPerProject = 10 + HousekeepingProjectFetchSize = 10 AdminTokenDuration = "10s" ClientDeactivateThreshold = "10s" @@ -225,6 +226,7 @@ func TestConfig() *server.Config { Housekeeping: &housekeeping.Config{ Interval: HousekeepingInterval.String(), CandidatesLimitPerProject: HousekeepingCandidatesLimitPerProject, + ProjectFetchSize: HousekeepingProjectFetchSize, }, Backend: &backend.Config{ AdminUser: server.DefaultAdminUser,