Skip to content

Commit

Permalink
collector/filesystem: s/MNT_NOWAIT/MNT_WAIT
Browse files Browse the repository at this point in the history
`getfsstat(2)` spec mentions that using `MNT_NOWAIT` will return the
information it has available without requesting an update from each file
system. Hence, use `MNT_WAIT` in place of the earlier used mode, and
make changes to the affected collectors to avoid being stuck for long
intervals.

Fixes: #1498

Signed-off-by: Pranshu Srivastava <rexagod@gmail.com>
  • Loading branch information
rexagod committed Mar 18, 2024
1 parent 32ac7f4 commit 9b0988a
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 105 deletions.
26 changes: 19 additions & 7 deletions collector/filesystem_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package collector

import (
"errors"
"time"
"unsafe"

"github.com/go-kit/log/level"
Expand All @@ -38,17 +39,28 @@ const (
readOnly = 0x1 // MNT_RDONLY
)

// Expose filesystem fullness.
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, err error) {
var mntbuf *C.struct_statfs
count := C.getmntinfo(&mntbuf, C.MNT_NOWAIT)
if count == 0 {
return nil, errors.New("getmntinfo() failed")
// `getmntinfo` relies on `getfsstat` in some variants, and is blocking in general.
count := 0
countCh := make(chan int, 1)
var mountBuf *C.struct_statfs
go func(mountBuf **C.struct_statfs) {
countCh <- int(C.getmntinfo(mountBuf, C.MNT_WAIT))
close(countCh)
}(&mountBuf)
select {
case count = <-countCh:
if count <= 0 {
return nil, errors.New("getmntinfo failed")
}
case <-time.After(*mountTimeout):
return nil, errors.New("getmntinfo timed out")
}

mnt := (*[1 << 20]C.struct_statfs)(unsafe.Pointer(mntbuf))
mnt := (*[1 << 20]C.struct_statfs)(unsafe.Pointer(mountBuf))
stats = []filesystemStats{}
for i := 0; i < int(count); i++ {
for i := 0; i < count; i++ {
mountpoint := C.GoString(&mnt[i].f_mntonname[0])
if c.excludedMountPointsPattern.MatchString(mountpoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", mountpoint)
Expand Down
3 changes: 3 additions & 0 deletions collector/filesystem_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ var (
"collector.filesystem.ignored-mount-points",
"Regexp of mount points to ignore for filesystem collector.",
).Hidden().String()
mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()

fsTypesExcludeSet bool
fsTypesExclude = kingpin.Flag(
Expand Down
43 changes: 36 additions & 7 deletions collector/filesystem_freebsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package collector

import (
"errors"
"time"

"github.com/go-kit/log/level"
"golang.org/x/sys/unix"
)
Expand All @@ -28,16 +31,42 @@ const (

// Expose filesystem fullness.
func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
n, err := unix.Getfsstat(nil, unix.MNT_NOWAIT)
if err != nil {
var mountPointCount int
nChan := make(chan int, 1)
errChan := make(chan error, 1)
go func() {
var err error
var n int
n, err = unix.Getfsstat(nil, unix.MNT_WAIT)
if err != nil {
errChan <- err
return
}
nChan <- n
}()
select {
case mountPointCount = <-nChan:
case err := <-errChan:
return nil, err
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
buf := make([]unix.Statfs_t, n)
_, err = unix.Getfsstat(buf, unix.MNT_NOWAIT)
if err != nil {
return nil, err

buf := make([]unix.Statfs_t, mountPointCount)
go func(buf []unix.Statfs_t) {
_, err := unix.Getfsstat(buf, unix.MNT_WAIT)
errChan <- err
}(buf)
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
stats := []filesystemStats{}

var stats []filesystemStats
for _, fs := range buf {
mountpoint := unix.ByteSliceToString(fs.Mntonname[:])
if c.excludedMountPointsPattern.MatchString(mountpoint) {
Expand Down
138 changes: 58 additions & 80 deletions collector/filesystem_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,114 +37,115 @@ const (
defFSTypesExcluded = "^(autofs|binfmt_misc|bpf|cgroup2?|configfs|debugfs|devpts|devtmpfs|fusectl|hugetlbfs|iso9660|mqueue|nsfs|overlay|proc|procfs|pstore|rpc_pipefs|securityfs|selinuxfs|squashfs|sysfs|tracefs)$"
)

var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
"how long to wait for a mount to respond before marking it as stale").
Hidden().Default("5s").Duration()
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
"how many stat calls to process simultaneously").
Hidden().Default("4").Int()
var stuckMounts = make(map[string]struct{})
var stuckMountsMtx = &sync.Mutex{}
var stuckMountsMap = make(map[string]struct{})
var stuckMountsMutex = &sync.Mutex{}

// GetStats returns filesystem stats.
func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
mps, err := mountPointDetails(c.logger)
fsLabels, err := mountPointDetails(c.logger)
if err != nil {
return nil, err
}
stats := []filesystemStats{}
labelChan := make(chan filesystemLabels)
statChan := make(chan filesystemStats)
var fsStats []filesystemStats
fsLabelChan := make(chan filesystemLabels)
fsStatChan := make(chan filesystemStats)
wg := sync.WaitGroup{}

workerCount := *statWorkerCount
if workerCount < 1 {
workerCount = 1
}

for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for labels := range labelChan {
statChan <- c.processStat(labels)
for fsLabel := range fsLabelChan {
fsStatChan <- c.processStat(fsLabel)
}
}()
}

go func() {
for _, labels := range mps {
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
for _, fsLabel := range fsLabels {
if c.excludedMountPointsPattern.MatchString(fsLabel.mountPoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", fsLabel.mountPoint)
continue
}
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
continue
}

stuckMountsMtx.Lock()
if _, ok := stuckMounts[labels.mountPoint]; ok {
labels.deviceError = "mountpoint timeout"
stats = append(stats, filesystemStats{
labels: labels,
deviceError: 1,
})
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
stuckMountsMtx.Unlock()
if c.excludedFSTypesPattern.MatchString(fsLabel.fsType) {
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", fsLabel.fsType)
continue
}

stuckMountsMtx.Unlock()
labelChan <- labels
fsLabelChan <- fsLabel
}
close(labelChan)
close(fsLabelChan)
wg.Wait()
close(statChan)
close(fsStatChan)
}()

for stat := range statChan {
stats = append(stats, stat)
for fsStat := range fsStatChan {
fsStats = append(fsStats, fsStat)
}
return stats, nil
return fsStats, nil
}

func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
func (c *filesystemCollector) processStat(fsLabel filesystemLabels) filesystemStats {
var ro float64
for _, option := range strings.Split(labels.options, ",") {
for _, option := range strings.Split(fsLabel.options, ",") {
if option == "ro" {
ro = 1
break
}
}

success := make(chan struct{})
go stuckMountWatcher(labels.mountPoint, success, c.logger)

// If the mount point is stuck, mark it as such and return early.
// This is done to avoid blocking the stat call indefinitely.
// NOTE: For instance, this can happen when an NFS mount is unreachable.
buf := new(unix.Statfs_t)
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
stuckMountsMtx.Lock()
close(success)

// If the mount has been marked as stuck, unmark it and log it's recovery.
if _, ok := stuckMounts[labels.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
delete(stuckMounts, labels.mountPoint)
statFsErrChan := make(chan error, 1)
go func(buf *unix.Statfs_t) {
statFsErrChan <- unix.Statfs(rootfsFilePath(fsLabel.mountPoint), buf)
close(statFsErrChan)
}(buf)

select {
case err := <-statFsErrChan:
if err != nil {
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(fsLabel.mountPoint), "err", err)
fsLabel.deviceError = err.Error()
}
case <-time.After(*mountTimeout):
stuckMountsMutex.Lock()
if _, ok := stuckMountsMap[fsLabel.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", fsLabel.mountPoint)
stuckMountsMap[fsLabel.mountPoint] = struct{}{}
fsLabel.deviceError = "mountpoint timeout"
}
stuckMountsMutex.Unlock()
}
stuckMountsMtx.Unlock()

if err != nil {
labels.deviceError = err.Error()
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
// Check if the mount point has recovered and remove it from the stuck map.
if _, isOpen := <-statFsErrChan; !isOpen {
stuckMountsMutex.Lock()
if _, ok := stuckMountsMap[fsLabel.mountPoint]; ok {
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", fsLabel.mountPoint)
delete(stuckMountsMap, fsLabel.mountPoint)
}
stuckMountsMutex.Unlock()
}

// If the mount point is stuck or statfs errored, mark it as such and return.
if fsLabel.deviceError != "" {
return filesystemStats{
labels: labels,
labels: fsLabel,
deviceError: 1,
ro: ro,
}
}

return filesystemStats{
labels: labels,
labels: fsLabel,
size: float64(buf.Blocks) * float64(buf.Bsize),
free: float64(buf.Bfree) * float64(buf.Bsize),
avail: float64(buf.Bavail) * float64(buf.Bsize),
Expand All @@ -154,29 +155,6 @@ func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemSta
}
}

// stuckMountWatcher listens on the given success channel and if the channel closes
// then the watcher does nothing. If instead the timeout is reached, the
// mount point that is being watched is marked as stuck.
func stuckMountWatcher(mountPoint string, success chan struct{}, logger log.Logger) {
mountCheckTimer := time.NewTimer(*mountTimeout)
defer mountCheckTimer.Stop()
select {
case <-success:
// Success
case <-mountCheckTimer.C:
// Timed out, mark mount as stuck
stuckMountsMtx.Lock()
select {
case <-success:
// Success came in just after the timeout was reached, don't label the mount as stuck
default:
level.Debug(logger).Log("msg", "Mount point timed out, it is being labeled as stuck and will not be monitored", "mountpoint", mountPoint)
stuckMounts[mountPoint] = struct{}{}
}
stuckMountsMtx.Unlock()
}
}

func mountPointDetails(logger log.Logger) ([]filesystemLabels, error) {
file, err := os.Open(procFilePath("1/mounts"))
if errors.Is(err, os.ErrNotExist) {
Expand Down
49 changes: 38 additions & 11 deletions collector/filesystem_openbsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package collector

import (
"errors"
"time"

"github.com/go-kit/log/level"
"golang.org/x/sys/unix"
)
Expand All @@ -26,21 +29,45 @@ const (
defFSTypesExcluded = "^devfs$"
)

// Expose filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, err error) {
var mnt []unix.Statfs_t
size, err := unix.Getfsstat(mnt, unix.MNT_NOWAIT)
if err != nil {
return nil, err
// GetStats exposes filesystem fullness.
func (c *filesystemCollector) GetStats() (stats []filesystemStats, fsstatErr error) {
var mountPointCount int
nChan := make(chan int, 1)
errChan := make(chan error, 1)
go func() {
var statErr error
var n int
n, statErr = unix.Getfsstat(nil, unix.MNT_WAIT)
if statErr != nil {
errChan <- statErr
return
}
nChan <- n
}()
select {
case mountPointCount = <-nChan:
case statErr := <-errChan:
return nil, statErr
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}
mnt = make([]unix.Statfs_t, size)
_, err = unix.Getfsstat(mnt, unix.MNT_NOWAIT)
if err != nil {
return nil, err

buf := make([]unix.Statfs_t, mountPointCount)
go func(buf []unix.Statfs_t) {
_, fsstatErr = unix.Getfsstat(buf, unix.MNT_WAIT)
errChan <- fsstatErr
}(buf)
select {
case err := <-errChan:
if err != nil {
return nil, err
}
case <-time.After(*mountTimeout):
return nil, errors.New("getfsstat timed out")
}

stats = []filesystemStats{}
for _, v := range mnt {
for _, v := range buf {
mountpoint := unix.ByteSliceToString(v.F_mntonname[:])
if c.excludedMountPointsPattern.MatchString(mountpoint) {
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", mountpoint)
Expand Down

0 comments on commit 9b0988a

Please sign in to comment.