Skip to content

Commit

Permalink
metricbeat/module/mongodb: Improve logic to calculate oplog info and …
Browse files Browse the repository at this point in the history
…window (#42224)
  • Loading branch information
shmsr authored Jan 27, 2025
1 parent cb34518 commit b8ce873
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 65 deletions.
8 changes: 2 additions & 6 deletions metricbeat/module/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func ParseURL(module mb.Module, host string) (mb.HostData, error) {
}

func NewClient(config ModuleConfig, uri string, timeout time.Duration, mode readpref.Mode) (*mongo.Client, error) {

clientOptions := options.Client()

// options.Credentials must be nil for the driver to work properly if no auth is provided. Zero values breaks
Expand All @@ -143,12 +142,12 @@ func NewClient(config ModuleConfig, uri string, timeout time.Duration, mode read
if mode == 0 {
mode = readpref.NearestMode
}

readPreference, err := readpref.New(mode)
if err != nil {
return nil, err
}
clientOptions.SetReadPreference(readPreference)

clientOptions.SetConnectTimeout(timeout)

if config.TLS.IsEnabled() {
Expand All @@ -160,13 +159,10 @@ func NewClient(config ModuleConfig, uri string, timeout time.Duration, mode read
clientOptions.SetTLSConfig(tlsConfig.ToConfig())
}

client, err := mongo.NewClient(clientOptions)
client, err := mongo.Connect(context.Background(), clientOptions)
if err != nil {
return nil, fmt.Errorf("could not create mongodb client: %w", err)
}

if err = client.Connect(context.Background()); err != nil {
return client, fmt.Errorf("could not connect to mongodb: %w", err)
}
return client, nil
}
139 changes: 80 additions & 59 deletions metricbeat/module/mongodb/replstatus/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ package replstatus

import (
"context"
"errors"
"fmt"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type oplogInfo struct {
allocated int64
used float64
firstTs uint32
lastTs uint32
diff uint32
firstTs int64
lastTs int64
diff int64
}

// CollSize contains data about collection size
Expand All @@ -43,85 +43,106 @@ type CollSize struct {

const oplogCol = "oplog.rs"

// getReplicationInfo returns oplog info from local.oplog.rs
func getReplicationInfo(client *mongo.Client) (*oplogInfo, error) {
// get oplog.rs collection
// Get oplog collection info from local.oplog.rs (<db>.<collection>)
db := client.Database("local")

collections, err := db.ListCollectionNames(context.Background(), bson.D{})
if err != nil {
return nil, fmt.Errorf("could not retrieve collection names: %w", err)
}

if !contains(collections, oplogCol) {
return nil, errors.New("collection oplog.rs was not found")
// NOTE(shmsr):
// https://www.mongodb.com/docs/manual/reference/command/collStats/#syntax
// "scale" field is omitted here as it is by default 1, i.e., it return sizes in bytes.
//
// Also, note that collStats is deprecated since v6.2 but as we support older
// versions i.e., >= 5.0, let's keep it for now as this still works.
// TODO(shmsr): For newers versions, we can use db.collection.stats()
// https://www.mongodb.com/docs/manual/reference/method/db.collection.stats/#mongodb-method-db.collection.stats
// or use this: https://github.com/percona/mongodb_exporter/blob/95d1865e34940d0d610bb1fbff9745bc66ddbc73/exporter/collstats_collector.go#L100
res := db.RunCommand(context.Background(), bson.D{
{Key: "collStats", Value: oplogCol},
})
if err := res.Err(); err != nil {
return nil, fmt.Errorf("collStats command failed: %w", err)
}

collection := db.Collection(oplogCol)

// get oplog size
// Get MaxSize and Size from collStats by using db.runCommand
var oplogSize CollSize
res := db.RunCommand(context.Background(), bson.D{bson.E{Key: "collStats", Value: oplogCol}})
if err = res.Err(); err != nil {
return nil, fmt.Errorf("'collStats' command returned an error: %w", err)
if err := res.Decode(&oplogSize); err != nil {
return nil, fmt.Errorf("could not decode mongodb oplog size: %w", err)
}

if err = res.Decode(&oplogSize); err != nil {
return nil, fmt.Errorf("could not decode mongodb op log size: %w", err)
}

// get first and last items in the oplog
collection := db.Collection(oplogCol)
firstTs, lastTs, err := getOpTimestamp(collection)
if err != nil {
return nil, fmt.Errorf("could not get operation timestamp in op log: %w", err)
return nil, fmt.Errorf("could not get operation timestamps from oplog: %w", err)
}

diff := lastTs - firstTs

return &oplogInfo{
info := &oplogInfo{
allocated: oplogSize.MaxSize,
used: oplogSize.Size,
firstTs: firstTs,
lastTs: lastTs,
diff: diff,
}, nil
}
diff: lastTs - firstTs,
}

func getOpTimestamp(collection *mongo.Collection) (uint32, uint32, error) {
return info, nil
}

// Find both first and last timestamps using $min and $max
pipeline := bson.A{
bson.M{"$group": bson.M{"_id": 1, "minTS": bson.M{"$min": "$ts"}, "maxTS": bson.M{"$max": "$ts"}}},
// getOpTimestamp returns the first and last timestamp of the oplog collection.
func getOpTimestamp(collection *mongo.Collection) (int64, int64, error) {
// NOTE(shmsr):
//
// When you do db.getReplicationInfo() in monogo shell (mongosh), you can see
// {
// ...
// tFirst: 'Tue Jan 07 2025 22:33:28 GMT+0530 (India Standard Time)',
// tLast: 'Wed Jan 08 2025 11:45:07 GMT+0530 (India Standard Time)',
// now: 'Wed Jan 08 2025 11:45:14 GMT+0530 (India Standard Time)'
// }
// i.e., we get tFirst and tLast from oplog.rs which is the first and last
// timestamp of the oplog.
// Source from the same is here:
// https://github.com/mongodb/mongo/blob/20cbee37a0ee4d40b35d08b6a34ade81252f86a8/src/mongo/shell/db.js#L863
// This is how they calculate tFirst and tLast:
// https://github.com/mongodb/mongo/blob/20cbee37a0ee4d40b35d08b6a34ade81252f86a8/src/mongo/shell/db.js#L889
// So ideally, we will replicate the same logic here:
// var firstc = ol.find().sort({$natural: 1}).limit(1);
// var lastc = ol.find().sort({$natural: -1}).limit(1);
//
// oplog.rs is designed to scanned in natural ($natural) order. So, when
// querying without any sort, it will return the first entry in natural order.
// When we sort in reverse natural order (i.e., $natural: -1), it will return
// the last entry in natural order.

// NOTE(shmsr):
// Timeout is set to 10m for: https://github.com/elastic/beats/pull/42224#discussion_r1928519896
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

var firstDoc struct {
Timestamp time.Time `bson:"ts"`
}

cursor, err := collection.Aggregate(context.Background(), pipeline)
// Get oldest entry using natural order
firstOpts := options.
FindOne().
SetProjection(bson.D{{Key: "ts", Value: 1}})
err := collection.FindOne(ctx, bson.D{}, firstOpts).Decode(&firstDoc)
if err != nil {
return 0, 0, fmt.Errorf("could not get operation timestamps in op log: %w", err)
return 0, 0, fmt.Errorf("first timestamp query failed for collection %s: %w", collection.Name(), err)
}
defer cursor.Close(context.Background())

var result struct {
MinTS time.Time `bson:"minTS"`
MaxTS time.Time `bson:"maxTS"`
// Get newest entry using reverse natural order
var lastDoc struct {
Timestamp time.Time `bson:"ts"`
}

if !cursor.Next(context.Background()) {
return 0, 0, errors.New("no documents found in op log")
}
if err := cursor.Decode(&result); err != nil {
return 0, 0, fmt.Errorf("error decoding response for timestamps: %w", err)
lastOpts := options.
FindOne().
SetProjection(bson.D{{Key: "ts", Value: 1}}).
SetSort(bson.D{{Key: "$natural", Value: -1}})
err = collection.FindOne(ctx, bson.D{}, lastOpts).Decode(&lastDoc)
if err != nil {
return 0, 0, fmt.Errorf("last timestamp query failed: %w", err)
}

minTS := uint32(result.MinTS.Unix())
maxTS := uint32(result.MaxTS.Unix())

return minTS, maxTS, nil
}

func contains(s []string, x string) bool {
for _, n := range s {
if x == n {
return true
}
}
return false
return firstDoc.Timestamp.Unix(), lastDoc.Timestamp.Unix(), nil
}

0 comments on commit b8ce873

Please sign in to comment.