Skip to content

Commit 9913741

Browse files
committed
beholder heartbeat
1 parent adf46dd commit 9913741

File tree

5 files changed

+332
-1
lines changed

5 files changed

+332
-1
lines changed

pkg/beholder/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ type Config struct {
4949
LogRetryConfig *RetryConfig
5050
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
5151

52+
// Heartbeat
53+
HeartbeatEnabled bool // Enable periodic heartbeat emission
54+
5255
// Auth
5356
AuthPublicKeyHex string
5457
AuthHeaders map[string]string
@@ -115,6 +118,7 @@ func DefaultConfig() Config {
115118
LogMaxQueueSize: 2048,
116119
LogBatchProcessor: true,
117120
LogStreamingEnabled: true, // Enable logs streaming by default
121+
HeartbeatEnabled: true, // Enable heartbeat by default
118122
}
119123
}
120124

pkg/beholder/heartbeat.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package beholder
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"go.opentelemetry.io/otel/attribute"
9+
"go.opentelemetry.io/otel/metric"
10+
"go.opentelemetry.io/otel/trace"
11+
"google.golang.org/protobuf/proto"
12+
13+
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
14+
"github.com/smartcontractkit/chainlink-common/pkg/config/build"
15+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
16+
"github.com/smartcontractkit/chainlink-common/pkg/services"
17+
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
18+
)
19+
20+
// Heartbeat represents a periodic heartbeat service that emits metrics and logs
21+
type Heartbeat struct {
22+
services.Service
23+
eng *services.Engine
24+
25+
Beat time.Duration
26+
Emitter Emitter
27+
Meter metric.Meter
28+
Logger logger.Logger
29+
Tracer trace.Tracer
30+
AppID string
31+
ServiceName string
32+
Version string
33+
Commit string
34+
Labels map[string]string
35+
}
36+
37+
// NewHeartbeat creates a new heartbeat service with custom configuration
38+
func NewHeartbeat(beat time.Duration, lggr logger.Logger, opts ...HeartbeatOpt) *Heartbeat {
39+
// Setup default emitter, meter, and tracer
40+
noopClient := NewNoopClient()
41+
42+
// Create heartbeat with defaults
43+
h := &Heartbeat{
44+
Beat: beat,
45+
Logger: lggr,
46+
Emitter: noopClient.Emitter,
47+
Meter: noopClient.Meter,
48+
Tracer: noopClient.Tracer,
49+
AppID: "chainlink", // Default app ID
50+
ServiceName: build.Program, // Default service name
51+
Version: build.Version, // Use build version
52+
Commit: build.ChecksumPrefix, // Use build commit
53+
Labels: make(map[string]string),
54+
}
55+
56+
// Apply options
57+
for _, opt := range opts {
58+
opt(h)
59+
}
60+
61+
// Build labels from current values
62+
h.Labels = map[string]string{
63+
"service": h.ServiceName,
64+
"version": h.Version,
65+
"commit": h.Commit,
66+
}
67+
if h.AppID != "" {
68+
h.Labels["app_id"] = h.AppID
69+
}
70+
71+
// Create service engine
72+
h.Service, h.eng = services.Config{
73+
Name: "BeholderHeartbeat",
74+
Start: h.start,
75+
}.NewServiceEngine(lggr)
76+
77+
return h
78+
}
79+
80+
// HeartbeatOpt is a functional option for configuring the heartbeat
81+
type HeartbeatOpt func(*Heartbeat)
82+
83+
// WithEmitter sets a custom message emitter for the heartbeat
84+
func WithEmitter(emitter Emitter) HeartbeatOpt {
85+
return func(h *Heartbeat) {
86+
h.Emitter = emitter
87+
}
88+
}
89+
90+
// WithMeter sets a custom meter for the heartbeat
91+
func WithMeter(meter metric.Meter) HeartbeatOpt {
92+
return func(h *Heartbeat) {
93+
h.Meter = meter
94+
}
95+
}
96+
97+
// WithTracer sets a custom tracer for the heartbeat
98+
func WithTracer(tracer trace.Tracer) HeartbeatOpt {
99+
return func(h *Heartbeat) {
100+
h.Tracer = tracer
101+
}
102+
}
103+
104+
// WithAppID sets a custom app ID for the heartbeat
105+
func WithAppID(appID string) HeartbeatOpt {
106+
return func(h *Heartbeat) {
107+
h.AppID = appID
108+
if appID != "" {
109+
h.Labels["app_id"] = appID
110+
} else {
111+
delete(h.Labels, "app_id")
112+
}
113+
}
114+
}
115+
116+
// WithServiceName sets a custom service name for the heartbeat
117+
func WithServiceName(serviceName string) HeartbeatOpt {
118+
return func(h *Heartbeat) {
119+
h.ServiceName = serviceName
120+
h.Labels["service"] = serviceName
121+
}
122+
}
123+
124+
// WithVersion sets a custom version for the heartbeat
125+
func WithVersion(version string) HeartbeatOpt {
126+
return func(h *Heartbeat) {
127+
h.Version = version
128+
h.Labels["version"] = version
129+
}
130+
}
131+
132+
// WithCommit sets a custom commit for the heartbeat
133+
func WithCommit(commit string) HeartbeatOpt {
134+
return func(h *Heartbeat) {
135+
h.Commit = commit
136+
h.Labels["commit"] = commit
137+
}
138+
}
139+
140+
// WithBeatInterval sets a custom beat interval for the heartbeat
141+
func WithBeatInterval(beat time.Duration) HeartbeatOpt {
142+
return func(h *Heartbeat) {
143+
h.Beat = beat
144+
}
145+
}
146+
147+
// start initializes and starts the heartbeat service
148+
func (h *Heartbeat) start(ctx context.Context) error {
149+
// Create heartbeat metrics
150+
heartbeatGauge, err := h.Meter.Int64Gauge("beholder_heartbeat")
151+
if err != nil {
152+
return fmt.Errorf("failed to create heartbeat status gauge: %w", err)
153+
}
154+
155+
heartbeatCount, err := h.Meter.Int64Counter("beholder_heartbeat_count")
156+
if err != nil {
157+
return fmt.Errorf("failed to create heartbeat counter: %w", err)
158+
}
159+
160+
// Define the heartbeat function
161+
beatFn := func(ctx context.Context) {
162+
start := time.Now()
163+
164+
// Create a trace span for the heartbeat
165+
ctx, span := h.Tracer.Start(ctx, "beholder_heartbeat", trace.WithAttributes(
166+
attribute.String("service", h.ServiceName),
167+
attribute.String("app_id", h.AppID),
168+
attribute.String("version", h.Version),
169+
attribute.String("commit", h.Commit),
170+
))
171+
defer span.End()
172+
173+
// Record heartbeat metrics
174+
heartbeatGauge.Record(ctx, 1)
175+
heartbeatCount.Add(ctx, 1)
176+
177+
// Emit heartbeat message
178+
179+
payload := &pb.BaseMessage{
180+
Msg: "beholder heartbeat",
181+
Labels: h.Labels,
182+
}
183+
payloadBytes, err := proto.Marshal(payload)
184+
if err != nil {
185+
// log error
186+
h.Logger.Errorw("heartbeat marshal protobuf failed", "err", err)
187+
}
188+
189+
err = h.Emitter.Emit(ctx, payloadBytes,
190+
AttrKeyDataSchema, "/beholder-base-message/versions/1", // required
191+
AttrKeyDomain, "platform", // required
192+
AttrKeyEntity, "BaseMessage", // required
193+
"service", h.ServiceName,
194+
"app_id", h.AppID,
195+
"version", h.Version,
196+
"commit", h.Commit,
197+
"timestamp", start.Unix(),
198+
)
199+
200+
if err != nil {
201+
h.Logger.Errorw("heartbeat emit failed", "err", err)
202+
}
203+
204+
// Log heartbeat
205+
h.Logger.Debugw("beholder heartbeat emitted",
206+
"service", h.ServiceName,
207+
"app_id", h.AppID,
208+
"version", h.Version,
209+
"commit", h.Commit,
210+
"timestamp", start.Unix(),
211+
)
212+
}
213+
214+
// Start the heartbeat ticker
215+
// Execute immediately first, then continue with regular intervals
216+
h.eng.Go(func(ctx context.Context) {
217+
beatFn(ctx)
218+
})
219+
h.eng.GoTick(timeutil.NewTicker(func() time.Duration { return h.Beat }), beatFn)
220+
221+
h.Logger.Infow("beholder heartbeat service started",
222+
"service", h.ServiceName,
223+
"beat_interval", h.Beat,
224+
)
225+
226+
return nil
227+
}

