Skip to content

Commit

Permalink
feat(collector): introduce the hotspot detector into go-collector (#1943
Browse files Browse the repository at this point in the history
)
  • Loading branch information
empiredan authored Mar 14, 2024
1 parent 261c95e commit 38fa72e
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 9 deletions.
4 changes: 2 additions & 2 deletions collector/avail/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
Start(tom *tomb.Tomb) error
Run(tom *tomb.Tomb) error
}

// NewDetector returns a service-availability detector.
Expand Down Expand Up @@ -96,7 +96,7 @@ type pegasusDetector struct {
partitionCount int
}

func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
func (d *pegasusDetector) Run(tom *tomb.Tomb) error {
var err error
// Open the detect table.
d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
Expand Down
3 changes: 3 additions & 0 deletions collector/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ falcon_agent:

availablity_detect:
table_name : test

hotspot:
partition_detect_interval : 10s
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,43 @@
// under the License.

package hotspot

import (
"time"

log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)

type PartitionDetector interface {
Run(tom *tomb.Tomb) error
}

type PartitionDetectorConfig struct {
DetectInterval time.Duration
}

func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
return &partitionDetector{
detectInterval: conf.DetectInterval,
}
}

type partitionDetector struct {
detectInterval time.Duration
}

func (d *partitionDetector) Run(tom *tomb.Tomb) error {
for {
select {
case <-time.After(d.detectInterval):
d.detect()
case <-tom.Dying():
log.Info("Hotspot partition detector exited.")
return nil
}
}
}

func (d *partitionDetector) detect() {
}
27 changes: 22 additions & 5 deletions collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"syscall"

"github.com/apache/incubator-pegasus/collector/avail"
"github.com/apache/incubator-pegasus/collector/hotspot"
"github.com/apache/incubator-pegasus/collector/metrics"
"github.com/apache/incubator-pegasus/collector/webui"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -87,18 +88,34 @@ func main() {

tom := &tomb.Tomb{}
setupSignalHandler(func() {
tom.Kill(errors.New("collector terminates")) // kill other goroutines
tom.Kill(errors.New("Collector terminates")) // kill other goroutines
})

tom.Go(func() error {
// Set detect inteverl and detect timeout 10s.
return avail.NewDetector(10000000000, 10000000000, 16).Start(tom)
return avail.NewDetector(10000000000, 10000000000, 16).Run(tom)
})

tom.Go(func() error {
return metrics.NewMetaServerMetricCollector().Start(tom)
return metrics.NewMetaServerMetricCollector().Run(tom)
})

tom.Go(func() error {
return metrics.NewReplicaServerMetricCollector().Start(tom)
return metrics.NewReplicaServerMetricCollector().Run(tom)
})

<-tom.Dead() // gracefully wait until all goroutines dead
tom.Go(func() error {
conf := hotspot.PartitionDetectorConfig{
DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
}
return hotspot.NewPartitionDetector(conf).Run(tom)
})

err := tom.Wait()
if err != nil {
log.Error("Collector exited abnormally:", err)
return
}

log.Info("Collector exited normally.")
}
4 changes: 2 additions & 2 deletions collector/metrics/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var SummaryMetricsMap map[string]prometheus.Summary
var TableNameByID map[string]string

type MetricCollector interface {
Start(tom *tomb.Tomb) error
Run(tom *tomb.Tomb) error
}

func NewMetricCollector(
Expand All @@ -79,7 +79,7 @@ type Collector struct {
role string
}

func (collector *Collector) Start(tom *tomb.Tomb) error {
func (collector *Collector) Run(tom *tomb.Tomb) error {
ticker := time.NewTicker(collector.detectInterval)
for {
select {
Expand Down

0 comments on commit 38fa72e

Please sign in to comment.