Skip to content

Commit 270f53d

Browse files
committed
Updates to golang market data handling code
1 parent aa7fd93 commit 270f53d

18 files changed

+775
-784
lines changed

api/services.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1-
// Code to interact with the apis of standard otp service interfaces
1+
// Package api contains Code to interact with the apis of standard otp service interfaces
22
package api
33

44
import (
5+
"errors"
56
"fmt"
67
"github.com/ettec/otp-common/api/executionvenue"
78
"google.golang.org/grpc"
89
"k8s.io/apimachinery/pkg/apis/meta/v1"
910
"k8s.io/client-go/kubernetes"
10-
"log"
11+
"log/slog"
1112
"strconv"
1213
"time"
1314
)
@@ -19,7 +20,7 @@ func GetOrderRouter(clientSet *kubernetes.Clientset, maxConnectRetrySecs time.Du
1920
})
2021

2122
if err != nil {
22-
panic(err)
23+
return nil, fmt.Errorf("failed to list order router services: %w", err)
2324
}
2425

2526
var client executionvenue.ExecutionVenueClient
@@ -34,26 +35,26 @@ func GetOrderRouter(clientSet *kubernetes.Clientset, maxConnectRetrySecs time.Du
3435
}
3536

3637
if podPort == 0 {
37-
log.Printf("ignoring order router service as it does not have a port named executionvenue, service: %v", service)
38+
slog.Info("ignoring order router service as it does not have an api port", "order-router-service", service)
3839
continue
3940
}
4041

4142
targetAddress := service.Name + ":" + strconv.Itoa(int(podPort))
4243

43-
log.Printf("connecting to order router service %v at: %v", service.Name, targetAddress)
44+
slog.Info("connecting to order router service", "name", service.Name, "address", targetAddress)
4445

4546
conn, err := grpc.Dial(targetAddress, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(maxConnectRetrySecs))
4647

4748
if err != nil {
48-
panic(err)
49+
return nil, fmt.Errorf("failed to dial order router service %v at %v: %w", service.Name, targetAddress, err)
4950
}
5051

5152
client = executionvenue.NewExecutionVenueClient(conn)
5253
break
5354
}
5455

5556
if client == nil {
56-
return nil, fmt.Errorf("failed to find order router")
57+
return nil, errors.New("failed to find order router")
5758
}
5859

5960
return client, nil

bootstrap/envvars.go

+37-18
Original file line numberDiff line numberDiff line change
@@ -1,113 +1,132 @@
1-
// Utility functions to read environment variables and set defaults.
1+
// Package bootstrap contains utility functions to read environment variables, if a default is not provided and
2+
// the value of an environment variable is not set then the application will panic.
23
package bootstrap
34

45
import (
5-
"log"
6+
"fmt"
7+
"log/slog"
68
"os"
79
"strconv"
810
)
911

12+
// GetIntEnvVar reads an environment variable and returns the value as an int, if the environment variable is not set
13+
// then the application will panic.
1014
func GetIntEnvVar(key string) int {
1115
value, exists := os.LookupEnv(key)
1216
if !exists {
13-
log.Panicf("missing required env var %v", key)
17+
panic(fmt.Sprintf("missing required env var %v", key))
1418
}
1519

1620
var err error
1721
result, err := strconv.Atoi(value)
1822
if err != nil {
19-
log.Panicf("cannot parse %v, error: %v", key, err)
23+
panic(fmt.Sprintf("cannot parse %v, error: %v", key, err))
2024
}
2125

22-
log.Printf("%v set to %v", key, value)
26+
slog.Info("Environment Variable Set", key, value)
2327

2428
return result
2529
}
2630

31+
// GetEnvVar reads an environment variable and returns the value as a string, if the environment variable is not set
32+
// then the application will panic.
2733
func GetEnvVar(key string) string {
2834
value, exists := os.LookupEnv(key)
2935
if !exists {
30-
log.Panicf("missing required env var %v", key)
36+
panic(fmt.Sprintf("missing required env var %v", key))
3137
}
3238

33-
log.Printf("%v set to %v", key, value)
39+
slog.Info("Environment Variable Set", key, value)
3440

3541
return value
3642
}
3743

44+
// GetOptionalEnvVar reads an environment variable and returns the value as a string, or the default value if the
45+
// environment variable is not set.
3846
func GetOptionalEnvVar(key string, def string) string {
3947
strValue, exists := os.LookupEnv(key)
4048
result := def
4149
if exists {
4250
result = strValue
4351
}
4452

45-
log.Printf("%v set to %v", key, result)
53+
slog.Info("Environment Variable Set", key, strValue)
4654

4755
return result
4856
}
4957

58+
// GetOptionalBoolEnvVar reads an environment variable and returns the value as a bool, or the default value if the
59+
// environment variable is not set. If the environment variable is set but cannot be parsed as a bool then the
60+
// application will panic.
5061
func GetOptionalBoolEnvVar(key string, def bool) bool {
5162
strValue, exists := os.LookupEnv(key)
5263
result := def
5364
if exists {
5465
var err error
5566
result, err = strconv.ParseBool(strValue)
5667
if err != nil {
57-
log.Panicf("cannot parse %v, error: %v", key, err)
68+
panic(fmt.Sprintf("cannot parse %v, error: %v", key, err))
5869
}
5970
}
6071

61-
log.Printf("%v set to %v", key, result)
72+
slog.Info("Environment Variable Set", key, strValue)
6273

6374
return result
6475
}
6576

77+
// GetBoolEnvVar reads an environment variable and returns the value as a bool, if the environment variable is not set
78+
// then the application will panic.
6679
func GetBoolEnvVar(key string) bool {
6780
value, exists := os.LookupEnv(key)
6881
if !exists {
69-
log.Panicf("missing required env var %v", key)
82+
panic(fmt.Sprintf("missing required env var %v", key))
7083
}
7184

7285
var err error
7386
result, err := strconv.ParseBool(value)
7487
if err != nil {
75-
log.Panicf("cannot parse %v, error: %v", key, err)
88+
panic(fmt.Sprintf("cannot parse %v, error: %v", key, err))
7689
}
7790

78-
log.Printf("%v set to %v", key, value)
91+
slog.Info("Environment Variable Set", key, value)
7992

8093
return result
8194
}
8295

96+
// GetOptionalIntEnvVar reads an environment variable and returns the value as an int, or the default value if the
97+
// environment variable is not set. If the environment variable is set but cannot be parsed as an int then the
98+
// application will panic.
8399
func GetOptionalIntEnvVar(key string, def int) int {
84100
strValue, exists := os.LookupEnv(key)
85101
result := def
86102
if exists {
87103
var err error
88104
result, err = strconv.Atoi(strValue)
89105
if err != nil {
90-
log.Panicf("cannot parse %v, error: %v", key, err)
106+
panic(fmt.Sprintf("cannot parse %v, error: %v", key, err))
91107
}
92108
}
93109

94-
log.Printf("%v set to %v", key, result)
110+
slog.Info("Environment Variable Set", key, strValue)
95111

96112
return result
97113
}
98114

115+
// GetOptionalFloatEnvVar reads an environment variable and returns the value as a float64, if the environment variable is not set
116+
// then the application will panic. If the environment variable is set but cannot be parsed as a float64 then the
117+
// application will panic.
99118
func GetOptionalFloatEnvVar(key string, def float64) float64 {
100119
strValue, exists := os.LookupEnv(key)
101120
result := def
102121
if exists {
103122
var err error
104123
result, err = strconv.ParseFloat(strValue, 64)
105124
if err != nil {
106-
log.Panicf("cannot parse %v, error: %v", key, err)
125+
panic(fmt.Sprintf("cannot parse %v, error: %v", key, err))
107126
}
108127
}
109128

110-
log.Printf("%v set to %v", key, result)
129+
slog.Info("Environment Variable Set", key, result)
111130

112131
return result
113-
}
132+
}

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/prometheus/client_golang v1.7.1
99
github.com/segmentio/kafka-go v0.3.4
1010
github.com/shopspring/decimal v0.0.0-20191009025716-f1972eb1d1f5
11+
github.com/stretchr/testify v1.4.0
1112
google.golang.org/grpc v1.25.1
1213
k8s.io/api v0.17.4
1314
k8s.io/apimachinery v0.17.4
@@ -27,6 +28,7 @@ require (
2728
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
2829
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
2930
github.com/modern-go/reflect2 v1.0.1 // indirect
31+
github.com/pmezard/go-difflib v1.0.0 // indirect
3032
github.com/prometheus/client_model v0.2.0 // indirect
3133
github.com/prometheus/common v0.10.0 // indirect
3234
github.com/prometheus/procfs v0.1.3 // indirect

loadbalancing/loadbalancing.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Common utility functions used in load balancing.
1+
// Package loadbalancing contains utility functions used in load balancing.
22
package loadbalancing
33

44
import (
@@ -17,10 +17,10 @@ type BalancingStatefulPod struct {
1717
TargetAddress string
1818
Ordinal int
1919
Name string
20-
Mic string
20+
Mic string
2121
}
2222

23-
func GetBalancingStatefulPod(pod v12.Pod) ( *BalancingStatefulPod, error) {
23+
func GetBalancingStatefulPod(pod v12.Pod) (*BalancingStatefulPod, error) {
2424
const micLabel = "mic"
2525
if _, ok := pod.Labels[micLabel]; !ok {
2626
return nil, fmt.Errorf("ignoring stateful pod as it does not have a mic label, pod: %v", pod)
@@ -30,7 +30,7 @@ func GetBalancingStatefulPod(pod v12.Pod) ( *BalancingStatefulPod, error) {
3030

3131
targetAddress, err := getStatefulSetMemberAddress(pod)
3232
if err != nil {
33-
return nil, fmt.Errorf("failed to get stateful pod address:%v", err)
33+
return nil, fmt.Errorf("failed to get stateful pod address:%v", err)
3434
}
3535

3636
ordinal, err := getStatefulSetPodOrdinal(pod)

marketdata/boundedcircularbuffer.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package marketdata
2+
3+
type boundedCircularBuffer[T any] struct {
4+
buffer []T
5+
capacity int
6+
len int
7+
readPtr int
8+
writePtr int
9+
}
10+
11+
func newBoundedCircularBuffer[T any](capacity int) *boundedCircularBuffer[T] {
12+
b := &boundedCircularBuffer[T]{buffer: make([]T, capacity), capacity: capacity}
13+
14+
return b
15+
}
16+
17+
// true if the buffer is not full and the value is added
18+
func (b *boundedCircularBuffer[T]) addHead(item T) bool {
19+
20+
if b.len == b.capacity {
21+
return false
22+
}
23+
24+
b.buffer[b.writePtr] = item
25+
b.len++
26+
27+
if b.writePtr == b.capacity-1 {
28+
b.writePtr = 0
29+
} else {
30+
b.writePtr++
31+
}
32+
33+
return true
34+
35+
}
36+
37+
func (b *boundedCircularBuffer[T]) getTail() (T, bool) {
38+
var result T
39+
if b.len == 0 {
40+
return result, false
41+
}
42+
43+
return b.buffer[b.readPtr], true
44+
}
45+
46+
// returns the value and true if a value is available
47+
func (b *boundedCircularBuffer[T]) removeTail() (T, bool) {
48+
var result T
49+
if b.len == 0 {
50+
return result, false
51+
}
52+
53+
result = b.buffer[b.readPtr]
54+
b.len--
55+
b.readPtr++
56+
if b.readPtr == b.capacity {
57+
b.readPtr = 0
58+
}
59+
60+
return result, true
61+
62+
}

0 commit comments

Comments
 (0)