pkg/beholder/heartbeat_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package beholder_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
12+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
13+
)
14+
15+
func TestHeartbeat_NewHeartbeat(t *testing.T) {
16+
lggr, err := logger.New()
17+
require.NoError(t, err)
18+
19+
heartbeat := beholder.NewHeartbeat(
20+
1*time.Second,
21+
lggr,
22+
beholder.WithAppID("test-app"),
23+
beholder.WithServiceName("test-service"),
24+
beholder.WithVersion("1.0.0"),
25+
beholder.WithCommit("abc123"),
26+
)
27+
require.NotNil(t, heartbeat)
28+
29+
assert.Equal(t, "test-app", heartbeat.AppID)
30+
assert.Equal(t, "test-service", heartbeat.ServiceName)
31+
assert.Equal(t, "1.0.0", heartbeat.Version)
32+
assert.Equal(t, "abc123", heartbeat.Commit)
33+
assert.Equal(t, 1*time.Second, heartbeat.Beat)
34+
assert.NotNil(t, heartbeat.Emitter)
35+
assert.NotNil(t, heartbeat.Meter)
36+
}
37+
38+
func TestHeartbeat_Start(t *testing.T) {
39+
lggr, err := logger.New()
40+
require.NoError(t, err)
41+
42+
heartbeat := beholder.NewHeartbeat(100*time.Millisecond, lggr)
43+
require.NotNil(t, heartbeat)
44+
45+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
46+
defer cancel()
47+
48+
err = heartbeat.Start(ctx)
49+
require.NoError(t, err)
50+
51+
// Wait for at least one heartbeat
52+
time.Sleep(150 * time.Millisecond)
53+
54+
err = heartbeat.Close()
55+
require.NoError(t, err)
56+
}
57+
58+
func TestHeartbeat_Defaults(t *testing.T) {
59+
lggr, err := logger.New()
60+
require.NoError(t, err)
61+
62+
heartbeat := beholder.NewHeartbeat(1*time.Second, lggr)
63+
require.NotNil(t, heartbeat)
64+
65+
// Check defaults
66+
assert.Equal(t, "chainlink", heartbeat.AppID)
67+
assert.Equal(t, "github.com/smartcontractkit/chainlink-common", heartbeat.ServiceName)
68+
assert.Equal(t, "(devel)", heartbeat.Version)
69+
assert.Equal(t, "unset", heartbeat.Commit)
70+
assert.Equal(t, 1*time.Second, heartbeat.Beat)
71+
}

