Skip to content

Commit

Permalink
refactor admin, raft, and tests: fix deadlocks, slow startup, and
Browse files Browse the repository at this point in the history
  • Loading branch information
bubbajoe committed Jun 20, 2024
1 parent da67815 commit c2c01b8
Show file tree
Hide file tree
Showing 34 changed files with 639 additions and 443 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
- run: cd functional-tests/raft_tests && goreman start &

- name: Wait for server to start
run: sleep 5
run: sleep 10

- name: Functional Standalone Tests
run: |
Expand Down
12 changes: 8 additions & 4 deletions cmd/dgate-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,23 @@ func main() {
}
if dgateConfig, err := config.LoadConfig(*configPath); err != nil {
fmt.Printf("Error loading config: %s\n", err)
os.Exit(1)
panic(err)

Check warning on line 61 in cmd/dgate-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/dgate-server/main.go#L61

Added line #L61 was not covered by tests
} else {
logger, err := dgateConfig.GetLogger()
if err != nil {
fmt.Printf("Error setting up logger: %s\n", err)
os.Exit(1)
panic(err)

Check warning on line 66 in cmd/dgate-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/dgate-server/main.go#L66

Added line #L66 was not covered by tests
}
defer logger.Sync()
proxyState := proxy.NewProxyState(logger.Named("proxy"), dgateConfig)
admin.StartAdminAPI(version, dgateConfig, logger.Named("admin"), proxyState)
err = admin.StartAdminAPI(version, dgateConfig, logger.Named("admin"), proxyState)
if err != nil {
fmt.Printf("Error starting admin api: %s\n", err)
panic(err)

Check warning on line 73 in cmd/dgate-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/dgate-server/main.go#L70-L73

Added lines #L70 - L73 were not covered by tests
}
if err := proxyState.Start(); err != nil {
fmt.Printf("Error loading config: %s\n", err)
os.Exit(1)
panic(err)

Check warning on line 77 in cmd/dgate-server/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/dgate-server/main.go#L77

Added line #L77 was not covered by tests
}

sigchan := make(chan os.Signal, 1)
Expand Down
3 changes: 2 additions & 1 deletion functional-tests/admin_tests/admin_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ TEST_URL=${TEST_URL:-"http://localhost:8888"}

DIR="$( cd "$( dirname "$0" )" && pwd )"

# domain setup
export DGATE_ADMIN_API=$ADMIN_URL

# check if uuid is available
if ! command -v uuid > /dev/null; then
id=X$RANDOM-$RANDOM-$RANDOM
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ require (
github.com/stoewer/go-strcase v1.3.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.1
go.etcd.io/bbolt v1.3.10
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/prometheus v0.48.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.uber.org/zap v1.27.0
golang.org/x/net v0.21.0
golang.org/x/sync v0.7.0
golang.org/x/term v0.19.0
)

Expand Down Expand Up @@ -80,12 +82,10 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/bbolt v1.3.10 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
Expand Down
10 changes: 5 additions & 5 deletions internal/admin/admin_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func newAdminFSM(
logger.Warn("corrupted state detected", zap.ByteString("prev_state", stateBytes))
} else {
logger.Info("found state in store", zap.Any("prev_state", fsm.localState))
return fsm

Check warning on line 42 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L32-L42

Added lines #L32 - L42 were not covered by tests
}
}
return fsm

Check warning on line 45 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L45

Added line #L45 was not covered by tests
Expand Down Expand Up @@ -76,11 +77,10 @@ func (fsm *AdminFSM) applyLog(log *raft.Log, reload bool) (*spec.ChangeLog, erro
}

