diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index caa942cae392..8629fa2b704a 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -164,6 +164,7 @@ receivers: - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.108.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.108.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/httpcheckreceiver v0.108.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.108.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.108.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/iisreceiver v0.108.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.108.0 diff --git a/cmd/otelcontribcol/components.go b/cmd/otelcontribcol/components.go index a197b786f59f..20539d112e91 100644 --- a/cmd/otelcontribcol/components.go +++ b/cmd/otelcontribcol/components.go @@ -432,6 +432,7 @@ func components() (otelcol.Factories, error) { factories.ReceiverModules[haproxyreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.108.0" factories.ReceiverModules[hostmetricsreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.108.0" factories.ReceiverModules[httpcheckreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/httpcheckreceiver v0.108.0" + factories.ReceiverModules[huaweicloudcesreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.108.0" factories.ReceiverModules[influxdbreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.108.0" factories.ReceiverModules[iisreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/iisreceiver v0.108.0" factories.ReceiverModules[jaegerreceiver.NewFactory().Type()] = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.108.0" diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index f71afd8b8fb6..fc935be3dc56 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -153,6 +153,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/haproxyreceiver v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/httpcheckreceiver v0.108.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/iisreceiver v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.108.0 @@ -545,7 +546,7 @@ require ( github.com/hashicorp/serf v0.10.1 // indirect github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95 // indirect github.com/hetznercloud/hcloud-go/v2 v2.10.2 // indirect - github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.104 // indirect + github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.110 // indirect github.com/iancoleman/strcase v0.3.0 // indirect github.com/imdario/mergo v0.3.16 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/cmd/otelcontribcol/go.sum b/cmd/otelcontribcol/go.sum index d755fe30c118..3259afc4a500 100644 --- a/cmd/otelcontribcol/go.sum +++ b/cmd/otelcontribcol/go.sum @@ -1669,8 +1669,8 @@ github.com/hetznercloud/hcloud-go/v2 v2.10.2/go.mod h1:xQ+8KhIS62W0D78Dpi57jsufW github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.104 h1:mCjW876Kx+y7LYxvbXVUxacN42AH2EJe715iZyPP1Fk= -github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.104/go.mod h1:lhdEO9Bbb3hZ0wG+JeK9/GqMOp/sgc92mFmVk5tNSCk= +github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.110 h1:XaWasqaF3tzRGw9cFmhXM1IiPAom8Nhg760F9LYHPKg= +github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.110/go.mod h1:JWz2ujO9X3oU5wb6kXp+DpR2UuDj2SldDbX8T0FSuhI= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= @@ -2564,7 +2564,6 @@ golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= @@ -2883,7 +2882,6 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -2901,7 +2899,6 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= diff --git a/receiver/huaweicloudcesreceiver/README.md b/receiver/huaweicloudcesreceiver/README.md index 7ad1aff9172a..b645bc7e912a 100644 --- a/receiver/huaweicloudcesreceiver/README.md +++ b/receiver/huaweicloudcesreceiver/README.md @@ -46,7 +46,15 @@ The following settings are optional: - `initial_delay`: The delay before the first collection of metrics begins. This is a duration field, such as 5s for 5 seconds. -- `collection_interval` (default = `60s`): This is the interval at which this receiver collects metrics. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. +- `collection_interval` (default = `60s`): This is the interval at which this receiver collects metrics. This value must be a string readable by Golang's [time.ParseDuration](https://pkg.go.dev/time#ParseDuration). Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`. We recommend a polling interval of at least one minute. + +- `retry_on_failure`: The following configurations can be used to control the retry policy of the CES client. The default values are suitable for most deployment scenarios. + - `enabled` (default true) + - `initial_interval` (default 100ms) + - `max_interval` (default 1s) + - `max_elapsed_time` (default 15s) + - `randomization_factor` (default 0.5) + - `multiplier` (default 1.5) ### Example Configuration @@ -88,6 +96,9 @@ Before running the application, you need to set the environment variables `HUAWE echo $HUAWEICLOUD_SDK_SK ``` +## Error handling +If you encounter any CES errors, please refer to the [Huawei Cloud Error Codes](https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-errorcode.html). + ## Converting CES metric representation to Open Telementery metric representation diff --git a/receiver/huaweicloudcesreceiver/config.go b/receiver/huaweicloudcesreceiver/config.go index a422936306e3..6ebe667d41f9 100644 --- a/receiver/huaweicloudcesreceiver/config.go +++ b/receiver/huaweicloudcesreceiver/config.go @@ -8,9 +8,11 @@ import ( "fmt" "slices" + "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ces/v1/model" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/multierr" ) @@ -57,10 +59,12 @@ type Config struct { // 14400: Cloud Eye aggregates data every 4 hours. // 86400: Cloud Eye aggregates data every 24 hours. // For details about the aggregation, see https://support.huaweicloud.com/intl/en-us/ces_faq/ces_faq_0009.html - Period int `mapstructure:"period"` + Period int32 `mapstructure:"period"` // Data aggregation method. The supported values ​​are max, min, average, sum, variance. Filter string `mapstructure:"filter"` + + BackOffConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"` } type HuaweiSessionConfig struct { @@ -78,10 +82,16 @@ type HuaweiSessionConfig struct { var _ component.Config = (*Config)(nil) // These valid periods are defined by CES API constraints: https://support.huaweicloud.com/intl/en-us/api-ces/ces_03_0034.html#section3 -var validPeriods = []int{1, 300, 1200, 3600, 14400, 86400} +var validPeriods = []int32{1, 300, 1200, 3600, 14400, 86400} // These valid filters are defined by CES API constraints: https://support.huaweicloud.com/intl/en-us/api-ces/ces_03_0034.html#section3 -var validFilters = []string{"max", "min", "average", "sum", "variance"} +var validFilters = map[string]model.ShowMetricDataRequestFilter{ + "max": model.GetShowMetricDataRequestFilterEnum().MAX, + "min": model.GetShowMetricDataRequestFilterEnum().MIN, + "average": model.GetShowMetricDataRequestFilterEnum().AVERAGE, + "sum": model.GetShowMetricDataRequestFilterEnum().SUM, + "variance": model.GetShowMetricDataRequestFilterEnum().VARIANCE, +} // Validate config func (config *Config) Validate() error { @@ -96,11 +106,14 @@ func (config *Config) Validate() error { if index := slices.Index(validPeriods, config.Period); index == -1 { err = multierr.Append(err, fmt.Errorf("invalid period: got %d; must be one of %v", config.Period, validPeriods)) } - - if index := slices.Index(validFilters, config.Filter); index == -1 { - err = multierr.Append(err, fmt.Errorf("invalid filter: got %s; must be one of %v", config.Filter, validFilters)) + if _, ok := validFilters[config.Filter]; !ok { + var validFiltersSlice []string + for key := range validFilters { + validFiltersSlice = append(validFiltersSlice, key) + } + err = multierr.Append(err, fmt.Errorf("invalid filter: got %s; must be one of %v", config.Filter, validFiltersSlice)) } - if config.Period >= int(config.CollectionInterval.Seconds()) { + if config.Period >= int32(config.CollectionInterval.Seconds()) { err = multierr.Append(err, errInvalidCollectionInterval) } diff --git a/receiver/huaweicloudcesreceiver/factory.go b/receiver/huaweicloudcesreceiver/factory.go index fbdbcbd8e252..5f5426680e5d 100644 --- a/receiver/huaweicloudcesreceiver/factory.go +++ b/receiver/huaweicloudcesreceiver/factory.go @@ -5,8 +5,11 @@ package huaweicloudcesreceiver // import "github.com/open-telemetry/opentelemetr import ( "context" + "time" + "github.com/cenkalti/backoff/v4" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" @@ -22,6 +25,14 @@ func NewFactory() receiver.Factory { func createDefaultConfig() component.Config { return &Config{ + BackOffConfig: configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, + MaxInterval: time.Second, + MaxElapsedTime: 15 * time.Second, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + }, HuaweiSessionConfig: HuaweiSessionConfig{ NoVerifySSL: false, }, diff --git a/receiver/huaweicloudcesreceiver/go.mod b/receiver/huaweicloudcesreceiver/go.mod index f07b36a72cca..6869eb67900c 100644 --- a/receiver/huaweicloudcesreceiver/go.mod +++ b/receiver/huaweicloudcesreceiver/go.mod @@ -5,17 +5,19 @@ go 1.22.0 toolchain go1.22.3 require ( + github.com/cenkalti/backoff/v4 v4.3.0 github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.110 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.107.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.108.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.107.0 go.opentelemetry.io/collector/config/confighttp v0.107.0 go.opentelemetry.io/collector/config/configopaque v1.13.0 + go.opentelemetry.io/collector/config/configretry v1.13.0 go.opentelemetry.io/collector/confmap v0.107.0 go.opentelemetry.io/collector/consumer v0.107.0 go.opentelemetry.io/collector/consumer/consumertest v0.107.0 - go.opentelemetry.io/collector/pdata v1.13.1-0.20240816132030-9fd84668bb02 + go.opentelemetry.io/collector/pdata v1.14.1 go.opentelemetry.io/collector/receiver v0.106.1 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 @@ -45,7 +47,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.107.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.108.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect diff --git a/receiver/huaweicloudcesreceiver/go.sum b/receiver/huaweicloudcesreceiver/go.sum index ad025ef20a1d..c9dc3a760183 100644 --- a/receiver/huaweicloudcesreceiver/go.sum +++ b/receiver/huaweicloudcesreceiver/go.sum @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -135,6 +137,8 @@ go.opentelemetry.io/collector/config/confighttp v0.107.0 h1:HnfFg/n3xu+XH7onWrFZ go.opentelemetry.io/collector/config/confighttp v0.107.0/go.mod h1:/slm41hcfOwAxv8ZcGCKHC22jnQZ71z42OSWChKuIgU= go.opentelemetry.io/collector/config/configopaque v1.13.0 h1:EDB9JIifmBth1z9IsEduoE1bT1Q8jV0sR005EMW7q1w= go.opentelemetry.io/collector/config/configopaque v1.13.0/go.mod h1:0xURn2sOy5j4fbaocpEYfM97HPGsiffkkVudSPyTJlM= +go.opentelemetry.io/collector/config/configretry v1.13.0 h1:gcjWB6FOG1u1e5ecs3nOtOysXWtxJxeL+cNiFLI+nCo= +go.opentelemetry.io/collector/config/configretry v1.13.0/go.mod h1:P+RA0IA+QoxnDn4072uyeAk1RIoYiCbxYsjpKX5eFC4= go.opentelemetry.io/collector/config/configtelemetry v0.107.0 h1:pSGd4FWQ/Up/Af+XZTR8JNneH/wmQ/TAU4Z16JHQeUc= go.opentelemetry.io/collector/config/configtelemetry v0.107.0/go.mod h1:WxWKNVAQJg/Io1nA3xLgn/DWLE/W1QOB2+/Js3ACi40= go.opentelemetry.io/collector/config/configtls v1.13.0 h1:N57vOibMIPX9YZq4ZLFjj5ZjUHMYW7bpkPkygU3vt8w= @@ -157,8 +161,8 @@ go.opentelemetry.io/collector/featuregate v1.13.0 h1:rc84eCf5hesXQ8/bP6Zc15wqthb go.opentelemetry.io/collector/featuregate v1.13.0/go.mod h1:PsOINaGgTiFc+Tzu2K/X2jP+Ngmlp7YKGV1XrnBkH7U= go.opentelemetry.io/collector/internal/globalgates v0.107.0 h1:PaD6WgQg80YTVxg8OF+YEqgI7WRd13wMu/R6GIG7uNU= go.opentelemetry.io/collector/internal/globalgates v0.107.0/go.mod h1:hca7Tpzu6JmBrAOgmlyp/ZM6kxprPRMKqSYoq/Tdzjw= -go.opentelemetry.io/collector/pdata v1.13.1-0.20240816132030-9fd84668bb02 h1:ZWKffCXPwEjYkfoDPCVLHEBdHrFyD/ZcMc4nbCmYkFU= -go.opentelemetry.io/collector/pdata v1.13.1-0.20240816132030-9fd84668bb02/go.mod h1:z1dTjwwtcoXxZx2/nkHysjxMeaxe9pEmYTEr4SMNIx8= +go.opentelemetry.io/collector/pdata v1.14.1 h1:wXZjtQA7Vy5HFqco+yA95ENyMQU5heBB1IxMHQf6mUk= +go.opentelemetry.io/collector/pdata v1.14.1/go.mod h1:z1dTjwwtcoXxZx2/nkHysjxMeaxe9pEmYTEr4SMNIx8= go.opentelemetry.io/collector/pdata/pprofile v0.107.0 h1:F25VZrEkSaneIBNcNI9LEBWf9nRC/WHKluSBTP0gKAA= go.opentelemetry.io/collector/pdata/pprofile v0.107.0/go.mod h1:1GrwsKtgogRCt8aG/0lfJ037yDdFtYqF+OtJr+snxRQ= go.opentelemetry.io/collector/pdata/testdata v0.107.0 h1:02CqvJrYjkrBlWDD+6yrByN1AhG2zT61OScLPhyyMwU= diff --git a/receiver/huaweicloudcesreceiver/integration_test.go b/receiver/huaweicloudcesreceiver/integration_test.go index f9d622af2c28..23420930ca47 100644 --- a/receiver/huaweicloudcesreceiver/integration_test.go +++ b/receiver/huaweicloudcesreceiver/integration_test.go @@ -34,35 +34,24 @@ func TestHuaweiCloudCESReceiverIntegration(t *testing.T) { Dimensions: []model.MetricsDimension{ { Name: "instance_id", - Value: "12345", + Value: "faea5b75-e390-4e2b-8733-9226a9026070", }, }, + Unit: "%", }, }, }, nil) - mc.On("BatchListMetricData", mock.Anything).Return(&model.BatchListMetricDataResponse{ - Metrics: &[]model.BatchMetricData{ + mc.On("ShowMetricData", mock.Anything).Return(&model.ShowMetricDataResponse{ + MetricName: stringPtr("cpu_util"), + Datapoints: &[]model.Datapoint{ { - Namespace: stringPtr("SYS.ECS"), - MetricName: "cpu_util", - Dimensions: &[]model.MetricsDimension{ - { - Name: "instance_id", - Value: "faea5b75-e390-4e2b-8733-9226a9026070", - }, - }, - Datapoints: []model.DatapointForBatchMetric{ - { - Average: float64Ptr(45.67), - Timestamp: 1556625610000, - }, - { - Average: float64Ptr(89.01), - Timestamp: 1556625715000, - }, - }, - Unit: stringPtr("%"), + Average: float64Ptr(45.67), + Timestamp: 1556625610000, + }, + { + Average: float64Ptr(89.01), + Timestamp: 1556625715000, }, }, }, nil) diff --git a/receiver/huaweicloudcesreceiver/internal/backoff.go b/receiver/huaweicloudcesreceiver/internal/backoff.go new file mode 100644 index 000000000000..7ad525bfa62c --- /dev/null +++ b/receiver/huaweicloudcesreceiver/internal/backoff.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver/internal" + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + "go.uber.org/zap" +) + +// Generic function to make an API call with exponential backoff and context cancellation handling. +func MakeAPICallWithRetry[T any]( + ctx context.Context, + shutdownChan chan struct{}, + logger *zap.Logger, + apiCall func() (*T, error), + isThrottlingError func(error) bool, + backOffConfig *backoff.ExponentialBackOff, +) (*T, error) { + // Immediately check for context cancellation or server shutdown. + select { + case <-ctx.Done(): + return nil, fmt.Errorf("request was cancelled or timed out") + case <-shutdownChan: + return nil, fmt.Errorf("request is cancelled due to server shutdown") + case <-time.After(50 * time.Millisecond): + } + + // Make the initial API call. + resp, err := apiCall() + if err == nil { + return resp, nil + } + + // If the error is not due to request throttling, return the error. + if !isThrottlingError(err) { + return nil, err + } + + // Initialize the backoff mechanism for retrying the API call. + expBackoff := &backoff.ExponentialBackOff{ + InitialInterval: backOffConfig.InitialInterval, + RandomizationFactor: backOffConfig.RandomizationFactor, + Multiplier: backOffConfig.Multiplier, + MaxInterval: backOffConfig.MaxInterval, + MaxElapsedTime: backOffConfig.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + expBackoff.Reset() + attempts := 0 + + // Retry loop for handling throttling errors. + for { + attempts++ + delay := expBackoff.NextBackOff() + if delay == backoff.Stop { + return resp, err + } + logger.Warn("server busy, retrying request", + zap.Int("attempts", attempts), + zap.Duration("delay", delay)) + + // Handle context cancellation or shutdown before retrying. + select { + case <-ctx.Done(): + return nil, fmt.Errorf("request was cancelled or timed out") + case <-shutdownChan: + return nil, fmt.Errorf("request is cancelled due to server shutdown") + case <-time.After(delay): + } + + // Retry the API call. + resp, err = apiCall() + if err == nil { + return resp, nil + } + if !isThrottlingError(err) { + break + } + } + + return nil, err +} diff --git a/receiver/huaweicloudcesreceiver/internal/backoff_test.go b/receiver/huaweicloudcesreceiver/internal/backoff_test.go new file mode 100644 index 000000000000..402a1f23ace8 --- /dev/null +++ b/receiver/huaweicloudcesreceiver/internal/backoff_test.go @@ -0,0 +1,129 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zaptest" +) + +func TestMakeAPICallWithRetrySuccess(t *testing.T) { + logger := zaptest.NewLogger(t) + apiCall := func() (*string, error) { + result := "success" + return &result, nil + } + isThrottlingError := func(_ error) bool { + return false + } + + resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) + + assert.NoError(t, err) + assert.Equal(t, "success", *resp) +} + +func TestMakeAPICallWithRetryImmediateFailure(t *testing.T) { + logger := zaptest.NewLogger(t) + apiCall := func() (*string, error) { + return nil, errors.New("some error") + } + isThrottlingError := func(_ error) bool { + return false + } + + resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) + + assert.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, "some error", err.Error()) +} + +func TestMakeAPICallWithRetryThrottlingWithSuccess(t *testing.T) { + logger := zaptest.NewLogger(t) + callCount := 0 + apiCall := func() (*string, error) { + callCount++ + if callCount == 3 { + result := "success" + return &result, nil + } + return nil, errors.New("throttling error") + } + isThrottlingError := func(err error) bool { + return err.Error() == "throttling error" + } + + backOffConfig := backoff.NewExponentialBackOff() + backOffConfig.InitialInterval = 10 * time.Millisecond + + resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backOffConfig) + + assert.NoError(t, err) + assert.Equal(t, "success", *resp) + assert.Equal(t, 3, callCount) +} + +func TestMakeAPICallWithRetryThrottlingMaxRetries(t *testing.T) { + logger := zaptest.NewLogger(t) + apiCall := func() (*string, error) { + return nil, errors.New("throttling error") + } + isThrottlingError := func(err error) bool { + return err.Error() == "throttling error" + } + + backOffConfig := backoff.NewExponentialBackOff() + backOffConfig.MaxElapsedTime = 50 * time.Millisecond + + resp, err := MakeAPICallWithRetry(context.TODO(), make(chan struct{}), logger, apiCall, isThrottlingError, backOffConfig) + + assert.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, "throttling error", err.Error()) +} + +func TestMakeAPICallWithRetryContextCancellation(t *testing.T) { + logger := zaptest.NewLogger(t) + ctx, cancel := context.WithCancel(context.TODO()) + time.AfterFunc(time.Second, cancel) + + apiCall := func() (*string, error) { + return nil, errors.New("throttling error") + } + isThrottlingError := func(err error) bool { + return err.Error() == "throttling error" + } + + resp, err := MakeAPICallWithRetry(ctx, make(chan struct{}), logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) + + assert.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, "request was cancelled or timed out", err.Error()) +} + +func TestMakeAPICallWithRetryServerShutdown(t *testing.T) { + logger := zaptest.NewLogger(t) + shutdownChan := make(chan struct{}) + time.AfterFunc(time.Second, func() { close(shutdownChan) }) + + apiCall := func() (*string, error) { + return nil, errors.New("throttling error") + } + isThrottlingError := func(err error) bool { + return err.Error() == "throttling error" + } + + resp, err := MakeAPICallWithRetry(context.TODO(), shutdownChan, logger, apiCall, isThrottlingError, backoff.NewExponentialBackOff()) + + assert.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, "request is cancelled due to server shutdown", err.Error()) +} diff --git a/receiver/huaweicloudcesreceiver/internal/ces_to_otlp.go b/receiver/huaweicloudcesreceiver/internal/ces_to_otlp.go index d50ef431ff88..fc10a42ffadd 100644 --- a/receiver/huaweicloudcesreceiver/internal/ces_to_otlp.go +++ b/receiver/huaweicloudcesreceiver/internal/ces_to_otlp.go @@ -4,6 +4,8 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver/internal" import ( + "fmt" + "strings" "time" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ces/v1/model" @@ -11,7 +13,31 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -func ConvertCESMetricsToOTLP(projectID, region, filter string, cesMetrics []model.BatchMetricData) pmetric.Metrics { +type MetricData struct { + MetricName string + Dimensions []model.MetricsDimension + Namespace string + Unit string + Datapoints []model.Datapoint +} + +func GetMetricKey(m model.MetricInfoList) string { + strArray := make([]string, len(m.Dimensions)) + for i, ms := range m.Dimensions { + strArray[i] = ms.String() + } + return fmt.Sprintf("metric_name=%s,dimensions=%s", m.MetricName, strings.Join(strArray, " ")) +} + +func GetDimension(dimensions []model.MetricsDimension, index int) *string { + if len(dimensions) > index { + dimValue := dimensions[index].Name + "," + dimensions[index].Value + return &dimValue + } + return nil +} + +func ConvertCESMetricsToOTLP(projectID, region, filter string, cesMetrics map[string]MetricData) pmetric.Metrics { metrics := pmetric.NewMetrics() if len(cesMetrics) == 0 { return metrics @@ -30,17 +56,13 @@ func ConvertCESMetricsToOTLP(projectID, region, filter string, cesMetrics []mode metric := scopedMetric.Metrics().AppendEmpty() metric.SetName(cesMetric.MetricName) - if cesMetric.Unit != nil { - metric.SetUnit(*cesMetric.Unit) - } - if cesMetric.Dimensions != nil { - for _, dimension := range *cesMetric.Dimensions { - metric.Metadata().PutStr(dimension.Name, dimension.Value) - } - } - if cesMetric.Namespace != nil { - metric.Metadata().PutStr("service.namespace", *cesMetric.Namespace) + metric.SetUnit(cesMetric.Unit) + for _, dimension := range cesMetric.Dimensions { + metric.Metadata().PutStr(dimension.Name, dimension.Value) } + + metric.Metadata().PutStr("service.namespace", cesMetric.Namespace) + dataPoints := metric.SetEmptyGauge().DataPoints() for _, dataPoint := range cesMetric.Datapoints { dp := dataPoints.AppendEmpty() diff --git a/receiver/huaweicloudcesreceiver/internal/ces_to_otlp_test.go b/receiver/huaweicloudcesreceiver/internal/ces_to_otlp_test.go index aafb1339283d..fe464d21c0f2 100644 --- a/receiver/huaweicloudcesreceiver/internal/ces_to_otlp_test.go +++ b/receiver/huaweicloudcesreceiver/internal/ces_to_otlp_test.go @@ -16,22 +16,22 @@ import ( func TestConvertCESMetricsToOTLP(t *testing.T) { tests := []struct { name string - input []model.BatchMetricData + input map[string]MetricData expected pmetric.Metrics }{ { name: "Valid Metric Conversion", - input: []model.BatchMetricData{ - { - Namespace: stringPtr("SYS.ECS"), + input: map[string]MetricData{ + "cpu_util": { MetricName: "cpu_util", - Dimensions: &[]model.MetricsDimension{ + Namespace: "SYS.ECS", + Dimensions: []model.MetricsDimension{ { Name: "instance_id", Value: "faea5b75-e390-4e2b-8733-9226a9026070", }, }, - Datapoints: []model.DatapointForBatchMetric{ + Datapoints: []model.Datapoint{ { Average: float64Ptr(0.5), Timestamp: 1556625610000, @@ -41,18 +41,18 @@ func TestConvertCESMetricsToOTLP(t *testing.T) { Timestamp: 1556625715000, }, }, - Unit: stringPtr("%"), + Unit: "%", }, - { - Namespace: stringPtr("SYS.ECS"), + "network_vm_connections": { MetricName: "network_vm_connections", - Dimensions: &[]model.MetricsDimension{ + Namespace: "SYS.ECS", + Dimensions: []model.MetricsDimension{ { Name: "instance_id", Value: "06b4020f-461a-4a52-84da-53fa71c2f42b", }, }, - Datapoints: []model.DatapointForBatchMetric{ + Datapoints: []model.Datapoint{ { Average: float64Ptr(1), Timestamp: 1556625612000, @@ -62,14 +62,14 @@ func TestConvertCESMetricsToOTLP(t *testing.T) { Timestamp: 1556625717000, }, }, - Unit: stringPtr("count"), + Unit: "count", }, }, expected: expectedMetrics(), }, { name: "Empty Metric Data", - input: []model.BatchMetricData{}, + input: map[string]MetricData{}, expected: pmetric.NewMetrics(), }, } @@ -81,10 +81,6 @@ func TestConvertCESMetricsToOTLP(t *testing.T) { } } -func stringPtr(s string) *string { - return &s -} - func float64Ptr(f float64) *float64 { return &f } diff --git a/receiver/huaweicloudcesreceiver/receiver.go b/receiver/huaweicloudcesreceiver/receiver.go index aec75f602eee..eeee7917f0d2 100644 --- a/receiver/huaweicloudcesreceiver/receiver.go +++ b/receiver/huaweicloudcesreceiver/receiver.go @@ -6,16 +6,20 @@ package huaweicloudcesreceiver // import "github.com/open-telemetry/opentelemetr import ( "context" "errors" + "fmt" "net/url" "strconv" + "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/config" ces "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ces/v1" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ces/v1/model" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ces/v1/region" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" @@ -23,15 +27,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver/internal" ) +const ( + // See https://support.huaweicloud.com/intl/en-us/devg-apisign/api-sign-errorcode.html + requestThrottledErrMsg = "APIGW.0308" +) + type cesReceiver struct { logger *zap.Logger client internal.CesClient cancel context.CancelFunc - host component.Host - nextConsumer consumer.Metrics - lastUsedFinishTs time.Time - config *Config + host component.Host + nextConsumer consumer.Metrics + lastSeenTs map[string]time.Time + config *Config + shutdownChan chan struct{} } func newHuaweiCloudCesReceiver(settings receiver.Settings, cfg *Config, next consumer.Metrics) *cesReceiver { @@ -39,6 +49,8 @@ func newHuaweiCloudCesReceiver(settings receiver.Settings, cfg *Config, next con logger: settings.Logger, config: cfg, nextConsumer: next, + lastSeenTs: make(map[string]time.Time), + shutdownChan: make(chan struct{}, 1), } return rcvr } @@ -130,76 +142,136 @@ func (rcvr *cesReceiver) pollMetricsAndConsume(ctx context.Context) error { if rcvr.client == nil { return errors.New("invalid client") } - metricDefinitions, err := rcvr.listMetricDefinitions() + metricDefinitions, err := rcvr.listMetricDefinitions(ctx) if err != nil { return err } - cesMetrics, err := rcvr.listDataPoints(metricDefinitions) - if err != nil { - rcvr.logger.Error(err.Error()) - return err - } - metrics := internal.ConvertCESMetricsToOTLP(rcvr.config.ProjectID, rcvr.config.RegionName, rcvr.config.Filter, cesMetrics) - if err := rcvr.nextConsumer.ConsumeMetrics(ctx, metrics); err != nil { + metrics := rcvr.listDataPoints(ctx, metricDefinitions) + otpMetrics := internal.ConvertCESMetricsToOTLP(rcvr.config.ProjectID, rcvr.config.RegionName, rcvr.config.Filter, metrics) + if err := rcvr.nextConsumer.ConsumeMetrics(ctx, otpMetrics); err != nil { return err } return nil } -func (rcvr *cesReceiver) listMetricDefinitions() ([]model.MetricInfoList, error) { - response, err := rcvr.client.ListMetrics(&model.ListMetricsRequest{}) +func (rcvr *cesReceiver) listMetricDefinitions(ctx context.Context) ([]model.MetricInfoList, error) { + response, err := internal.MakeAPICallWithRetry( + ctx, + rcvr.shutdownChan, + rcvr.logger, + func() (*model.ListMetricsResponse, error) { + return rcvr.client.ListMetrics(&model.ListMetricsRequest{}) + }, + func(err error) bool { return strings.Contains(err.Error(), requestThrottledErrMsg) }, + newExponentialBackOff(&rcvr.config.BackOffConfig), + ) if err != nil { return []model.MetricInfoList{}, err } - if response.Metrics == nil || len((*response.Metrics)) == 0 { - return []model.MetricInfoList{}, errors.New("empty list of metric definitions") + if response == nil || response.Metrics == nil || len((*response.Metrics)) == 0 { + return []model.MetricInfoList{}, errors.New("unexpected empty list of metric definitions") } return *response.Metrics, nil } -func convertMetricInfoListArrayToMetricInfoArray(infoListArray []model.MetricInfoList) []model.MetricInfo { - infoArray := make([]model.MetricInfo, len(infoListArray)) - for i, infoList := range infoListArray { - infoArray[i] = model.MetricInfo{ - Namespace: infoList.Namespace, - MetricName: infoList.MetricName, - Dimensions: infoList.Dimensions, +// listDataPoints retrieves data points for a list of metric definitions. +// The function performs the following operations: +// 1. Generates a unique key for each metric definition (at least one dimenstion is required) and checks for duplicates. +// 2. Determines the time range (from-to) for fetching the metric data points, using the current timestamp +// and the last-seen timestamp for each metric. +// 3. Fetches data points for each metric definition. +// 4. Updates the last-seen timestamp for each metric based on the most recent data point timestamp. +// 5. Returns a map of metric keys to their corresponding MetricData, containing all fetched data points. +// +// Parameters: +// - ctx: Context for controlling cancellation and deadlines. +// - metricDefinitions: A slice of MetricInfoList containing the definitions of metrics to be fetched. +// +// Returns: +// - A map where each key is a unique metric identifier and each value is the associated MetricData. +func (rcvr *cesReceiver) listDataPoints(ctx context.Context, metricDefinitions []model.MetricInfoList) map[string]internal.MetricData { + // TODO: Implement deduplication: There may be a need for deduplication, possibly using a Processor to ensure unique metrics are processed. + to := time.Now() + metrics := make(map[string]internal.MetricData) + for _, metricDefinition := range metricDefinitions { + if len(metricDefinition.Dimensions) == 0 { + rcvr.logger.Warn("metric has 0 dimensions. skipping it", zap.String("metricName", metricDefinition.MetricName)) + continue + } + key := internal.GetMetricKey(metricDefinition) + if _, ok := metrics[key]; ok { + rcvr.logger.Warn("metric key found on multiple metric definitions", zap.String("key", key)) + continue + } + from, ok := rcvr.lastSeenTs[key] + if !ok { + from = to.Add(-1 * rcvr.config.CollectionInterval) + } + resp, dpErr := rcvr.listDataPointsForMetric(ctx, from, to, metricDefinition) + if dpErr != nil { + rcvr.logger.Warn(fmt.Sprintf("unable to get datapoints for metric name %+v", metricDefinition), zap.Error(dpErr)) + } + var datapoints []model.Datapoint + if resp != nil && resp.Datapoints != nil { + datapoints = *resp.Datapoints + + var maxdpTs int64 + for _, dp := range datapoints { + if dp.Timestamp > maxdpTs { + maxdpTs = dp.Timestamp + } + } + if maxdpTs > rcvr.lastSeenTs[key].UnixMilli() { + rcvr.lastSeenTs[key] = time.UnixMilli(maxdpTs) + } + + } + metrics[key] = internal.MetricData{ + MetricName: metricDefinition.MetricName, + Dimensions: metricDefinition.Dimensions, + Namespace: metricDefinition.Namespace, + Unit: metricDefinition.Unit, + Datapoints: datapoints, } } - return infoArray + return metrics } -func (rcvr *cesReceiver) listDataPoints(metricDefinitions []model.MetricInfoList) ([]model.BatchMetricData, error) { - // TODO: Handle delayed metrics. CES accepts metrics with up to a 30-minute delay. - // If the request is based on the current time ('now'), it may miss metrics delayed by a minute or more, - // as the next request would exclude them. Consider adding a delay configuration to account for this. - // TODO: Implement deduplication: There may be a need for deduplication, possibly using a Processor to ensure unique metrics are processed. - to := time.Now() - var from time.Time - if rcvr.lastUsedFinishTs.IsZero() { - from = to.Add(-1 * rcvr.config.CollectionInterval) - } else { - from = rcvr.lastUsedFinishTs - } - rcvr.lastUsedFinishTs = to - - response, err := rcvr.client.BatchListMetricData(&model.BatchListMetricDataRequest{ - Body: &model.BatchListMetricDataRequestBody{ - Metrics: convertMetricInfoListArrayToMetricInfoArray(metricDefinitions), - Period: strconv.Itoa(rcvr.config.Period), - Filter: rcvr.config.Filter, - From: from.UnixMilli(), - To: to.UnixMilli(), +func (rcvr *cesReceiver) listDataPointsForMetric(ctx context.Context, from, to time.Time, infoList model.MetricInfoList) (*model.ShowMetricDataResponse, error) { + return internal.MakeAPICallWithRetry( + ctx, + rcvr.shutdownChan, + rcvr.logger, + func() (*model.ShowMetricDataResponse, error) { + return rcvr.client.ShowMetricData(&model.ShowMetricDataRequest{ + Namespace: infoList.Namespace, + MetricName: infoList.MetricName, + Dim0: infoList.Dimensions[0].Name + "," + infoList.Dimensions[0].Value, + Dim1: internal.GetDimension(infoList.Dimensions, 1), + Dim2: internal.GetDimension(infoList.Dimensions, 2), + Dim3: internal.GetDimension(infoList.Dimensions, 3), + Period: rcvr.config.Period, + Filter: validFilters[rcvr.config.Filter], + From: from.UnixMilli(), + To: to.UnixMilli(), + }) }, - }) - if err != nil { - return []model.BatchMetricData{}, err - } - if response.Metrics == nil || len(*response.Metrics) == 0 { - return []model.BatchMetricData{}, errors.New("empty list of metric data") + func(err error) bool { return strings.Contains(err.Error(), requestThrottledErrMsg) }, + newExponentialBackOff(&rcvr.config.BackOffConfig), + ) +} + +func newExponentialBackOff(backOffConfig *configretry.BackOffConfig) *backoff.ExponentialBackOff { + return &backoff.ExponentialBackOff{ + InitialInterval: backOffConfig.InitialInterval, + RandomizationFactor: backOffConfig.RandomizationFactor, + Multiplier: backOffConfig.Multiplier, + MaxInterval: backOffConfig.MaxInterval, + MaxElapsedTime: backOffConfig.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, } - return *response.Metrics, nil } func createHTTPConfig(cfg HuaweiSessionConfig) (*config.HttpConfig, error) { @@ -239,5 +311,7 @@ func (rcvr *cesReceiver) Shutdown(_ context.Context) error { if rcvr.cancel != nil { rcvr.cancel() } + rcvr.shutdownChan <- struct{}{} + close(rcvr.shutdownChan) return nil } diff --git a/receiver/huaweicloudcesreceiver/receiver_test.go b/receiver/huaweicloudcesreceiver/receiver_test.go index 602ee20f3f83..3de0bc381909 100644 --- a/receiver/huaweicloudcesreceiver/receiver_test.go +++ b/receiver/huaweicloudcesreceiver/receiver_test.go @@ -13,13 +13,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver/internal/mocks" - + "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/huaweicloudcesreceiver/internal/mocks" ) func stringPtr(s string) *string { @@ -62,9 +62,10 @@ func TestListMetricDefinitionsSuccess(t *testing.T) { receiver := &cesReceiver{ client: mockCes, + config: createDefaultConfig().(*Config), } - metrics, err := receiver.listMetricDefinitions() + metrics, err := receiver.listMetricDefinitions(context.Background()) assert.NoError(t, err) assert.NotNil(t, metrics) @@ -81,9 +82,10 @@ func TestListMetricDefinitionsFailure(t *testing.T) { mockCes.On("ListMetrics", mock.Anything).Return(nil, errors.New("failed to list metrics")) receiver := &cesReceiver{ client: mockCes, + config: createDefaultConfig().(*Config), } - metrics, err := receiver.listMetricDefinitions() + metrics, err := receiver.listMetricDefinitions(context.Background()) assert.Error(t, err) assert.Len(t, metrics, 0) @@ -91,6 +93,72 @@ func TestListMetricDefinitionsFailure(t *testing.T) { mockCes.AssertExpectations(t) } +func TestListDataPointsForMetricBackOffWIthDefaultConfig(t *testing.T) { + mockCes := mocks.NewCesClient(t) + next := new(consumertest.MetricsSink) + receiver := newHuaweiCloudCesReceiver(receivertest.NewNopSettings(), createDefaultConfig().(*Config), next) + receiver.client = mockCes + + mockCes.On("ShowMetricData", mock.Anything).Return(nil, errors.New(requestThrottledErrMsg)).Times(3) + mockCes.On("ShowMetricData", mock.Anything).Return(&model.ShowMetricDataResponse{ + MetricName: stringPtr("cpu_util"), + Datapoints: &[]model.Datapoint{ + { + Average: float64Ptr(45.67), + Timestamp: 1556625610000, + }, + { + Average: float64Ptr(89.01), + Timestamp: 1556625715000, + }, + }, + }, nil) + + resp, err := receiver.listDataPointsForMetric(context.Background(), time.Now().Add(10*time.Minute), time.Now(), model.MetricInfoList{ + Namespace: "SYS.ECS", + MetricName: "cpu_util", + Dimensions: []model.MetricsDimension{ + { + Name: "instance_id", + Value: "12345", + }, + }, + }) + + require.NoError(t, err) + assert.Len(t, *resp.Datapoints, 2) +} + +func TestListDataPointsForMetricBackOffFails(t *testing.T) { + mockCes := mocks.NewCesClient(t) + next := new(consumertest.MetricsSink) + receiver := newHuaweiCloudCesReceiver(receivertest.NewNopSettings(), &Config{BackOffConfig: configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, + MaxInterval: 800 * time.Millisecond, + MaxElapsedTime: 1 * time.Second, + RandomizationFactor: 0, + Multiplier: 2, + }}, next) + receiver.client = mockCes + + mockCes.On("ShowMetricData", mock.Anything).Return(nil, errors.New(requestThrottledErrMsg)).Times(4) + + resp, err := receiver.listDataPointsForMetric(context.Background(), time.Now().Add(10*time.Minute), time.Now(), model.MetricInfoList{ + Namespace: "SYS.ECS", + MetricName: "cpu_util", + Dimensions: []model.MetricsDimension{ + { + Name: "instance_id", + Value: "12345", + }, + }, + }) + + require.ErrorContains(t, err, requestThrottledErrMsg) + assert.Nil(t, resp) +} + func TestPollMetricsAndConsumeSuccess(t *testing.T) { mockCes := mocks.NewCesClient(t) next := new(consumertest.MetricsSink) @@ -112,28 +180,16 @@ func TestPollMetricsAndConsumeSuccess(t *testing.T) { }, }, nil) - mockCes.On("BatchListMetricData", mock.Anything).Return(&model.BatchListMetricDataResponse{ - Metrics: &[]model.BatchMetricData{ + mockCes.On("ShowMetricData", mock.Anything).Return(&model.ShowMetricDataResponse{ + MetricName: stringPtr("cpu_util"), + Datapoints: &[]model.Datapoint{ { - Namespace: stringPtr("SYS.ECS"), - MetricName: "cpu_util", - Dimensions: &[]model.MetricsDimension{ - { - Name: "instance_id", - Value: "faea5b75-e390-4e2b-8733-9226a9026070", - }, - }, - Datapoints: []model.DatapointForBatchMetric{ - { - Average: float64Ptr(45.67), - Timestamp: 1556625610000, - }, - { - Average: float64Ptr(89.01), - Timestamp: 1556625715000, - }, - }, - Unit: stringPtr("%"), + Average: float64Ptr(45.67), + Timestamp: 1556625610000, + }, + { + Average: float64Ptr(89.01), + Timestamp: 1556625715000, }, }, }, nil) @@ -170,24 +226,12 @@ func TestStartReadingMetrics(t *testing.T) { }, }, nil) - m.On("BatchListMetricData", mock.Anything).Return(&model.BatchListMetricDataResponse{ - Metrics: &[]model.BatchMetricData{ + m.On("ShowMetricData", mock.Anything).Return(&model.ShowMetricDataResponse{ + MetricName: stringPtr("cpu_util"), + Datapoints: &[]model.Datapoint{ { - Namespace: stringPtr("SYS.ECS"), - MetricName: "cpu_util", - Dimensions: &[]model.MetricsDimension{ - { - Name: "instance_id", - Value: "faea5b75-e390-4e2b-8733-9226a9026070", - }, - }, - Datapoints: []model.DatapointForBatchMetric{ - { - Average: float64Ptr(45.67), - Timestamp: 1556625610000, - }, - }, - Unit: stringPtr("%"), + Average: float64Ptr(45.67), + Timestamp: 1556625610000, }, }, }, nil) @@ -220,6 +264,7 @@ func TestStartReadingMetrics(t *testing.T) { client: mockCes, logger: logger, nextConsumer: next, + lastSeenTs: make(map[string]time.Time), } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel()