Skip to content

Commit ac40f57

Browse files
committed
feat: improve updater, tests
1 parent e983775 commit ac40f57

File tree

3 files changed

+148
-59
lines changed

3 files changed

+148
-59
lines changed

internal/proxy/proxy.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,20 @@ func (p *Proxy) next() *Server {
111111
return p.next()
112112
}
113113

114-
func (p *Proxy) update(providers []seed.Provider) error {
114+
func (p *Proxy) update(seed seed.Seed) {
115+
var err error
116+
switch p.kind {
117+
case RPC:
118+
err = p.doUpdate(seed.APIs.RPC)
119+
case Rest:
120+
err = p.doUpdate(seed.APIs.Rest)
121+
}
122+
if err != nil {
123+
slog.Error("could not update seed", "err", err)
124+
}
125+
}
126+
127+
func (p *Proxy) doUpdate(providers []seed.Provider) error {
115128
p.mu.Lock()
116129
defer p.mu.Unlock()
117130

@@ -153,12 +166,7 @@ func (p *Proxy) Start(ctx context.Context) {
153166
for {
154167
select {
155168
case seed := <-p.ch:
156-
switch p.kind {
157-
case RPC:
158-
p.update(seed.APIs.RPC)
159-
case Rest:
160-
p.update(seed.APIs.Rest)
161-
}
169+
p.update(seed)
162170
case <-ctx.Done():
163171
p.shuttingDown.Store(true)
164172
return

internal/proxy/proxy_test.go

Lines changed: 55 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package proxy
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
76
"io"
87
"net/http"
@@ -17,53 +16,33 @@ import (
1716
)
1817

1918
func TestProxy(t *testing.T) {
20-
const chainID = "unittest"
19+
for name, kind := range map[string]ProxyKind{
20+
"rpc": RPC,
21+
"rest": Rest,
22+
} {
23+
t.Run(name, func(t *testing.T) {
24+
testProxy(t, kind)
25+
})
26+
}
27+
}
28+
29+
func testProxy(tb testing.TB, kind ProxyKind) {
2130
srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2231
_, _ = io.WriteString(w, "srv1 replied")
2332
}))
24-
t.Cleanup(srv1.Close)
33+
tb.Cleanup(srv1.Close)
2534
srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2635
time.Sleep(time.Millisecond * 500)
2736
_, _ = io.WriteString(w, "srv2 replied")
2837
}))
29-
t.Cleanup(srv2.Close)
38+
tb.Cleanup(srv2.Close)
3039
srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3140
w.WriteHeader(http.StatusTeapot)
3241
}))
33-
t.Cleanup(srv2.Close)
34-
35-
seed := seed.Seed{
36-
ChainID: chainID,
37-
APIs: seed.Apis{
38-
RPC: []seed.Provider{
39-
{
40-
Address: srv1.URL,
41-
Provider: "srv1",
42-
},
43-
{
44-
Address: srv2.URL,
45-
Provider: "srv2",
46-
},
47-
{
48-
Address: srv3.URL,
49-
Provider: "srv3",
50-
},
51-
},
52-
},
53-
}
54-
55-
t.Logf("%+v", seed)
56-
57-
seedSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
58-
bts, _ := json.Marshal(seed)
59-
_, _ = w.Write(bts)
60-
}))
61-
t.Cleanup(seedSrv.Close)
42+
tb.Cleanup(srv2.Close)
6243

63-
proxy := New(config.Config{
64-
SeedURL: seedSrv.URL,
65-
SeedRefreshInterval: 500 * time.Millisecond,
66-
ChainID: chainID,
44+
ch := make(chan seed.Seed, 1)
45+
proxy := New(kind, ch, config.Config{
6746
HealthyThreshold: 10 * time.Millisecond,
6847
ProxyRequestTimeout: time.Second,
6948
UnhealthyServerRecoverChancePct: 1,
@@ -72,19 +51,43 @@ func TestProxy(t *testing.T) {
7251
})
7352

7453
ctx, cancel := context.WithCancel(context.Background())
75-
t.Cleanup(cancel)
54+
tb.Cleanup(cancel)
7655
proxy.Start(ctx)
7756

78-
require.Len(t, proxy.servers, 3)
57+
serverList := []seed.Provider{
58+
{
59+
Address: srv1.URL,
60+
Provider: "srv1",
61+
},
62+
{
63+
Address: srv2.URL,
64+
Provider: "srv2",
65+
},
66+
{
67+
Address: srv3.URL,
68+
Provider: "srv3",
69+
},
70+
}
71+
72+
ch <- seed.Seed{
73+
APIs: seed.Apis{
74+
Rest: serverList,
75+
RPC: serverList,
76+
},
77+
}
78+
79+
require.Eventually(tb, func() bool { return proxy.initialized.Load() }, time.Second, time.Millisecond)
80+
81+
require.Len(tb, proxy.servers, 3)
7982

8083
proxySrv := httptest.NewServer(proxy)
81-
t.Cleanup(proxySrv.Close)
84+
tb.Cleanup(proxySrv.Close)
8285

8386
var wg errgroup.Group
8487
wg.SetLimit(20)
8588
for i := 0; i < 100; i++ {
8689
wg.Go(func() error {
87-
t.Log("go")
90+
tb.Log("go")
8891
req, err := http.NewRequest(http.MethodGet, proxySrv.URL, nil)
8992
if err != nil {
9093
return err
@@ -102,13 +105,13 @@ func TestProxy(t *testing.T) {
102105
return nil
103106
})
104107
}
105-
require.NoError(t, wg.Wait())
108+
require.NoError(tb, wg.Wait())
106109

107110
// stop the proxy
108111
cancel()
109112

110113
stats := proxy.Stats()
111-
require.Len(t, stats, 3)
114+
require.Len(tb, stats, 3)
112115

113116
var srv1Stats ServerStat
114117
var srv2Stats ServerStat
@@ -124,13 +127,13 @@ func TestProxy(t *testing.T) {
124127
srv3Stats = st
125128
}
126129
}
127-
require.Zero(t, srv1Stats.ErrorRate)
128-
require.Zero(t, srv2Stats.ErrorRate)
129-
require.Equal(t, float64(100), srv3Stats.ErrorRate)
130-
require.Greater(t, srv1Stats.Requests, srv2Stats.Requests)
131-
require.Greater(t, srv2Stats.Avg, srv1Stats.Avg)
132-
require.False(t, srv1Stats.Degraded)
133-
require.True(t, srv2Stats.Degraded)
134-
require.True(t, srv1Stats.Initialized)
135-
require.True(t, srv2Stats.Initialized)
130+
require.Zero(tb, srv1Stats.ErrorRate)
131+
require.Zero(tb, srv2Stats.ErrorRate)
132+
require.Equal(tb, float64(100), srv3Stats.ErrorRate)
133+
require.Greater(tb, srv1Stats.Requests, srv2Stats.Requests)
134+
require.Greater(tb, srv2Stats.Avg, srv1Stats.Avg)
135+
require.False(tb, srv1Stats.Degraded)
136+
require.True(tb, srv2Stats.Degraded)
137+
require.True(tb, srv1Stats.Initialized)
138+
require.True(tb, srv2Stats.Initialized)
136139
}

internal/seed/updater_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package seed
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"sync/atomic"
9+
"testing"
10+
"time"
11+
12+
"github.com/akash-network/rpc-proxy/internal/config"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestUpdater(t *testing.T) {
17+
chainID := "test"
18+
seed := Seed{
19+
ChainID: chainID,
20+
APIs: Apis{
21+
RPC: []Provider{
22+
{
23+
Address: "http://rpc.local",
24+
Provider: "rpc-provider",
25+
},
26+
},
27+
Rest: []Provider{
28+
{
29+
Address: "http://rest.local",
30+
Provider: "rest-provider",
31+
},
32+
},
33+
},
34+
}
35+
36+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
37+
bts, _ := json.Marshal(seed)
38+
_, _ = w.Write(bts)
39+
}))
40+
t.Cleanup(srv.Close)
41+
42+
rpc := make(chan Seed, 1)
43+
rest := make(chan Seed, 1)
44+
45+
up := New(config.Config{
46+
SeedRefreshInterval: time.Millisecond,
47+
SeedURL: srv.URL,
48+
ChainID: chainID,
49+
}, rpc, rest)
50+
51+
ctx, cancel := context.WithCancel(context.Background())
52+
t.Cleanup(cancel)
53+
up.Start(ctx)
54+
55+
go func() {
56+
time.Sleep(time.Millisecond * 500)
57+
cancel()
58+
}()
59+
60+
var rpcUpdates, restUpdates atomic.Uint32
61+
62+
outer:
63+
for {
64+
select {
65+
case got := <-rpc:
66+
rpcUpdates.Add(1)
67+
require.Equal(t, seed, got)
68+
case got := <-rest:
69+
restUpdates.Add(1)
70+
require.Equal(t, seed, got)
71+
case <-ctx.Done():
72+
break outer
73+
}
74+
}
75+
76+
require.NotZero(t, rpcUpdates.Load())
77+
require.NotZero(t, restUpdates.Load())
78+
}

0 commit comments

Comments
 (0)