func (fsm *AdminFSM) Apply(log *raft.Log) any {
resps := fsm.ApplyBatch([]*raft.Log{log})
if len(resps) != 1 {
panic("apply batch not returning the correct number of responses")
if resps := fsm.ApplyBatch([]*raft.Log{log}); len(resps) == 1 {
return resps[0]

Check warning on line 81 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L79-L81

Added lines #L79 - L81 were not covered by tests
}
return resps[0]
panic("apply batch not returning the correct number of responses")

Check warning on line 83 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L83

Added line #L83 was not covered by tests
}

func (fsm *AdminFSM) ApplyBatch(logs []*raft.Log) []any {

Check warning on line 86 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L86

Added line #L86 was not covered by tests
Expand Down Expand Up @@ -115,7 +115,7 @@ func (fsm *AdminFSM) ApplyBatch(logs []*raft.Log) []any {
zap.Uint64("applied_index", lastLogIndex),
)

Check warning on line 116 in internal/admin/admin_fsm.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_fsm.go#L111-L116

Added lines #L111 - L116 were not covered by tests
}
fsm.cs.SetReady(true)
// defer fsm.cs.SetReady(true)
}

return results
Expand Down
76 changes: 48 additions & 28 deletions internal/admin/admin_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,9 @@ func setupRaft(
address := raft.ServerAddress(advertAddr)

raftHttpLogger := logger.Named("http")
if adminConfig.Replication.AdvertScheme != "http" && adminConfig.Replication.AdvertScheme != "https" {
panic(fmt.Errorf("invalid scheme: %s", adminConfig.Replication.AdvertScheme))
}

transport := rafthttp.NewHTTPTransport(
address, http.DefaultClient, raftHttpLogger,
adminConfig.Replication.AdvertScheme+"://(address)/raft",
adminConfig.Replication.AdvertScheme,

Check warning on line 97 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L97

Added line #L97 was not covered by tests
)
fsmLogger := logger.Named("fsm")
adminFSM := newAdminFSM(fsmLogger, configStore, cs)

Check warning on line 100 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L100

Added line #L100 was not covered by tests
Expand All @@ -110,14 +106,13 @@ func setupRaft(
panic(err)
}

cs.SetupRaft(raftNode)

// Setup raft handler
server.Handle("/raft/*", transport)

raftAdminLogger := logger.Named("admin")
raftAdmin := raftadmin.NewRaftAdminHTTPServer(
raftNode, raftAdminLogger, []raft.ServerAddress{address},
raftAdmin := raftadmin.NewServer(
raftNode, raftAdminLogger,
[]raft.ServerAddress{address},

Check warning on line 115 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L113-L115

Added lines #L113 - L115 were not covered by tests
)

// Setup handler for raft admin
Expand All @@ -138,6 +133,39 @@ func setupRaft(
util.JsonResponse(w, http.StatusOK, raftNode.Stats())
}))

Check warning on line 134 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L131-L134

Added lines #L131 - L134 were not covered by tests

// Setup handler for readys
server.Handle("/raftadmin/readyz", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Raft-State", raftNode.State().String())
if err := cs.WaitForChanges(nil); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return

Check warning on line 141 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L137-L141

Added lines #L137 - L141 were not covered by tests
}
leaderId, leaderAddr := raftNode.LeaderWithID()
util.JsonResponse(w, http.StatusOK, map[string]any{
"status": "ok",
"proxy_ready": cs.Ready(),
"state": raftNode.State().String(),
"leader": leaderId,
"leader_addr": leaderAddr,
})

Check warning on line 150 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L143-L150

Added lines #L143 - L150 were not covered by tests
}))

doer := func(req *http.Request) (*http.Response, error) {
req.Header.Set("User-Agent", "dgate")
if adminConfig.Replication.SharedKey != "" {
req.Header.Set("X-DGate-Shared-Key", adminConfig.Replication.SharedKey)

Check warning on line 156 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L153-L156

Added lines #L153 - L156 were not covered by tests
}
client := *http.DefaultClient
client.Timeout = time.Second * 10
return client.Do(req)

Check warning on line 160 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L158-L160

Added lines #L158 - L160 were not covered by tests
}
adminClient := raftadmin.NewClient(
doer, logger.Named("raft-admin-client"),
adminConfig.Replication.AdvertScheme,
)

Check warning on line 165 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L162-L165

Added lines #L162 - L165 were not covered by tests

cs.SetupRaft(raftNode, adminClient)

Check warning on line 167 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L167

Added line #L167 was not covered by tests

