Skip to content

Commit 7091433

Browse files
committed
fix(app): fixes lint issues and test cases
Signed-off-by: Mohit Nagaraj <mohitnagaraj20@gmail.com>
1 parent f1b7b00 commit 7091433

File tree

21 files changed

+492
-212
lines changed

21 files changed

+492
-212
lines changed

CONTRIBUTING.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ Or you may configure your IDE, for example, Visual Studio Code to automatically
5252

5353

5454
#### Tests
55-
Users can now test their code on their local machine against the CI checks implemented using `make run-tests`.
55+
Users can now test their code on their local machine against the CI checks implemented using `make test`.
5656

5757
To test code changes on your local machine, run the following command:
5858
```
59-
make run-tests
59+
make test
6060
```
6161

6262
#### Building Docker image

broker/nats/nats.go

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package nats
22

33
import (
4+
"encoding/json"
45
"log"
56
"strings"
67
"sync"
78
"time"
89

910
"github.com/meshery/meshkit/broker"
11+
"github.com/meshery/meshkit/errors"
1012
nats "github.com/nats-io/nats.go"
1113
)
1214

@@ -23,10 +25,36 @@ type Options struct {
2325
MaxReconnect int
2426
}
2527

28+
// NatsConn defines the minimal interface for a NATS connection used by Nats
29+
// Only the methods used in Nats are included
30+
type NatsConn interface {
31+
Servers() []string
32+
Drain() error
33+
Close()
34+
Publish(subject string, data []byte) error
35+
QueueSubscribe(subject, queue string, cb func(msg *nats.Msg)) (*nats.Subscription, error)
36+
// Opts returns the options struct (for Info)
37+
Opts() nats.Options
38+
}
39+
40+
// natsConnWrapper adapts *nats.Conn to the NatsConn interface
41+
type natsConnWrapper struct {
42+
*nats.Conn
43+
}
44+
45+
func (w *natsConnWrapper) Opts() nats.Options {
46+
return w.Conn.Opts
47+
}
48+
49+
func (w *natsConnWrapper) QueueSubscribe(subject, queue string, cb func(msg *nats.Msg)) (*nats.Subscription, error) {
50+
// Adapt the callback to nats.MsgHandler
51+
return w.Conn.QueueSubscribe(subject, queue, nats.MsgHandler(cb))
52+
}
53+
2654
// Nats will implement Nats subscribe and publish functionality
2755
type Nats struct {
28-
ec *nats.EncodedConn
29-
wg *sync.WaitGroup
56+
conn NatsConn
57+
wg *sync.WaitGroup
3058
}
3159

3260
// New - constructor
@@ -57,46 +85,65 @@ func New(opts Options) (broker.Handler, error) {
5785
return nil, ErrConnect(err)
5886
}
5987

60-
ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
61-
if err != nil {
62-
return nil, ErrEncodedConn(err)
63-
}
64-
65-
return &Nats{ec: ec}, nil
88+
return &Nats{conn: &natsConnWrapper{nc}, wg: &sync.WaitGroup{}}, nil
6689
}
90+
6791
func (n *Nats) ConnectedEndpoints() (endpoints []string) {
68-
for _, server := range n.ec.Conn.Servers() {
92+
for _, server := range n.conn.Servers() {
6993
endpoints = append(endpoints, strings.TrimPrefix(server, "nats://"))
7094
}
7195
return
7296
}
7397

7498
func (n *Nats) Info() string {
75-
if n.ec == nil || n.ec.Conn == nil {
99+
if n.conn == nil {
76100
return broker.NotConnected
77101
}
78-
return n.ec.Conn.Opts.Name
102+
return n.conn.Opts().Name
79103
}
80104

81105
func (n *Nats) CloseConnection() {
82-
n.ec.Close()
106+
if n.conn != nil {
107+
if err := n.conn.Drain(); err != nil {
108+
log.Printf("nats: drain error: %v", err)
109+
}
110+
n.conn.Close()
111+
}
83112
}
84113

85114
// Publish - to publish messages
86115
func (n *Nats) Publish(subject string, message *broker.Message) error {
87-
err := n.ec.Publish(subject, message)
116+
if message == nil {
117+
return ErrPublish(errors.New(
118+
"nats_publish_error",
119+
errors.Alert,
120+
[]string{"message is nil"},
121+
[]string{},
122+
[]string{},
123+
[]string{},
124+
))
125+
}
126+
b, err := json.Marshal(message)
88127
if err != nil {
89128
return ErrPublish(err)
90129
}
91-
return nil
130+
return n.conn.Publish(subject, b)
92131
}
93132

94133
// PublishWithChannel - to publish messages with channel
95134
func (n *Nats) PublishWithChannel(subject string, msgch chan *broker.Message) error {
96-
err := n.ec.BindSendChan(subject, msgch)
97-
if err != nil {
98-
return ErrPublish(err)
99-
}
135+
go func() {
136+
for msg := range msgch {
137+
b, err := json.Marshal(msg)
138+
if err != nil {
139+
log.Printf("nats: JSON marshal error: %v", err)
140+
continue
141+
}
142+
if err := n.conn.Publish(subject, b); err != nil {
143+
log.Printf("nats: publish error for subject %s: %v", subject, err)
144+
}
145+
}
146+
}()
100147
return nil
101148
}
102149

@@ -105,7 +152,7 @@ func (n *Nats) PublishWithChannel(subject string, msgch chan *broker.Message) er
105152
// TODO will the method-user just subsribe, how will it handle the received messages?
106153
func (n *Nats) Subscribe(subject, queue string, message []byte) error {
107154
n.wg.Add(1)
108-
_, err := n.ec.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
155+
_, err := n.conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
109156
message = msg.Data
110157
n.wg.Done()
111158
})
@@ -119,7 +166,12 @@ func (n *Nats) Subscribe(subject, queue string, message []byte) error {
119166

120167
// SubscribeWithChannel will publish all the messages received to the given channel
121168
func (n *Nats) SubscribeWithChannel(subject, queue string, msgch chan *broker.Message) error {
122-
_, err := n.ec.BindRecvQueueChan(subject, queue, msgch)
169+
_, err := n.conn.QueueSubscribe(subject, queue, func(m *nats.Msg) {
170+
var msg broker.Message
171+
if err := json.Unmarshal(m.Data, &msg); err == nil {
172+
msgch <- &msg
173+
}
174+
})
123175
if err != nil {
124176
return ErrQueueSubscribe(err)
125177
}
@@ -153,8 +205,5 @@ func (in *Nats) DeepCopyObject() broker.Handler {
153205
// Check if the connection object is empty
154206
func (in *Nats) IsEmpty() bool {
155207
empty := &Nats{}
156-
if in == nil || *in == *empty {
157-
return true
158-
}
159-
return false
208+
return in == nil || *in == *empty
160209
}

broker/nats/nats_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package nats
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"testing"
7+
8+
"github.com/meshery/meshkit/broker"
9+
natsio "github.com/nats-io/nats.go"
10+
)
11+
12+
type mockNatsConn struct {
13+
servers []string
14+
name string
15+
publishErr error
16+
queueSubErr error
17+
closed bool
18+
drainErr error
19+
publishMessages [][]byte
20+
}
21+
22+
func (m *mockNatsConn) Servers() []string { return m.servers }
23+
func (m *mockNatsConn) Publish(subject string, data []byte) error {
24+
m.publishMessages = append(m.publishMessages, data)
25+
return m.publishErr
26+
}
27+
func (m *mockNatsConn) QueueSubscribe(subject, queue string, cb func(msg *natsio.Msg)) (*natsio.Subscription, error) {
28+
if m.queueSubErr != nil {
29+
return nil, m.queueSubErr
30+
}
31+
// Simulate a message
32+
msg := &natsio.Msg{Data: []byte(`{"ObjectType":"request-payload","EventType":"ADDED"}`)}
33+
cb(msg)
34+
return nil, nil
35+
}
36+
func (m *mockNatsConn) Drain() error { return m.drainErr }
37+
func (m *mockNatsConn) Close() { m.closed = true }
38+
func (m *mockNatsConn) Opts() natsio.Options { return natsio.Options{Name: m.name} }
39+
40+
// Ensure mockNatsConn implements NatsConn
41+
var _ NatsConn = (*mockNatsConn)(nil)
42+
43+
func newTestNats(mock NatsConn) *Nats {
44+
return &Nats{conn: mock, wg: &sync.WaitGroup{}}
45+
}
46+
47+
func TestConnectedEndpoints(t *testing.T) {
48+
mock := &mockNatsConn{servers: []string{"nats://localhost:4222", "nats://other:4222"}}
49+
n := newTestNats(mock)
50+
endpoints := n.ConnectedEndpoints()
51+
if len(endpoints) != 2 || endpoints[0] != "localhost:4222" {
52+
t.Errorf("unexpected endpoints: %v", endpoints)
53+
}
54+
}
55+
56+
func TestInfo(t *testing.T) {
57+
n := &Nats{conn: nil}
58+
if n.Info() != broker.NotConnected {
59+
t.Error("expected NotConnected when conn is nil")
60+
}
61+
mock := &mockNatsConn{name: "test-conn"}
62+
n = newTestNats(mock)
63+
if n.Info() != "test-conn" {
64+
t.Errorf("expected 'test-conn', got %s", n.Info())
65+
}
66+
}
67+
68+
func TestCloseConnection(t *testing.T) {
69+
mock := &mockNatsConn{}
70+
n := newTestNats(mock)
71+
n.CloseConnection()
72+
if !mock.closed {
73+
t.Error("expected connection to be closed")
74+
}
75+
}
76+
77+
func TestPublish(t *testing.T) {
78+
mock := &mockNatsConn{}
79+
n := newTestNats(mock)
80+
msg := &broker.Message{ObjectType: broker.Request, EventType: broker.Add}
81+
err := n.Publish("subject", msg)
82+
if err != nil {
83+
t.Errorf("unexpected error: %v", err)
84+
}
85+
// Test error on marshal
86+
err = n.Publish("subject", nil)
87+
if err == nil {
88+
t.Error("expected error for nil message")
89+
}
90+
// Test error on publish
91+
mock.publishErr = errors.New("publish error")
92+
msg = &broker.Message{ObjectType: broker.Request, EventType: broker.Add}
93+
err = n.Publish("subject", msg)
94+
if err == nil {
95+
t.Error("expected error from publish")
96+
}
97+
}
98+
99+
func TestPublishWithChannel(t *testing.T) {
100+
mock := &mockNatsConn{}
101+
n := newTestNats(mock)
102+
ch := make(chan *broker.Message, 1)
103+
ch <- &broker.Message{ObjectType: broker.Request, EventType: broker.Add}
104+
close(ch)
105+
err := n.PublishWithChannel("subject", ch)
106+
if err != nil {
107+
t.Errorf("unexpected error: %v", err)
108+
}
109+
}
110+
111+
func TestSubscribe(t *testing.T) {
112+
mock := &mockNatsConn{}
113+
n := newTestNats(mock)
114+
n.wg = &sync.WaitGroup{} // ensure wg is set
115+
var msg []byte
116+
err := n.Subscribe("subject", "queue", msg)
117+
if err != nil {
118+
t.Errorf("unexpected error: %v", err)
119+
}
120+
// Test error on subscribe
121+
mock.queueSubErr = errors.New("subscribe error")
122+
err = n.Subscribe("subject", "queue", msg)
123+
if err == nil {
124+
t.Error("expected error from subscribe")
125+
}
126+
}
127+
128+
func TestSubscribeWithChannel(t *testing.T) {
129+
mock := &mockNatsConn{}
130+
n := newTestNats(mock)
131+
ch := make(chan *broker.Message, 1)
132+
err := n.SubscribeWithChannel("subject", "queue", ch)
133+
if err != nil {
134+
t.Errorf("unexpected error: %v", err)
135+
}
136+
// Test error on subscribe
137+
mock.queueSubErr = errors.New("subscribe error")
138+
err = n.SubscribeWithChannel("subject", "queue", ch)
139+
if err == nil {
140+
t.Error("expected error from subscribe")
141+
}
142+
}
143+
144+
func TestDeepCopy(t *testing.T) {
145+
n := &Nats{conn: nil, wg: &sync.WaitGroup{}}
146+
copy := n.DeepCopy()
147+
if copy == nil || copy == n {
148+
t.Error("DeepCopy did not create a new instance")
149+
}
150+
}
151+
152+
func TestDeepCopyObject(t *testing.T) {
153+
n := &Nats{conn: nil, wg: &sync.WaitGroup{}}
154+
obj := n.DeepCopyObject()
155+
if obj == nil {
156+
t.Error("DeepCopyObject returned nil")
157+
}
158+
}
159+
160+
func TestIsEmpty(t *testing.T) {
161+
var n *Nats
162+
if !n.IsEmpty() {
163+
t.Error("expected true for nil receiver")
164+
}
165+
n = &Nats{}
166+
if !n.IsEmpty() {
167+
t.Error("expected true for empty Nats struct")
168+
}
169+
mock := &mockNatsConn{}
170+
n = &Nats{conn: mock}
171+
if n.IsEmpty() {
172+
t.Error("expected false for non-empty Nats struct")
173+
}
174+
}

converter/tests/helm_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ func extractManifestFromChart(chartData []byte) (bool, string) {
139139
}
140140
if strings.HasSuffix(hdr.Name, "templates/manifest.yaml") {
141141
buf := new(bytes.Buffer)
142-
io.Copy(buf, tr)
142+
if _, err := io.Copy(buf, tr); err != nil {
143+
return false, ""
144+
}
143145
return true, buf.String()
144146
}
145147
}

files/error.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ func ErrUnsupportedExtensionForOperation(operation string, fileName string, file
6767
return errors.New(ErrUnsupportedExtensionForOperationCode, errors.Critical, sdescription, ldescription, probableCause, remedy)
6868
}
6969

70-
func ErrUnsupportedExtension(fileName string, fileExt string, supportedExtensionsMap map[string]bool) error {
71-
supportedExtensions := slices.Collect(maps.Keys(supportedExtensionsMap))
70+
func ErrUnsupportedExtension(fileName string, fileExt string, supportedExtensions []string) error {
71+
extList := strings.Join(supportedExtensions, ", ")
7272

7373
sdescription := []string{
7474
fmt.Sprintf("The file '%s' has an unsupported extension: '%s'.", fileName, fileExt),
75-
fmt.Sprintf("Supported file extensions are: %s.", strings.Join(supportedExtensions, ", ")),
75+
fmt.Sprintf("Supported file extensions are: %s.", extList),
7676
}
7777

7878
ldescription := []string{
@@ -380,7 +380,7 @@ func ErrInvalidModel(operation string, filename string, err error) error {
380380
// check prefix as random numeric suffix is appended to archive during file handling (eg: .tar becomes .tar263831)
381381
return ErrInvalidModelArchive(filename, err)
382382
default:
383-
supportedExtensions := slices.Collect(maps.Keys(ValidIacExtensions))
383+
supportedExtensions := slices.Clone(ValidIacExtensions)
384384
supportedExtensions = slices.DeleteFunc(supportedExtensions, func(ext string) bool {
385385
return ext == ".zip"
386386
})

0 commit comments

Comments
 (0)