pkg/loop/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const (
6262
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
6363
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
6464
envTelemetryLogStreamingEnabled = "CL_TELEMETRY_LOG_STREAMING_ENABLED"
65+
envTelemetryHeartbeatEnabled = "CL_TELEMETRY_HEARTBEAT_ENABLED"
6566

6667
envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
6768
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
@@ -118,6 +119,7 @@ type EnvConfig struct {
118119
TelemetryEmitterExportMaxBatchSize int
119120
TelemetryEmitterMaxQueueSize int
120121
TelemetryLogStreamingEnabled bool
122+
TelemetryHeartbeatEnabled bool
121123

122124
ChipIngressEndpoint string
123125
ChipIngressInsecureConnection bool
@@ -187,6 +189,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
187189
add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
188190
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
189191
add(envTelemetryLogStreamingEnabled, strconv.FormatBool(e.TelemetryLogStreamingEnabled))
192+
add(envTelemetryHeartbeatEnabled, strconv.FormatBool(e.TelemetryHeartbeatEnabled))
190193

191194
add(envChipIngressEndpoint, e.ChipIngressEndpoint)
192195
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
@@ -351,6 +354,10 @@ func (e *EnvConfig) parse() error {
351354
if err != nil {
352355
return fmt.Errorf("failed to parse %s: %w", envTelemetryLogStreamingEnabled, err)
353356
}
357+
e.TelemetryHeartbeatEnabled, err = getBool(envTelemetryHeartbeatEnabled)
358+
if err != nil {
359+
return fmt.Errorf("failed to parse %s: %w", envTelemetryHeartbeatEnabled, err)
360+
}
354361
// Optional
355362
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
356363
e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)

0 commit comments

Comments
 (0)