configFuture := raftNode.GetConfiguration()
if err = configFuture.Error(); err != nil {
panic(err)
Expand Down Expand Up @@ -203,33 +231,25 @@ func setupRaft(
if len(addresses) > 0 {
addresses = append(addresses, adminConfig.Replication.ClusterAddrs...)
retries := 0
doer := func(req *http.Request) (*http.Response, error) {
req.Header.Set("User-Agent", "dgate")
if adminConfig.Replication.SharedKey != "" {
req.Header.Set("X-DGate-Shared-Key", adminConfig.Replication.SharedKey)
}
return http.DefaultClient.Do(req)
}
adminClient := raftadmin.NewHTTPAdminClient(doer,
adminConfig.Replication.AdvertScheme+"://(address)/raftadmin",
logger.Named("raft-admin-client"),
)
RETRY:
for _, url := range addresses {
err = adminClient.VerifyLeader(context.Background(), raft.ServerAddress(url))
for _, addr := range addresses {
err = adminClient.VerifyLeader(
context.Background(),
raft.ServerAddress(addr),
)

Check warning on line 239 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L235-L239

Added lines #L235 - L239 were not covered by tests
if err != nil {
if err == raftadmin.ErrNotLeader {
continue
}
if retries > 15 {
logger.Error("Skipping verifying leader",
zap.String("url", url), zap.Error(err),
zap.String("url", addr), zap.Error(err),

Check warning on line 246 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L246

Added line #L246 was not covered by tests
)
continue
}
retries += 1
logger.Debug("Retrying verifying leader",
zap.String("url", url), zap.Error(err))
zap.String("url", addr), zap.Error(err))

Check warning on line 252 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L252

Added line #L252 was not covered by tests
<-time.After(3 * time.Second)
goto RETRY
}
Expand All @@ -238,10 +258,10 @@ func setupRaft(
logger.Info("Adding non-voter",
zap.String("id", raftId),
zap.String("leader", adminConfig.Replication.AdvertAddr),
zap.String("url", url),
zap.String("url", addr),

Check warning on line 261 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L261

Added line #L261 was not covered by tests
)
resp, err := adminClient.AddNonvoter(
context.Background(), raft.ServerAddress(url),
context.Background(), raft.ServerAddress(addr),

Check warning on line 264 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L264

Added line #L264 was not covered by tests
&raftadmin.AddNonvoterRequest{
ID: raftId,
Address: adminConfig.Replication.AdvertAddr,
Expand All @@ -257,9 +277,9 @@ func setupRaft(
logger.Info("Adding voter: %s - leader: %s",
zap.String("id", raftId),
zap.String("leader", adminConfig.Replication.AdvertAddr),
zap.String("url", url),
zap.String("url", addr),

Check warning on line 280 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L280

Added line #L280 was not covered by tests
)
resp, err := adminClient.AddVoter(context.Background(), raft.ServerAddress(url), &raftadmin.AddVoterRequest{
resp, err := adminClient.AddVoter(context.Background(), raft.ServerAddress(addr), &raftadmin.AddVoterRequest{

Check warning on line 282 in internal/admin/admin_raft.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/admin_raft.go#L282

Added line #L282 was not covered by tests
ID: raftId,
Address: adminConfig.Replication.AdvertAddr,
})
Expand Down
4 changes: 3 additions & 1 deletion internal/admin/admin_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func TestAdminRoutes_configureRoutes(t *testing.T) {
cs.On("DocumentManager").Return(nil)
conf := configtest.NewTestAdminConfig()
if err := configureRoutes(
mux, "test", zap.NewNop(), cs, conf,
mux, "test",
zap.NewNop(),
cs, conf,
); err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/admin/changestate/change_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package changestate

import (
"github.com/dgate-io/dgate/internal/proxy"
"github.com/dgate-io/dgate/pkg/raftadmin"
"github.com/dgate-io/dgate/pkg/resources"
"github.com/dgate-io/dgate/pkg/spec"
"github.com/hashicorp/raft"
Expand All @@ -21,7 +22,7 @@ type ChangeState interface {
SetReady(bool)

// Replication
SetupRaft(*raft.Raft)
SetupRaft(*raft.Raft, *raftadmin.Client)
Raft() *raft.Raft

// Resources
Expand Down
3 changes: 2 additions & 1 deletion internal/admin/changestate/testutil/change_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/dgate-io/dgate/internal/admin/changestate"
"github.com/dgate-io/dgate/pkg/resources"
"github.com/dgate-io/dgate/pkg/spec"
"github.com/dgate-io/dgate/pkg/raftadmin"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -71,7 +72,7 @@ func (m *MockChangeState) ReloadState(a bool, cls ...*spec.ChangeLog) error {
}

// SetupRaft implements changestate.ChangeState.
func (m *MockChangeState) SetupRaft(*raft.Raft) {
func (m *MockChangeState) SetupRaft(*raft.Raft, *raftadmin.Client) {

Check warning on line 75 in internal/admin/changestate/testutil/change_state.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/changestate/testutil/change_state.go#L75

Added line #L75 was not covered by tests
m.Called().Error(0)
}

Expand Down
9 changes: 3 additions & 6 deletions internal/admin/routes/collection_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat
}

cl := spec.NewChangeLog(&collection, collection.NamespaceName, spec.AddCollectionCommand)
err = cs.ApplyChangeLog(cl)
if err != nil {
if err = cs.ApplyChangeLog(cl); err != nil {

Check warning on line 66 in internal/admin/routes/collection_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/routes/collection_routes.go#L66

Added line #L66 was not covered by tests
util.JsonError(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down Expand Up @@ -268,8 +267,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat
}

cl := spec.NewChangeLog(&doc, doc.NamespaceName, spec.AddDocumentCommand)
err = cs.ApplyChangeLog(cl)
if err != nil {
if err = cs.ApplyChangeLog(cl); err != nil {

Check warning on line 270 in internal/admin/routes/collection_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/routes/collection_routes.go#L270

Added line #L270 was not covered by tests
util.JsonError(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down Expand Up @@ -350,8 +348,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat
return
}
cl := spec.NewChangeLog(document, namespaceName, spec.DeleteDocumentCommand)
err = cs.ApplyChangeLog(cl)
if err != nil {
if err = cs.ApplyChangeLog(cl); err != nil {

Check warning on line 351 in internal/admin/routes/collection_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/routes/collection_routes.go#L351

Added line #L351 was not covered by tests
util.JsonError(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down
3 changes: 1 addition & 2 deletions internal/admin/routes/domain_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func ConfigureDomainAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch
domain.NamespaceName = spec.DefaultNamespace.Name
}
cl := spec.NewChangeLog(&domain, domain.NamespaceName, spec.AddDomainCommand)
err = cs.ApplyChangeLog(cl)
if err != nil {
if err = cs.ApplyChangeLog(cl); err != nil {

Check warning on line 44 in internal/admin/routes/domain_routes.go

View check run for this annotation

Codecov / codecov/patch

internal/admin/routes/domain_routes.go#L44

Added line #L44 was not covered by tests
util.JsonError(w, http.StatusBadRequest, err.Error())
return
}
Expand Down
5 changes: 0 additions & 5 deletions internal/admin/routes/misc_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ func ConfigureHealthAPI(server chi.Router, version string, cs changestate.Change
w.Header().Set("Content-Type", "application/json")
if cs.Ready() {
if r := cs.Raft(); r != nil {
if err := cs.WaitForChanges(nil); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(`{"status":"not ready"}`))
return
}
w.Header().Set("X-Raft-State", r.State().String())
if leaderAddr := r.Leader(); leaderAddr == "" {
w.WriteHeader(http.StatusServiceUnavailable)
Expand Down
3 changes: 1 addition & 2 deletions internal/admin/routes/module_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func ConfigureModuleAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch
mod.NamespaceName = spec.DefaultNamespace.Name
}
cl := spec.NewChangeLog(&mod, mod.NamespaceName, spec.DeleteModuleCommand)
err = cs.ApplyChangeLog(cl)
if err != nil {
if err = cs.ApplyChangeLog(cl); err != nil {
util.JsonError(w, http.StatusBadRequest, err.Error())
return
}
Expand Down
Loading

0 comments on commit c2c01b8

Please sign in to comment.