Skip to content

Commit

Permalink
Merge branch 'apache:master' into fix_go_client
Browse files Browse the repository at this point in the history
  • Loading branch information
lengyuexuexuan authored Mar 18, 2024
2 parents 792a990 + 38fa72e commit 23b16e9
Show file tree
Hide file tree
Showing 12 changed files with 1,097 additions and 9 deletions.
4 changes: 2 additions & 2 deletions collector/avail/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

// Detector periodically checks the service availability of the Pegasus cluster.
type Detector interface {
Start(tom *tomb.Tomb) error
Run(tom *tomb.Tomb) error
}

// NewDetector returns a service-availability detector.
Expand Down Expand Up @@ -96,7 +96,7 @@ type pegasusDetector struct {
partitionCount int
}

func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
func (d *pegasusDetector) Run(tom *tomb.Tomb) error {
var err error
// Open the detect table.
d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
Expand Down
3 changes: 3 additions & 0 deletions collector/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ falcon_agent:

availablity_detect:
table_name : test

hotspot:
partition_detect_interval : 10s
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,43 @@
// under the License.

package hotspot

import (
"time"

log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
)

type PartitionDetector interface {
Run(tom *tomb.Tomb) error
}

type PartitionDetectorConfig struct {
DetectInterval time.Duration
}

func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
return &partitionDetector{
detectInterval: conf.DetectInterval,
}
}

type partitionDetector struct {
detectInterval time.Duration
}

func (d *partitionDetector) Run(tom *tomb.Tomb) error {
for {
select {
case <-time.After(d.detectInterval):
d.detect()
case <-tom.Dying():
log.Info("Hotspot partition detector exited.")
return nil
}
}
}

func (d *partitionDetector) detect() {
}
27 changes: 22 additions & 5 deletions collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"syscall"

"github.com/apache/incubator-pegasus/collector/avail"
"github.com/apache/incubator-pegasus/collector/hotspot"
"github.com/apache/incubator-pegasus/collector/metrics"
"github.com/apache/incubator-pegasus/collector/webui"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -87,18 +88,34 @@ func main() {

tom := &tomb.Tomb{}
setupSignalHandler(func() {
tom.Kill(errors.New("collector terminates")) // kill other goroutines
tom.Kill(errors.New("Collector terminates")) // kill other goroutines
})

tom.Go(func() error {
// Set detect inteverl and detect timeout 10s.
return avail.NewDetector(10000000000, 10000000000, 16).Start(tom)
return avail.NewDetector(10000000000, 10000000000, 16).Run(tom)
})

tom.Go(func() error {
return metrics.NewMetaServerMetricCollector().Start(tom)
return metrics.NewMetaServerMetricCollector().Run(tom)
})

tom.Go(func() error {
return metrics.NewReplicaServerMetricCollector().Start(tom)
return metrics.NewReplicaServerMetricCollector().Run(tom)
})

<-tom.Dead() // gracefully wait until all goroutines dead
tom.Go(func() error {
conf := hotspot.PartitionDetectorConfig{
DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
}
return hotspot.NewPartitionDetector(conf).Run(tom)
})

err := tom.Wait()
if err != nil {
log.Error("Collector exited abnormally:", err)
return
}

log.Info("Collector exited normally.")
}
4 changes: 2 additions & 2 deletions collector/metrics/metric_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var SummaryMetricsMap map[string]prometheus.Summary
var TableNameByID map[string]string

type MetricCollector interface {
Start(tom *tomb.Tomb) error
Run(tom *tomb.Tomb) error
}

