diff --git a/metricbeat/module/mongodb/mongodb.go b/metricbeat/module/mongodb/mongodb.go index 50a97e0df000..daf15c774c5a 100644 --- a/metricbeat/module/mongodb/mongodb.go +++ b/metricbeat/module/mongodb/mongodb.go @@ -118,7 +118,6 @@ func ParseURL(module mb.Module, host string) (mb.HostData, error) { } func NewClient(config ModuleConfig, uri string, timeout time.Duration, mode readpref.Mode) (*mongo.Client, error) { - clientOptions := options.Client() // options.Credentials must be nil for the driver to work properly if no auth is provided. Zero values breaks @@ -143,12 +142,12 @@ func NewClient(config ModuleConfig, uri string, timeout time.Duration, mode read if mode == 0 { mode = readpref.NearestMode } - readPreference, err := readpref.New(mode) if err != nil { return nil, err } clientOptions.SetReadPreference(readPreference) + clientOptions.SetConnectTimeout(timeout) if config.TLS.IsEnabled() { @@ -160,13 +159,10 @@ func NewClient(config ModuleConfig, uri string, timeout time.Duration, mode read clientOptions.SetTLSConfig(tlsConfig.ToConfig()) } - client, err := mongo.NewClient(clientOptions) + client, err := mongo.Connect(context.Background(), clientOptions) if err != nil { return nil, fmt.Errorf("could not create mongodb client: %w", err) } - if err = client.Connect(context.Background()); err != nil { - return client, fmt.Errorf("could not connect to mongodb: %w", err) - } return client, nil } diff --git a/metricbeat/module/mongodb/replstatus/info.go b/metricbeat/module/mongodb/replstatus/info.go index a444fa03b1ff..1e576fcde997 100644 --- a/metricbeat/module/mongodb/replstatus/info.go +++ b/metricbeat/module/mongodb/replstatus/info.go @@ -19,20 +19,20 @@ package replstatus import ( "context" - "errors" "fmt" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) type oplogInfo struct { allocated int64 used float64 - firstTs uint32 - lastTs uint32 - diff uint32 + firstTs int64 + lastTs int64 + diff int64 } // CollSize contains data about collection size @@ -43,85 +43,106 @@ type CollSize struct { const oplogCol = "oplog.rs" +// getReplicationInfo returns oplog info from local.oplog.rs func getReplicationInfo(client *mongo.Client) (*oplogInfo, error) { - // get oplog.rs collection + // Get oplog collection info from local.oplog.rs (.) db := client.Database("local") - collections, err := db.ListCollectionNames(context.Background(), bson.D{}) - if err != nil { - return nil, fmt.Errorf("could not retrieve collection names: %w", err) - } - - if !contains(collections, oplogCol) { - return nil, errors.New("collection oplog.rs was not found") + // NOTE(shmsr): + // https://www.mongodb.com/docs/manual/reference/command/collStats/#syntax + // "scale" field is omitted here as it is by default 1, i.e., it return sizes in bytes. + // + // Also, note that collStats is deprecated since v6.2 but as we support older + // versions i.e., >= 5.0, let's keep it for now as this still works. + // TODO(shmsr): For newers versions, we can use db.collection.stats() + // https://www.mongodb.com/docs/manual/reference/method/db.collection.stats/#mongodb-method-db.collection.stats + // or use this: https://github.com/percona/mongodb_exporter/blob/95d1865e34940d0d610bb1fbff9745bc66ddbc73/exporter/collstats_collector.go#L100 + res := db.RunCommand(context.Background(), bson.D{ + {Key: "collStats", Value: oplogCol}, + }) + if err := res.Err(); err != nil { + return nil, fmt.Errorf("collStats command failed: %w", err) } - collection := db.Collection(oplogCol) - - // get oplog size + // Get MaxSize and Size from collStats by using db.runCommand var oplogSize CollSize - res := db.RunCommand(context.Background(), bson.D{bson.E{Key: "collStats", Value: oplogCol}}) - if err = res.Err(); err != nil { - return nil, fmt.Errorf("'collStats' command returned an error: %w", err) + if err := res.Decode(&oplogSize); err != nil { + return nil, fmt.Errorf("could not decode mongodb oplog size: %w", err) } - if err = res.Decode(&oplogSize); err != nil { - return nil, fmt.Errorf("could not decode mongodb op log size: %w", err) - } - - // get first and last items in the oplog + collection := db.Collection(oplogCol) firstTs, lastTs, err := getOpTimestamp(collection) if err != nil { - return nil, fmt.Errorf("could not get operation timestamp in op log: %w", err) + return nil, fmt.Errorf("could not get operation timestamps from oplog: %w", err) } - diff := lastTs - firstTs - - return &oplogInfo{ + info := &oplogInfo{ allocated: oplogSize.MaxSize, used: oplogSize.Size, firstTs: firstTs, lastTs: lastTs, - diff: diff, - }, nil -} + diff: lastTs - firstTs, + } -func getOpTimestamp(collection *mongo.Collection) (uint32, uint32, error) { + return info, nil +} - // Find both first and last timestamps using $min and $max - pipeline := bson.A{ - bson.M{"$group": bson.M{"_id": 1, "minTS": bson.M{"$min": "$ts"}, "maxTS": bson.M{"$max": "$ts"}}}, +// getOpTimestamp returns the first and last timestamp of the oplog collection. +func getOpTimestamp(collection *mongo.Collection) (int64, int64, error) { + // NOTE(shmsr): + // + // When you do db.getReplicationInfo() in monogo shell (mongosh), you can see + // { + // ... + // tFirst: 'Tue Jan 07 2025 22:33:28 GMT+0530 (India Standard Time)', + // tLast: 'Wed Jan 08 2025 11:45:07 GMT+0530 (India Standard Time)', + // now: 'Wed Jan 08 2025 11:45:14 GMT+0530 (India Standard Time)' + // } + // i.e., we get tFirst and tLast from oplog.rs which is the first and last + // timestamp of the oplog. + // Source from the same is here: + // https://github.com/mongodb/mongo/blob/20cbee37a0ee4d40b35d08b6a34ade81252f86a8/src/mongo/shell/db.js#L863 + // This is how they calculate tFirst and tLast: + // https://github.com/mongodb/mongo/blob/20cbee37a0ee4d40b35d08b6a34ade81252f86a8/src/mongo/shell/db.js#L889 + // So ideally, we will replicate the same logic here: + // var firstc = ol.find().sort({$natural: 1}).limit(1); + // var lastc = ol.find().sort({$natural: -1}).limit(1); + // + // oplog.rs is designed to scanned in natural ($natural) order. So, when + // querying without any sort, it will return the first entry in natural order. + // When we sort in reverse natural order (i.e., $natural: -1), it will return + // the last entry in natural order. + + // NOTE(shmsr): + // Timeout is set to 10m for: https://github.com/elastic/beats/pull/42224#discussion_r1928519896 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + var firstDoc struct { + Timestamp time.Time `bson:"ts"` } - cursor, err := collection.Aggregate(context.Background(), pipeline) + // Get oldest entry using natural order + firstOpts := options. + FindOne(). + SetProjection(bson.D{{Key: "ts", Value: 1}}) + err := collection.FindOne(ctx, bson.D{}, firstOpts).Decode(&firstDoc) if err != nil { - return 0, 0, fmt.Errorf("could not get operation timestamps in op log: %w", err) + return 0, 0, fmt.Errorf("first timestamp query failed for collection %s: %w", collection.Name(), err) } - defer cursor.Close(context.Background()) - var result struct { - MinTS time.Time `bson:"minTS"` - MaxTS time.Time `bson:"maxTS"` + // Get newest entry using reverse natural order + var lastDoc struct { + Timestamp time.Time `bson:"ts"` } - - if !cursor.Next(context.Background()) { - return 0, 0, errors.New("no documents found in op log") - } - if err := cursor.Decode(&result); err != nil { - return 0, 0, fmt.Errorf("error decoding response for timestamps: %w", err) + lastOpts := options. + FindOne(). + SetProjection(bson.D{{Key: "ts", Value: 1}}). + SetSort(bson.D{{Key: "$natural", Value: -1}}) + err = collection.FindOne(ctx, bson.D{}, lastOpts).Decode(&lastDoc) + if err != nil { + return 0, 0, fmt.Errorf("last timestamp query failed: %w", err) } - minTS := uint32(result.MinTS.Unix()) - maxTS := uint32(result.MaxTS.Unix()) - - return minTS, maxTS, nil -} - -func contains(s []string, x string) bool { - for _, n := range s { - if x == n { - return true - } - } - return false + return firstDoc.Timestamp.Unix(), lastDoc.Timestamp.Unix(), nil }