diff --git a/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml b/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml new file mode 100644 index 0000000000000..ca4d1c8218f07 --- /dev/null +++ b/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml @@ -0,0 +1,17 @@ +change_type: enhancement + +component: googlecloudpubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Turn noisy `warn` log about Pub/Sub servers into `debug`, and turn the reset count into a metric + +issues: [37571] + +subtext: | + The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers + recurrently close the connection after a time period to avoid a long-running sticky connection. Before the + receiver logged `warn` log lines everytime this happened. These log lines are moved to debug so that fleets with + lots of collectors with the receiver don't span logs at warn level. To keep track of the resets, whenever a + connection reset happens a `otelcol_receiver_googlecloudpubsub_stream_restarts` metric is increased by one. + +change_logs: [user] diff --git a/receiver/googlecloudpubsubreceiver/documentation.md b/receiver/googlecloudpubsubreceiver/documentation.md new file mode 100644 index 0000000000000..7cde645140ccb --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/documentation.md @@ -0,0 +1,20 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# googlecloudpubsub + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_receiver.googlecloudpubsub.stream_restarts + +Number of times the stream (re)starts due to a Pub/Sub servers connection close + +The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers +recurrently close the connection after a time period to avoid a long-running sticky connection. This metric +counts the number of the resets that occurred during the lifetime of the container. + + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/receiver/googlecloudpubsubreceiver/factory.go b/receiver/googlecloudpubsubreceiver/factory.go index 802718a55fb42..d627c3d0efbc1 100644 --- a/receiver/googlecloudpubsubreceiver/factory.go +++ b/receiver/googlecloudpubsubreceiver/factory.go @@ -41,24 +41,24 @@ func (factory *pubsubReceiverFactory) CreateDefaultConfig() component.Config { return &Config{} } -func (factory *pubsubReceiverFactory) ensureReceiver(params receiver.Settings, config component.Config) (*pubsubReceiver, error) { +func (factory *pubsubReceiverFactory) ensureReceiver(settings receiver.Settings, config component.Config) (*pubsubReceiver, error) { receiver := factory.receivers[config.(*Config)] if receiver != nil { return receiver, nil } rconfig := config.(*Config) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: params.ID, + ReceiverID: settings.ID, Transport: reportTransport, - ReceiverCreateSettings: params, + ReceiverCreateSettings: settings, }) if err != nil { return nil, err } receiver = &pubsubReceiver{ - logger: params.Logger, + settings: settings, obsrecv: obsrecv, - userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", params.BuildInfo.Version), + userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", settings.BuildInfo.Version), config: rconfig, } factory.receivers[config.(*Config)] = receiver diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index 1b7a5df00ffda..49eb5ea4f567c 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -1,10 +1,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver -go 1.22.0 +go 1.22.7 + +toolchain go1.23.4 require ( cloud.google.com/go/logging v1.13.0 - cloud.google.com/go/pubsub v1.45.3 + cloud.google.com/go/pubsub v1.46.0 github.com/google/go-cmp v0.6.0 github.com/googleapis/gax-go/v2 v2.14.1 github.com/iancoleman/strcase v0.3.0 @@ -20,23 +22,27 @@ require ( go.opentelemetry.io/collector/pdata v1.24.1-0.20250131104636-a737a48402e0 go.opentelemetry.io/collector/receiver v0.118.1-0.20250131104636-a737a48402e0 go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250131104636-a737a48402e0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 google.golang.org/api v0.219.0 - google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 - google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 + google.golang.org/genproto v0.0.0-20250127172529-29210b9bc287 + google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.4 ) require ( - cloud.google.com/go v0.117.0 // indirect + cloud.google.com/go v0.118.0 // indirect cloud.google.com/go/auth v0.14.0 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect cloud.google.com/go/compute/metadata v0.6.0 // indirect - cloud.google.com/go/iam v1.2.2 // indirect - cloud.google.com/go/longrunning v0.6.2 // indirect + cloud.google.com/go/iam v1.3.1 // indirect + cloud.google.com/go/longrunning v0.6.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -57,26 +63,22 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - go.einride.tech/aip v0.68.0 // indirect + go.einride.tech/aip v0.68.1 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250131104636-a737a48402e0 // indirect + go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250130224751-50b76b95bf35 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250131104636-a737a48402e0 // indirect + go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250130224751-50b76b95bf35 // indirect + go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250130224751-50b76b95bf35 // indirect go.opentelemetry.io/collector/extension v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/collector/featuregate v1.24.1-0.20250131104636-a737a48402e0 // indirect + go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250130224751-50b76b95bf35 // indirect + go.opentelemetry.io/collector/featuregate v1.24.1-0.20250130224751-50b76b95bf35 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/collector/pipeline v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250131104636-a737a48402e0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/collector/pipeline v0.118.1-0.20250130224751-50b76b95bf35 // indirect + go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250130224751-50b76b95bf35 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect diff --git a/receiver/googlecloudpubsubreceiver/go.sum b/receiver/googlecloudpubsubreceiver/go.sum index 6fc2cb9c0eaeb..181f09eecc386 100644 --- a/receiver/googlecloudpubsubreceiver/go.sum +++ b/receiver/googlecloudpubsubreceiver/go.sum @@ -1,20 +1,20 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.117.0 h1:Z5TNFfQxj7WG2FgOGX1ekC5RiXrYgms6QscOm32M/4s= -cloud.google.com/go v0.117.0/go.mod h1:ZbwhVTb1DBGt2Iwb3tNO6SEK4q+cplHZmLWH+DelYYc= +cloud.google.com/go v0.118.0 h1:tvZe1mgqRxpiVa3XlIGMiPcEUbP1gNXELgD4y/IXmeQ= +cloud.google.com/go v0.118.0/go.mod h1:zIt2pkedt/mo+DQjcT4/L3NDxzHPR29j5HcclNH+9PM= cloud.google.com/go/auth v0.14.0 h1:A5C4dKV/Spdvxcl0ggWwWEzzP7AZMJSEIgrkngwhGYM= cloud.google.com/go/auth v0.14.0/go.mod h1:CYsoRL1PdiDuqeQpZE0bP2pnPrGqFcOkI0nldEQis+A= cloud.google.com/go/auth/oauth2adapt v0.2.7 h1:/Lc7xODdqcEw8IrZ9SvwnlLX6j9FHQM74z6cBk9Rw6M= cloud.google.com/go/auth/oauth2adapt v0.2.7/go.mod h1:NTbTTzfvPl1Y3V1nPpOgl2w6d/FjO7NNUQaWSox6ZMc= cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I= cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg= -cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA= -cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY= +cloud.google.com/go/iam v1.3.1 h1:KFf8SaT71yYq+sQtRISn90Gyhyf4X8RGgeAVC8XGf3E= +cloud.google.com/go/iam v1.3.1/go.mod h1:3wMtuyT4NcbnYNPLMBzYRFiEfjKfJlLVLrisE7bwm34= cloud.google.com/go/logging v1.13.0 h1:7j0HgAp0B94o1YRDqiqm26w4q1rDMH7XNRU34lJXHYc= cloud.google.com/go/logging v1.13.0/go.mod h1:36CoKh6KA/M0PbhPKMq6/qety2DCAErbhXT62TuXALA= -cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc= -cloud.google.com/go/longrunning v0.6.2/go.mod h1:k/vIs83RN4bE3YCswdXC5PFfWVILjm3hpEUlSko4PiI= -cloud.google.com/go/pubsub v1.45.3 h1:prYj8EEAAAwkp6WNoGTE4ahe0DgHoyJd5Pbop931zow= -cloud.google.com/go/pubsub v1.45.3/go.mod h1:cGyloK/hXC4at7smAtxFnXprKEFTqmMXNNd9w+bd94Q= +cloud.google.com/go/longrunning v0.6.4 h1:3tyw9rO3E2XVXzSApn1gyEEnH2K9SynNQjMlBi3uHLg= +cloud.google.com/go/longrunning v0.6.4/go.mod h1:ttZpLCe6e7EXvn9OxpBRx7kZEB0efv8yBO6YnVMfhJs= +cloud.google.com/go/pubsub v1.46.0 h1:C4EzOr0VuaesIFVFoYTcpCmXIyPVDPtvD3DEyn1Sqew= +cloud.google.com/go/pubsub v1.46.0/go.mod h1:qdGehMCX7Qnuoo8EhzKLXgbm3x8a8BVxfm/c06ZnRyE= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= @@ -118,8 +118,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw= -go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg= +go.einride.tech/aip v0.68.1 h1:16/AfSxcQISGN5z9C5lM+0mLYXihrHbQ1onvYTr93aQ= +go.einride.tech/aip v0.68.1/go.mod h1:XaFtaj4HuA3Zwk9xoBtTWgNubZ0ZZXv9BZJCkuKuWbg= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -128,20 +128,20 @@ go.opentelemetry.io/collector/component v0.118.1-0.20250131104636-a737a48402e0 h go.opentelemetry.io/collector/component v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:X6S0R6vXECThDa1q+m682asO/WEfMWIMeafJsFidr3E= go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250131104636-a737a48402e0 h1:NqyMxc82JbeSGeU8H86HV9ZNVd8h8t0YTG6ZyEVX1B8= go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:VyxLB3apQi4ureRTv8HhSGrKpimubvgWs1yiXMubYmc= -go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250131104636-a737a48402e0 h1:WeGkTYG42TL618F/ofton2ZUH+QZbDIvc8FZi8kx7Yo= -go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250131104636-a737a48402e0/go.mod h1:cleBc9I0DIWpTiiHfu9v83FUaCTqcPXmebpLxjEIqro= +go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250130224751-50b76b95bf35 h1:viAYQrxX9L6c7V1P+uij9WT7Ddt1kPzQZL124X5Gz5k= +go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250130224751-50b76b95bf35/go.mod h1:cleBc9I0DIWpTiiHfu9v83FUaCTqcPXmebpLxjEIqro= go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250131104636-a737a48402e0 h1:IjOEQjR82LX/HjYukDu3uLLKfsiSR+qHD3CiS2uDeTg= go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:SlBEwQg0qly75rXZ6W1Ig8jN25KBVBkFIIAUI1GiAAE= go.opentelemetry.io/collector/confmap v1.24.1-0.20250131104636-a737a48402e0 h1:0EEEoCOWZhxIj+w8lw8yQxCRwKZjPrNVV1paD0nzr+I= go.opentelemetry.io/collector/confmap v1.24.1-0.20250131104636-a737a48402e0/go.mod h1:Rrhs+MWoaP6AswZp+ReQ2VO9dfOfcUjdjiSHBsG+nec= go.opentelemetry.io/collector/consumer v1.24.1-0.20250131104636-a737a48402e0 h1:UsiRofRX8S11uD9jqUcTePRQRoVGQ6RoyX+4ae9fW1M= go.opentelemetry.io/collector/consumer v1.24.1-0.20250131104636-a737a48402e0/go.mod h1:LN0625PHzPlGbzwGlRj5SG4/URKxkw/aFsBvvO4GQWU= -go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250131104636-a737a48402e0 h1:L1A5Qs+dirs2M9oxZdwYtSmQ8ZiTrd/ZEs466ld5dBI= -go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:wJiNWl/ieSTcok2WE4f1H9whdmS1N0KZrEwub85PMtw= +go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250130224751-50b76b95bf35 h1:QSG231cuUlnUbjZ5qlrYBXp4T3MP8X9A8jzKDQfPvcU= +go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250130224751-50b76b95bf35/go.mod h1:wJiNWl/ieSTcok2WE4f1H9whdmS1N0KZrEwub85PMtw= go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250131104636-a737a48402e0 h1:UubD5RYQIhvKdyIRjaedNocOYPhi9z8D6O1kuEPRBNE= go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:RTVwAJCP2PHuSGTmsqa66pQqX6rSdkDqzjdXftmiO2w= -go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250131104636-a737a48402e0 h1:9yZ6SkvPzviqarjHS38lC5z5dMMz/d3cNaPbNlPgvCM= -go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:++/yvA1L5F8+5dU7PdZ4Y4il5OtzJhdKjVYLureVsJY= +go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250130224751-50b76b95bf35 h1:np/OvM5XBNGrTo0+P/tJSxo1cFcocbBJw9WP4P9kJ4g= +go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250130224751-50b76b95bf35/go.mod h1:++/yvA1L5F8+5dU7PdZ4Y4il5OtzJhdKjVYLureVsJY= go.opentelemetry.io/collector/exporter v0.118.1-0.20250131104636-a737a48402e0 h1:uL3hORFvf/T8RODynIy9VCidk0AbE2U3kMBW8ilMTDg= go.opentelemetry.io/collector/exporter v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:dOEuzXCvDxGDRwQepMPly5q5H8p+ccRElaAbshWQIzA= go.opentelemetry.io/collector/exporter/exportertest v0.118.0 h1:8gWky42BcJsxoaqWbnqCDUjP3Y84hjC6RD/UWHwR7sI= @@ -152,28 +152,28 @@ go.opentelemetry.io/collector/extension v0.118.1-0.20250131104636-a737a48402e0 h go.opentelemetry.io/collector/extension v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:PnkzHrY8LNYZsUqiI2wqBp441hdx2gs/LjKJkvM8Ses= go.opentelemetry.io/collector/extension/extensiontest v0.118.0 h1:rKBUaFS9elGfENG45wANmrwx7mHsmt1+YWCzxjftElg= go.opentelemetry.io/collector/extension/extensiontest v0.118.0/go.mod h1:CqNXzkIOR32D8EUpptpOXhpFkibs3kFlRyNMEgIW8l4= -go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250131104636-a737a48402e0 h1:LHdi1/Z2kNDqnzEsFoiRDc7rPA0Vd21ylXxi1PLStHs= -go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:XyApiptNHX3S2vG3R3yg89gP94QyJ8bcUgasP5XSzdo= -go.opentelemetry.io/collector/featuregate v1.24.1-0.20250131104636-a737a48402e0 h1:VoGCPXTMfrZV38x6PtmtmzxceNQOvjsOh6rffW//Qo4= -go.opentelemetry.io/collector/featuregate v1.24.1-0.20250131104636-a737a48402e0/go.mod h1:3GaXqflNDVwWndNGBJ1+XJFy3Fv/XrFgjMN60N3z7yg= +go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250130224751-50b76b95bf35 h1:cciDElrrRUFeY0o90iR78dhvYc53SGDkTW+OaDoT5zc= +go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250130224751-50b76b95bf35/go.mod h1:XyApiptNHX3S2vG3R3yg89gP94QyJ8bcUgasP5XSzdo= +go.opentelemetry.io/collector/featuregate v1.24.1-0.20250130224751-50b76b95bf35 h1:HcAKilzY73kpAO68nu9kC8y37qx4XummMZfp4kmm7Uc= +go.opentelemetry.io/collector/featuregate v1.24.1-0.20250130224751-50b76b95bf35/go.mod h1:3GaXqflNDVwWndNGBJ1+XJFy3Fv/XrFgjMN60N3z7yg= go.opentelemetry.io/collector/pdata v1.24.1-0.20250131104636-a737a48402e0 h1:HRLqRiylBA5Ho2g/Xt48CCRQnzlYG6Zc8OGUR7XHFzg= go.opentelemetry.io/collector/pdata v1.24.1-0.20250131104636-a737a48402e0/go.mod h1:Zs7D4RXOGS7E2faGc/jfWdbmhoiHBxA7QbpuJOioxq8= go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250131104636-a737a48402e0 h1:PgVSar7jY7KrhIV4XM5LpSPkres3sBAA/alH3fwIAp4= go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:9J3kPFbrqgkgzQM3mukDr4daWmD08NvRCCymoAPiusI= go.opentelemetry.io/collector/pdata/testdata v0.118.0 h1:5N0w1SX9KIRkwvtkrpzQgXy9eGk3vfNG0ds6mhEPMIM= go.opentelemetry.io/collector/pdata/testdata v0.118.0/go.mod h1:UY+GHV5bOC1BnFburOZ0wiHReJj1XbW12mi2Ogbc5Lw= -go.opentelemetry.io/collector/pipeline v0.118.1-0.20250131104636-a737a48402e0 h1:PqdUoU2G1UhwXq4rp+feyepg8KvWEC3n/iweh2BtAwg= -go.opentelemetry.io/collector/pipeline v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= +go.opentelemetry.io/collector/pipeline v0.118.1-0.20250130224751-50b76b95bf35 h1:bc1u7P5ZbJkQCJbmJ/QrQ+6/3FehNfq/R0FyGCgvfKY= +go.opentelemetry.io/collector/pipeline v0.118.1-0.20250130224751-50b76b95bf35/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= go.opentelemetry.io/collector/receiver v0.118.1-0.20250131104636-a737a48402e0 h1:lLPg44jZ28MFk5q2CjGlOqvBVJnuj+31uX3GECTS72E= go.opentelemetry.io/collector/receiver v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:huwfVvNKWyMZjmeFOM1YB9HWx3cbJdjzNV9RCwFyl70= go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250131104636-a737a48402e0 h1:HnB93CVLJBm2ceQ92pCXuNiMLpt4NYn3hlVAsfGv1go= go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:w8VuS9eBscFCjjAHc1NKGlCZfJFngpAFfZ/xf/k2gHg= -go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250131104636-a737a48402e0 h1:vlIqQOAQZqWgcRfX1VqTaWqxZfYp8p+v/JlsixiZTPg= -go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250131104636-a737a48402e0/go.mod h1:ScFWnHuCXV0Z30cePBXS7Ud9IXfF8pBQujDCRUZolXw= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= +go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250130224751-50b76b95bf35 h1:PEZK7H+lJSLvRmv2iROQjOmh2HJlUf5ZywWbYQOsUqE= +go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250130224751-50b76b95bf35/go.mod h1:ScFWnHuCXV0Z30cePBXS7Ud9IXfF8pBQujDCRUZolXw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 h1:PS8wXpbyaDJQ2VDHHncMe9Vct0Zn1fEjpsjrLxGJoSc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0/go.mod h1:HDBUsEjOuRC0EzKZ1bSaRGZWUBAzo+MhAcUUORSr4D0= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= @@ -253,10 +253,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= -google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= -google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 h1:CkkIfIt50+lT6NHAVoRYEyAvQGFM7xEwXUUywFvEb3Q= -google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576/go.mod h1:1R3kvZ1dtP3+4p4d3G8uJ8rFk/fWlScl38vanWACI08= +google.golang.org/genproto v0.0.0-20250127172529-29210b9bc287 h1:WoUI1G0DQ648FKvSl756SKxHQR/bI+y4HyyIQfxMWI8= +google.golang.org/genproto v0.0.0-20250127172529-29210b9bc287/go.mod h1:wkQ2Aj/xvshAUDtO/JHvu9y+AaN9cqs28QuSVSHtZSY= +google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 h1:A2ni10G3UlplFrWdCDJTl7D7mJ7GSRm37S+PDimaKRw= +google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4= google.golang.org/genproto/googleapis/rpc v0.0.0-20250124145028-65684f501c47 h1:91mG8dNTpkC0uChJUQ9zCiRqx3GEEFOWaRZ0mI6Oj2I= google.golang.org/genproto/googleapis/rpc v0.0.0-20250124145028-65684f501c47/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/receiver/googlecloudpubsubreceiver/internal/handler.go b/receiver/googlecloudpubsubreceiver/internal/handler.go index 58695666ed942..17b126860b0da 100644 --- a/receiver/googlecloudpubsubreceiver/internal/handler.go +++ b/receiver/googlecloudpubsubreceiver/internal/handler.go @@ -13,9 +13,14 @@ import ( "time" "cloud.google.com/go/pubsub/apiv1/pubsubpb" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) // Time to wait before restarting, when the stream stopped @@ -36,7 +41,8 @@ type StreamHandler struct { streamWaitGroup sync.WaitGroup // wait group for the handler handlerWaitGroup sync.WaitGroup - logger *zap.Logger + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder // time that acknowledge loop waits before acknowledging messages ackBatchWait time.Duration @@ -51,19 +57,21 @@ func (handler *StreamHandler) ack(ackID string) { func NewHandler( ctx context.Context, - logger *zap.Logger, + settings receiver.Settings, + telemetryBuilder *metadata.TelemetryBuilder, client SubscriberClient, clientID string, subscription string, callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error, ) (*StreamHandler, error) { handler := StreamHandler{ - logger: logger, - client: client, - clientID: clientID, - subscription: subscription, - pushMessage: callback, - ackBatchWait: 10 * time.Second, + settings: settings, + telemetryBuilder: telemetryBuilder, + client: client, + clientID: clientID, + subscription: subscription, + pushMessage: callback, + ackBatchWait: 10 * time.Second, } return &handler, handler.initStream(ctx) } @@ -85,6 +93,11 @@ func (handler *StreamHandler) initStream(ctx context.Context) error { _ = handler.stream.CloseSend() return err } + handler.telemetryBuilder.ReceiverGooglecloudpubsubStreamRestarts.Add(ctx, 1, + metric.WithAttributes( + attribute.String("otelcol.component.kind", "receiver"), + attribute.String("otelcol.component.id", handler.settings.ID.String()), + )) return nil } @@ -102,7 +115,7 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) { var loopCtx context.Context loopCtx, cancel := context.WithCancel(ctx) - handler.logger.Info("Starting Streaming Pull") + handler.settings.Logger.Debug("Starting Streaming Pull") handler.streamWaitGroup.Add(2) go handler.requestStream(loopCtx, cancel) go handler.responseStream(loopCtx, cancel) @@ -117,13 +130,13 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) { if handler.isRunning.Load() { err := handler.initStream(ctx) if err != nil { - handler.logger.Error("Failed to recovery stream.") + handler.settings.Logger.Error("Failed to recovery stream.") } } - handler.logger.Warn("End of recovery loop, restarting.") + handler.settings.Logger.Debug("End of recovery loop, restarting.") time.Sleep(streamRecoveryBackoffPeriod) } - handler.logger.Warn("Shutting down recovery loop.") + handler.settings.Logger.Warn("Shutting down recovery loop.") handler.handlerWaitGroup.Done() } @@ -157,15 +170,15 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context. for { if err := handler.acknowledgeMessages(); err != nil { if errors.Is(err, io.EOF) { - handler.logger.Warn("EOF reached") + handler.settings.Logger.Warn("EOF reached") break } - handler.logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err)) + handler.settings.Logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err)) break } select { case <-ctx.Done(): - handler.logger.Warn("requestStream <-ctx.Done()") + handler.settings.Logger.Debug("requestStream <-ctx.Done()") case <-timer.C: timer.Reset(handler.ackBatchWait) } @@ -176,7 +189,7 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context. } } cancel() - handler.logger.Warn("Request Stream loop ended.") + handler.settings.Logger.Debug("Request Stream loop ended.") _ = handler.stream.CloseSend() handler.streamWaitGroup.Done() } @@ -202,18 +215,18 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context case errors.Is(err, io.EOF): activeStreaming = false case !grpcStatus: - handler.logger.Warn("response stream breaking on error", + handler.settings.Logger.Warn("response stream breaking on error", zap.Error(err)) activeStreaming = false case s.Code() == codes.Unavailable: - handler.logger.Info("response stream breaking on gRPC s 'Unavailable'") + handler.settings.Logger.Debug("response stream breaking on gRPC s 'Unavailable'") activeStreaming = false case s.Code() == codes.NotFound: - handler.logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream") + handler.settings.Logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream") time.Sleep(time.Second * 60) activeStreaming = false default: - handler.logger.Warn("response stream breaking on gRPC s "+s.Message(), + handler.settings.Logger.Warn("response stream breaking on gRPC s "+s.Message(), zap.String("s", s.Message()), zap.Error(err)) activeStreaming = false @@ -221,11 +234,11 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context } if errors.Is(ctx.Err(), context.Canceled) { // Canceling the loop, collector is probably stopping - handler.logger.Warn("response stream ctx.Err() == context.Canceled") + handler.settings.Logger.Warn("response stream ctx.Err() == context.Canceled") break } } cancel() - handler.logger.Warn("Response Stream loop ended.") + handler.settings.Logger.Debug("Response Stream loop ended.") handler.streamWaitGroup.Done() } diff --git a/receiver/googlecloudpubsubreceiver/internal/handler_test.go b/receiver/googlecloudpubsubreceiver/internal/handler_test.go index 94b285eb35caf..bf860038ff1ca 100644 --- a/receiver/googlecloudpubsubreceiver/internal/handler_test.go +++ b/receiver/googlecloudpubsubreceiver/internal/handler_test.go @@ -12,10 +12,12 @@ import ( "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/pstest" "github.com/stretchr/testify/assert" - "go.uber.org/zap/zaptest" + "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) func TestCancelStream(t *testing.T) { @@ -41,10 +43,13 @@ func TestCancelStream(t *testing.T) { }) assert.NoError(t, err) + settings := receivertest.NewNopSettings() + telemetryBuilder, _ := metadata.NewTelemetryBuilder(settings.TelemetrySettings) + client, err := pubsub.NewSubscriberClient(ctx, copts...) assert.NoError(t, err) - handler, err := NewHandler(ctx, zaptest.NewLogger(t), client, "client-id", "projects/my-project/subscriptions/otlp", + handler, err := NewHandler(ctx, settings, telemetryBuilder, client, "client-id", "projects/my-project/subscriptions/otlp", func(context.Context, *pubsubpb.ReceivedMessage) error { return nil }) diff --git a/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go new file mode 100644 index 0000000000000..1f1ffdc1ba8c0 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go @@ -0,0 +1,68 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + "sync" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ReceiverGooglecloudpubsubStreamRestarts metric.Int64Counter +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// Shutdown unregister all registered callbacks for async instruments. +func (builder *TelemetryBuilder) Shutdown() { + builder.mu.Lock() + defer builder.mu.Unlock() + for _, reg := range builder.registrations { + reg.Unregister() + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.ReceiverGooglecloudpubsubStreamRestarts, err = builder.meter.Int64Counter( + "otelcol_receiver.googlecloudpubsub.stream_restarts", + metric.WithDescription("Number of times the stream (re)starts due to a Pub/Sub servers connection close"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go new file mode 100644 index 0000000000000..8e1906aa90f4f --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,74 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 0000000000000..049ed46ac54f9 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,88 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +type Telemetry struct { + *componenttest.Telemetry +} + +func SetupTelemetry(opts ...componenttest.TelemetryOption) Telemetry { + return Telemetry{Telemetry: componenttest.NewTelemetry(opts...)} +} + +func (tt *Telemetry) NewSettings() receiver.Settings { + set := receivertest.NewNopSettings() + set.ID = component.NewID(component.MustNewType("googlecloudpubsub")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +func (tt *Telemetry) AssertMetrics(t *testing.T, expected []metricdata.Metrics, opts ...metricdatatest.Option) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.Reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := getMetricFromResource(want.Name, md) + metricdatatest.AssertEqual(t, want, got, opts...) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), lenMetrics(md)) +} + +func NewSettings(tt *componenttest.Telemetry) receiver.Settings { + set := receivertest.NewNopSettings() + set.ID = component.NewID(component.MustNewType("googlecloudpubsub")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +func AssertEqualReceiverGooglecloudpubsubStreamRestarts(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver.googlecloudpubsub.stream_restarts", + Description: "Number of times the stream (re)starts due to a Pub/Sub servers connection close", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver.googlecloudpubsub.stream_restarts") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func getMetricFromResource(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func lenMetrics(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 0000000000000..84e0b9500ecd0 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,42 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" +) + +func TestSetupTelemetry(t *testing.T) { + testTel := SetupTelemetry() + tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) + require.NoError(t, err) + defer tb.Shutdown() + tb.ReceiverGooglecloudpubsubStreamRestarts.Add(context.Background(), 1) + + testTel.AssertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_receiver.googlecloudpubsub.stream_restarts", + Description: "Number of times the stream (re)starts due to a Pub/Sub servers connection close", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {}, + }, + }, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + AssertEqualReceiverGooglecloudpubsubStreamRestarts(t, testTel.Telemetry, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + + require.NoError(t, testTel.Shutdown(context.Background())) +} diff --git a/receiver/googlecloudpubsubreceiver/metadata.yaml b/receiver/googlecloudpubsubreceiver/metadata.yaml index 28d8f8e03fbd0..f5fbc6ac1c876 100644 --- a/receiver/googlecloudpubsubreceiver/metadata.yaml +++ b/receiver/googlecloudpubsubreceiver/metadata.yaml @@ -8,6 +8,20 @@ status: codeowners: active: [alexvanboxel] +telemetry: + metrics: + receiver.googlecloudpubsub.stream_restarts: + enabled: true + description: Number of times the stream (re)starts due to a Pub/Sub servers connection close + unit: "1" + sum: + value_type: int + monotonic: true + extended_documentation: | + The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers + recurrently close the connection after a time period to avoid a long-running sticky connection. This metric + counts the number of the resets that occurred during the lifetime of the container. + tests: config: project: my-project diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index 9fb36f6b1d6d9..4c444f2d8f97a 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -21,16 +21,17 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) // https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#streamingpullrequest type pubsubReceiver struct { - logger *zap.Logger + settings receiver.Settings obsrecv *receiverhelper.ObsReport tracesConsumer consumer.Traces metricsConsumer consumer.Metrics @@ -43,6 +44,7 @@ type pubsubReceiver struct { logsUnmarshaler plog.Unmarshaler handler *internal.StreamHandler startOnce sync.Once + telemetryBuilder *metadata.TelemetryBuilder } type buildInEncoding int @@ -118,6 +120,11 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, host component.Host) return } receiver.client = client + receiver.telemetryBuilder, err = metadata.NewTelemetryBuilder(receiver.settings.TelemetrySettings) + if err != nil { + startErr = fmt.Errorf("failed to create telemetry builder: %w", err) + return + } err = createHandlerFn(ctx) if err != nil { @@ -194,9 +201,9 @@ func (receiver *pubsubReceiver) setMarshallerFromEncodingID(encodingID buildInEn func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { if receiver.handler != nil { - receiver.logger.Info("Stopping Google Pubsub receiver") + receiver.settings.Logger.Info("Stopping Google Pubsub receiver") receiver.handler.CancelNow() - receiver.logger.Info("Stopped Google Pubsub receiver") + receiver.settings.Logger.Info("Stopped Google Pubsub receiver") receiver.handler = nil } if receiver.client == nil { @@ -370,7 +377,8 @@ func (receiver *pubsubReceiver) createMultiplexingReceiverHandler(ctx context.Co var err error receiver.handler, err = internal.NewHandler( ctx, - receiver.logger, + receiver.settings, + receiver.telemetryBuilder, receiver.client, receiver.config.ClientID, receiver.config.Subscription, @@ -432,7 +440,8 @@ func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error receiver.handler, err = internal.NewHandler( ctx, - receiver.logger, + receiver.settings, + receiver.telemetryBuilder, receiver.client, receiver.config.ClientID, receiver.config.Subscription, diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index 72eca61b13158..9d32fa157cc49 100644 --- a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -18,8 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/testdata" @@ -27,9 +25,9 @@ import ( func createBaseReceiver() (*pstest.Server, *pubsubReceiver) { srv := pstest.NewServer() - core, _ := observer.New(zap.WarnLevel) + settings := receivertest.NewNopSettings() return srv, &pubsubReceiver{ - logger: zap.New(core), + settings: settings, userAgent: "test-user-agent", config: &Config{ @@ -100,8 +98,7 @@ func TestReceiver(t *testing.T) { }) assert.NoError(t, err) - core, _ := observer.New(zap.WarnLevel) - params := receivertest.NewNopSettings() + settings := receivertest.NewNopSettings() traceSink := new(consumertest.TracesSink) metricSink := new(consumertest.MetricsSink) logSink := new(consumertest.LogsSink) @@ -110,12 +107,12 @@ func TestReceiver(t *testing.T) { ReceiverID: component.NewID(metadata.Type), Transport: reportTransport, LongLivedCtx: false, - ReceiverCreateSettings: params, + ReceiverCreateSettings: settings, }) require.NoError(t, err) receiver := &pubsubReceiver{ - logger: zap.New(core), + settings: settings, obsrecv: obsrecv, userAgent: "test-user-agent",