func NewMetricCollector(
Expand All @@ -79,7 +79,7 @@ type Collector struct {
role string
}

func (collector *Collector) Start(tom *tomb.Tomb) error {
func (collector *Collector) Run(tom *tomb.Tomb) error {
ticker := time.NewTicker(collector.detectInterval)
for {
select {
Expand Down
104 changes: 104 additions & 0 deletions go-client/idl/base/host_port.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package base

import (
"fmt"

"github.com/apache/thrift/lib/go/thrift"
)

type HostPortType int32

const (
HOST_TYPE_INVALID HostPortType = iota
HOST_TYPE_IPV4
HOST_TYPE_GROUP
)

type HostPort struct {
host string
port uint16
// TODO(yingchun): Now only support ipv4
hpType HostPortType
}

func NewHostPort(host string, port uint16) *HostPort {
return &HostPort{
host: host,
port: port,
hpType: HOST_TYPE_IPV4,
}
}

func (r *HostPort) Read(iprot thrift.TProtocol) error {
host, err := iprot.ReadString()
if err != nil {
return err
}
port, err := iprot.ReadI16()
if err != nil {
return err
}
hpType, err := iprot.ReadByte()
if err != nil {
return err
}

r.host = host
r.port = uint16(port)
r.hpType = HostPortType(hpType)
return nil
}

func (r *HostPort) Write(oprot thrift.TProtocol) error {
err := oprot.WriteString(r.host)
if err != nil {
return err
}
err = oprot.WriteI16(int16(r.port))
if err != nil {
return err
}
err = oprot.WriteByte(int8(r.hpType))
if err != nil {
return err
}
return nil
}

func (r *HostPort) GetHost() string {
return r.host
}

func (r *HostPort) GetPort() uint16 {
return r.port
}

func (r *HostPort) String() string {
if r == nil {
return "<nil>"
}
return fmt.Sprintf("HostPort(%s:%d)", r.host, r.port)
}

func (r *HostPort) GetHostPort() string {
return fmt.Sprintf("%s:%d", r.host, r.port)
}
51 changes: 51 additions & 0 deletions go-client/idl/base/host_port_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package base

import (
"testing"

"github.com/apache/thrift/lib/go/thrift"
"github.com/stretchr/testify/assert"
)

func TestHostPort(t *testing.T) {
testCases := map[string]uint16{
"localhost": 8080,
}

for host, port := range testCases {
hp := NewHostPort(host, port)
assert.Equal(t, host, hp.GetHost())
assert.Equal(t, port, hp.GetPort())

// test HostPort serialize
buf := thrift.NewTMemoryBuffer()
oprot := thrift.NewTBinaryProtocolTransport(buf)
hp.Write(oprot)

// test HostPort deserialize
readHostPort := NewHostPort("", 0)
readHostPort.Read(oprot)

// check equals
assert.Equal(t, readHostPort, hp)
}
}
1 change: 1 addition & 0 deletions src/shell/command_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef bool (*executor)(command_executor *this_, shell_context *sc, arguments a
struct command_executor
{
const char *name;
// TODO(yingchun): the 'description' is not output in the help message, fix it.
const char *description;
const char *option_usage;
executor exec;
Expand Down
85 changes: 85 additions & 0 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,91 @@ class aggregate_stats_calcs
} \
} while (0)

// A helper macro to parse command argument, the result is filled in a string vector variable named
// 'container'.
#define PARSE_STRS(container) \
do { \
const auto param = cmd(param_index++).str(); \
::dsn::utils::split_args(param.c_str(), container, ','); \
if (container.empty()) { \
fmt::print(stderr, \
"invalid command, '{}' should be in the form of 'val1,val2,val3' and " \
"should not be empty\n", \
param); \
return false; \
} \
std::set<std::string> str_set(container.begin(), container.end()); \
if (str_set.size() != container.size()) { \
fmt::print(stderr, "invalid command, '{}' has duplicate values\n", param); \
return false; \
} \
} while (false)

// A helper macro to parse command argument, the result is filled in an uint32_t variable named
// 'value'.
#define PARSE_UINT(value) \
do { \
const auto param = cmd(param_index++).str(); \
if (!::dsn::buf2uint32(param, value)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", param); \
return false; \
} \
} while (false)

// A helper macro to parse an optional command argument, the result is filled in an uint32_t
// variable 'value'.
#define PARSE_OPT_UINT(name, value, def_val) \
do { \
const auto param = cmd(name, (def_val)).str(); \
if (!::dsn::buf2uint32(param, value)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", param); \
return false; \
} \
} while (false)

// A helper macro to parse command argument, the result is filled in an uint32_t vector variable
// 'container'.
#define PARSE_UINTS(container) \
do { \
std::vector<std::string> strs; \
PARSE_STRS(strs); \
container.clear(); \
for (const auto &str : strs) { \
uint32_t v; \
if (!::dsn::buf2uint32(str, v)) { \
fmt::print(stderr, "invalid command, '{}' should be an unsigned integer\n", str); \
return false; \
} \
container.insert(v); \
} \
} while (false)

#define RETURN_FALSE_IF_NOT(expr, ...) \
do { \
if (dsn_unlikely(!(expr))) { \
fmt::print(stderr, "{}\n", fmt::format(__VA_ARGS__)); \
return false; \
} \
} while (false)

#define RETURN_FALSE_IF_NON_OK(expr, ...) \
do { \
const auto _ec = (expr); \
if (dsn_unlikely(_ec != dsn::ERR_OK)) { \
fmt::print(stderr, "{}: {}\n", _ec, fmt::format(__VA_ARGS__)); \
return false; \
} \
} while (false)

#define RETURN_FALSE_IF_NON_RDB_OK(expr, ...) \
do { \
const auto _s = (expr); \
if (dsn_unlikely(!_s.ok())) { \
fmt::print(stderr, "{}: {}\n", _s.ToString(), fmt::format(__VA_ARGS__)); \
return false; \
} \
} while (false)

// Total aggregation over the fetched metrics. The only dimension is the metric name, which
// is also the key of `stat_var_map`.
class total_aggregate_stats : public aggregate_stats
Expand Down
4 changes: 4 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,7 @@ bool clear_bulk_load(command_executor *e, shell_context *sc, arguments args);
// == detect hotkey (see 'commands/detect_hotkey.cpp') == //

bool detect_hotkey(command_executor *e, shell_context *sc, arguments args);

// == local partition split (see 'commands/local_partition_split.cpp') == //
extern const std::string local_partition_split_help;
bool local_partition_split(command_executor *e, shell_context *sc, arguments args);
Loading

0 comments on commit 23b16e9

Please sign in to comment.