diff --git a/cmd/yorkie/server.go b/cmd/yorkie/server.go index 583188698..af94648a4 100644 --- a/cmd/yorkie/server.go +++ b/cmd/yorkie/server.go @@ -227,6 +227,12 @@ func init() { server.DefaultHousekeepingCandidatesLimitPerProject, "candidates limit per project for a single housekeeping run", ) + cmd.Flags().IntVar( + &conf.Housekeeping.HousekeepingProjectFetchSize, + "housekeeping-project-fetch-size", + server.DefaultHousekeepingProjectFetchSize, + "housekeeping project fetch size for a single housekeeping run", + ) cmd.Flags().StringVar( &mongoConnectionURI, "mongo-connection-uri", diff --git a/server/backend/database/database.go b/server/backend/database/database.go index 2d4922fee..967534e5f 100644 --- a/server/backend/database/database.go +++ b/server/backend/database/database.go @@ -138,6 +138,8 @@ type Database interface { FindDeactivateCandidates( ctx context.Context, candidatesLimitPerProject int, + projectFetchSize int, + housekeepingLastProjectID *types.ID, ) ([]*ClientInfo, error) // FindDocInfoByKey finds the document of the given key. diff --git a/server/backend/database/memory/database.go b/server/backend/database/memory/database.go index 93882c313..88e16b864 100644 --- a/server/backend/database/memory/database.go +++ b/server/backend/database/memory/database.go @@ -18,6 +18,7 @@ package memory import ( + "bytes" "context" "fmt" gotime "time" @@ -224,16 +225,15 @@ func (d *DB) CreateProjectInfo( return info, nil } -// ListAllProjectInfos returns all project infos. -func (d *DB) listAllProjectInfos( +// listProjectInfos returns all project infos rotationally. +func (d *DB) listProjectInfos( ctx context.Context, + pageSize int, + housekeepingLastProjectID *types.ID, ) ([]*database.ProjectInfo, error) { txn := d.db.Txn(false) defer txn.Abort() - // TODO(krapie): txn.Get() loads all projects in memory, - // which will cause performance issue as number of projects in DB grows. - // Therefore, pagination of projects is needed to avoid this issue. iter, err := txn.Get( tblProjects, "id", @@ -242,12 +242,35 @@ func (d *DB) listAllProjectInfos( return nil, fmt.Errorf("fetch all projects: %w", err) } + lastIDBytes, err := housekeepingLastProjectID.Bytes() + if err != nil { + return nil, fmt.Errorf("decode last project id: %w", err) + } + var infos []*database.ProjectInfo - for raw := iter.Next(); raw != nil; raw = iter.Next() { + for i := 0; i < pageSize; { + raw := iter.Next() + if raw == nil { + *housekeepingLastProjectID = database.DefaultProjectID + break + } + info := raw.(*database.ProjectInfo).DeepCopy() - infos = append(infos, info) - } + idBytes, err := info.ID.Bytes() + if err != nil { + return nil, fmt.Errorf("decode project id: %w", err) + } + + if bytes.Compare(idBytes, lastIDBytes) > 0 { + infos = append(infos, info) + i++ + } + + if i == pageSize { + *housekeepingLastProjectID = infos[len(infos)-1].ID + } + } return infos, nil } @@ -599,8 +622,10 @@ func (d *DB) findDeactivateCandidatesPerProject( func (d *DB) FindDeactivateCandidates( ctx context.Context, candidatesLimitPerProject int, + projectFetchSize int, + housekeepingLastProjectID *types.ID, ) ([]*database.ClientInfo, error) { - projects, err := d.listAllProjectInfos(ctx) + projects, err := d.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID) if err != nil { return nil, err } diff --git a/server/backend/database/memory/housekeeping_test.go b/server/backend/database/memory/housekeeping_test.go index f71a91943..9c084c79d 100644 --- a/server/backend/database/memory/housekeeping_test.go +++ b/server/backend/database/memory/housekeeping_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" monkey "github.com/undefinedlabs/go-mpatch" + "github.com/yorkie-team/yorkie/server/backend/database" "github.com/yorkie-team/yorkie/server/backend/database/memory" ) @@ -39,7 +40,10 @@ func TestHousekeeping(t *testing.T) { ctx := context.Background() clientDeactivateThreshold := "23h" - _, project, err := memdb.EnsureDefaultUserAndProject(ctx, "test", "test", clientDeactivateThreshold) + + userInfo, err := memdb.CreateUserInfo(ctx, "test", "test") + assert.NoError(t, err) + project, err := memdb.CreateProjectInfo(ctx, database.DefaultProjectName, userInfo.ID, clientDeactivateThreshold) assert.NoError(t, err) yesterday := gotime.Now().Add(-24 * gotime.Hour) @@ -59,9 +63,12 @@ func TestHousekeeping(t *testing.T) { clientC, err := memdb.ActivateClient(ctx, project.ID, fmt.Sprintf("%s-C", t.Name())) assert.NoError(t, err) + housekeepingLastProjectID := database.DefaultProjectID candidates, err := memdb.FindDeactivateCandidates( ctx, 10, + 10, + &housekeepingLastProjectID, ) assert.NoError(t, err) assert.Len(t, candidates, 2) diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index bdcdc460a..20ddf7f99 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -235,21 +235,38 @@ func (c *Client) CreateProjectInfo( return info, nil } -// ListAllProjectInfos returns all project infos. -func (c *Client) listAllProjectInfos( +// listProjectInfos returns all project infos rotationally. +func (c *Client) listProjectInfos( ctx context.Context, + pageSize int, + housekeepingLastProjectID *types.ID, ) ([]*database.ProjectInfo, error) { - // TODO(krapie): Find(ctx, bson.D{{}}) loads all projects in memory, - // which will cause performance issue as number of projects in DB grows. - // Therefore, pagination of projects is needed to avoid this issue. - cursor, err := c.collection(colProjects).Find(ctx, bson.D{{}}) + encodedID, err := encodeID(*housekeepingLastProjectID) if err != nil { - return nil, fmt.Errorf("fetch all project infos: %w", err) + return nil, err + } + + opts := options.Find() + opts.SetLimit(int64(pageSize)) + + cursor, err := c.collection(colProjects).Find(ctx, bson.M{ + "_id": bson.M{ + "$gt": encodedID, + }, + }, opts) + if err != nil { + return nil, fmt.Errorf("find project infos: %w", err) } var infos []*database.ProjectInfo if err := cursor.All(ctx, &infos); err != nil { - return nil, fmt.Errorf("fetch all project infos: %w", err) + return nil, fmt.Errorf("fetch project infos: %w", err) + } + + if len(infos) < pageSize { + *housekeepingLastProjectID = database.DefaultProjectID + } else if len(infos) > 0 { + *housekeepingLastProjectID = infos[len(infos)-1].ID } return infos, nil @@ -657,8 +674,10 @@ func (c *Client) findDeactivateCandidatesPerProject( func (c *Client) FindDeactivateCandidates( ctx context.Context, candidatesLimitPerProject int, + projectFetchSize int, + housekeepingLastProjectID *types.ID, ) ([]*database.ClientInfo, error) { - projects, err := c.listAllProjectInfos(ctx) + projects, err := c.listProjectInfos(ctx, projectFetchSize, housekeepingLastProjectID) if err != nil { return nil, err } diff --git a/server/backend/housekeeping/config.go b/server/backend/housekeeping/config.go new file mode 100644 index 000000000..091ae5e5c --- /dev/null +++ b/server/backend/housekeeping/config.go @@ -0,0 +1,61 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package housekeeping + +import ( + "fmt" + "time" +) + +// Config is the configuration for the housekeeping service. +type Config struct { + // Interval is the time between housekeeping runs. + Interval string `yaml:"Interval"` + + // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. + CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` + + // HousekeepingProjectFetchSize is the maximum number of projects to be returned to deactivate candidates. + HousekeepingProjectFetchSize int `yaml:"HousekeepingProjectFetchSize"` +} + +// Validate validates the configuration. +func (c *Config) Validate() error { + if _, err := time.ParseDuration(c.Interval); err != nil { + return fmt.Errorf( + `invalid argument %s for "--housekeeping-interval" flag: %w`, + c.Interval, + err, + ) + } + + if c.CandidatesLimitPerProject <= 0 { + return fmt.Errorf( + `invalid argument %d for "--housekeeping-candidates-limit-per-project" flag`, + c.HousekeepingProjectFetchSize, + ) + } + + if c.HousekeepingProjectFetchSize <= 0 { + return fmt.Errorf( + `invalid argument %d for "--housekeeping-project-fetc-size" flag`, + c.HousekeepingProjectFetchSize, + ) + } + + return nil +} diff --git a/server/backend/housekeeping/config_test.go b/server/backend/housekeeping/config_test.go new file mode 100644 index 000000000..a67cd0b59 --- /dev/null +++ b/server/backend/housekeeping/config_test.go @@ -0,0 +1,48 @@ +/* + * Copyright 2023 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package housekeeping_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/server/backend/housekeeping" +) + +func TestConfig(t *testing.T) { + t.Run("validate test", func(t *testing.T) { + validConf := housekeeping.Config{ + Interval: "1m", + CandidatesLimitPerProject: 100, + HousekeepingProjectFetchSize: 100, + } + assert.NoError(t, validConf.Validate()) + + conf1 := validConf + conf1.Interval = "hour" + assert.Error(t, conf1.Validate()) + + conf2 := validConf + conf2.CandidatesLimitPerProject = 0 + assert.Error(t, conf2.Validate()) + + conf3 := validConf + conf3.HousekeepingProjectFetchSize = -1 + assert.Error(t, conf3.Validate()) + }) +} diff --git a/server/backend/housekeeping/housekeeping.go b/server/backend/housekeeping/housekeeping.go index d08abcbdc..0bbfcb157 100644 --- a/server/backend/housekeeping/housekeeping.go +++ b/server/backend/housekeeping/housekeeping.go @@ -24,6 +24,7 @@ import ( "fmt" "time" + "github.com/yorkie-team/yorkie/api/types" "github.com/yorkie-team/yorkie/server/backend/database" "github.com/yorkie-team/yorkie/server/backend/sync" "github.com/yorkie-team/yorkie/server/clients" @@ -34,28 +35,6 @@ const ( deactivateCandidatesKey = "housekeeping/deactivateCandidates" ) -// Config is the configuration for the housekeeping service. -type Config struct { - // Interval is the time between housekeeping runs. - Interval string `yaml:"Interval"` - - // CandidatesLimitPerProject is the maximum number of candidates to be returned per project. - CandidatesLimitPerProject int `yaml:"CandidatesLimitPerProject"` -} - -// Validate validates the configuration. -func (c *Config) Validate() error { - if _, err := time.ParseDuration(c.Interval); err != nil { - return fmt.Errorf( - `invalid argument %s for "--housekeeping-interval" flag: %w`, - c.Interval, - err, - ) - } - - return nil -} - // Housekeeping is the housekeeping service. It periodically runs housekeeping // tasks. It is responsible for deactivating clients that have not been active // for a long time. @@ -65,6 +44,7 @@ type Housekeeping struct { interval time.Duration candidatesLimitPerProject int + projectFetchSize int ctx context.Context cancelFunc context.CancelFunc @@ -106,6 +86,7 @@ func New( interval: interval, candidatesLimitPerProject: conf.CandidatesLimitPerProject, + projectFetchSize: conf.HousekeepingProjectFetchSize, ctx: ctx, cancelFunc: cancelFunc, @@ -127,9 +108,11 @@ func (h *Housekeeping) Stop() error { // run is the housekeeping loop. func (h *Housekeeping) run() { + housekeepingLastProjectID := database.DefaultProjectID + for { ctx := context.Background() - if err := h.deactivateCandidates(ctx); err != nil { + if err := h.deactivateCandidates(ctx, &housekeepingLastProjectID); err != nil { continue } @@ -142,7 +125,7 @@ func (h *Housekeeping) run() { } // deactivateCandidates deactivates candidates. -func (h *Housekeeping) deactivateCandidates(ctx context.Context) error { +func (h *Housekeeping) deactivateCandidates(ctx context.Context, housekeepingLastProjectID *types.ID) error { start := time.Now() locker, err := h.coordinator.NewLocker(ctx, deactivateCandidatesKey) if err != nil { @@ -162,6 +145,8 @@ func (h *Housekeeping) deactivateCandidates(ctx context.Context) error { candidates, err := h.database.FindDeactivateCandidates( ctx, h.candidatesLimitPerProject, + h.projectFetchSize, + housekeepingLastProjectID, ) if err != nil { return err diff --git a/server/config.go b/server/config.go index c875102af..80b0b410f 100644 --- a/server/config.go +++ b/server/config.go @@ -42,6 +42,7 @@ const ( DefaultHousekeepingInterval = 30 * time.Second DefaultHousekeepingCandidatesLimitPerProject = 500 + DefaultHousekeepingProjectFetchSize = 100 DefaultMongoConnectionURI = "mongodb://localhost:27017" DefaultMongoConnectionTimeout = 5 * time.Second @@ -229,8 +230,9 @@ func newConfig(port int, profilingPort int) *Config { Port: profilingPort, }, Housekeeping: &housekeeping.Config{ - Interval: DefaultHousekeepingInterval.String(), - CandidatesLimitPerProject: DefaultHousekeepingCandidatesLimitPerProject, + Interval: DefaultHousekeepingInterval.String(), + CandidatesLimitPerProject: DefaultHousekeepingCandidatesLimitPerProject, + HousekeepingProjectFetchSize: DefaultHousekeepingProjectFetchSize, }, Backend: &backend.Config{ ClientDeactivateThreshold: DefaultClientDeactivateThreshold, diff --git a/server/config.sample.yml b/server/config.sample.yml index 4c48b7128..cee9cfca5 100644 --- a/server/config.sample.yml +++ b/server/config.sample.yml @@ -36,6 +36,9 @@ Housekeeping: # CandidatesLimitPerProject is the maximum number of candidates to be returned per project (default: 100). CandidatesLimitPerProject: 100 + # HousekeepingProjectFetchSize is the maximum number of projects to be returned to deactivate candidates. (default: 100). + HousekeepingProjectFetchSize: 100 + # Backend is the configuration for the backend of Yorkie. Backend: # UseDefaultProject is whether to use the default project (default: true). diff --git a/server/rpc/server_test.go b/server/rpc/server_test.go index e883d0e91..0247be752 100644 --- a/server/rpc/server_test.go +++ b/server/rpc/server_test.go @@ -83,8 +83,9 @@ func TestMain(m *testing.M) { ConnectionTimeout: helper.MongoConnectionTimeout, PingTimeout: helper.MongoPingTimeout, }, &housekeeping.Config{ - Interval: helper.HousekeepingInterval.String(), - CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + Interval: helper.HousekeepingInterval.String(), + CandidatesLimitPerProject: helper.HousekeepingCandidatesLimitPerProject, + HousekeepingProjectFetchSize: helper.HousekeepingProjectFetchSize, }, met) if err != nil { log.Fatal(err) diff --git a/test/helper/helper.go b/test/helper/helper.go index ba6c2da1f..7a331e40c 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -60,6 +60,7 @@ var ( AdminPassword = server.DefaultAdminPassword HousekeepingInterval = 10 * gotime.Second HousekeepingCandidatesLimitPerProject = 10 + HousekeepingProjectFetchSize = 10 AdminTokenDuration = "10s" ClientDeactivateThreshold = "10s" @@ -221,8 +222,9 @@ func TestConfig() *server.Config { Port: ProfilingPort + portOffset, }, Housekeeping: &housekeeping.Config{ - Interval: HousekeepingInterval.String(), - CandidatesLimitPerProject: HousekeepingCandidatesLimitPerProject, + Interval: HousekeepingInterval.String(), + CandidatesLimitPerProject: HousekeepingCandidatesLimitPerProject, + HousekeepingProjectFetchSize: HousekeepingProjectFetchSize, }, Backend: &backend.Config{ AdminUser: server.DefaultAdminUser,