From 9bcbce43b6f60ce577f25386e32e54561531d558 Mon Sep 17 00:00:00 2001 From: Joe Williams <7463219+bubbajoe@users.noreply.github.com> Date: Fri, 14 Jun 2024 03:32:44 +0900 Subject: [PATCH] Fix/state issues (#12) * fix: persistent/fsm issues * fix!: apply fixes from feat/fix-raft-bug branch --- .github/workflows/discord.yml | 20 +++++ TODO.md | 22 ++--- functional-tests/raft_tests/raft_test.sh | 6 +- functional-tests/raft_tests/test1.yaml | 2 +- go.mod | 4 +- go.sum | 10 +-- internal/admin/admin_fsm.go | 87 +++++-------------- internal/admin/admin_raft.go | 50 +++++++---- internal/admin/admin_routes.go | 68 ++++++--------- internal/admin/changestate/change_state.go | 3 +- .../changestate/testutil/change_state.go | 7 +- internal/admin/routes/collection_routes.go | 7 +- internal/admin/routes/domain_routes.go | 4 +- internal/admin/routes/misc_routes.go | 4 +- internal/admin/routes/module_routes.go | 4 +- internal/admin/routes/namespace_routes.go | 4 +- internal/admin/routes/route_routes.go | 7 +- internal/admin/routes/secret_routes.go | 4 +- internal/admin/routes/service_routes.go | 4 +- internal/config/config.go | 27 +++--- internal/config/loader.go | 4 +- internal/proxy/change_log.go | 50 +++++------ internal/proxy/dynamic_proxy.go | 20 +++-- internal/proxy/proxy_handler.go | 20 ++++- internal/proxy/proxy_replication.go | 8 +- internal/proxy/proxy_state.go | 80 +++++++++++++---- pkg/dgclient/common.go | 2 +- pkg/rafthttp/rafthttp.go | 1 + pkg/spec/change_log.go | 22 ++--- pkg/spec/response_writer_tracker.go | 11 +++ pkg/util/http.go | 21 ++++- 31 files changed, 300 insertions(+), 283 deletions(-) create mode 100644 .github/workflows/discord.yml diff --git a/.github/workflows/discord.yml b/.github/workflows/discord.yml new file mode 100644 index 0000000..c209ce5 --- /dev/null +++ b/.github/workflows/discord.yml @@ -0,0 +1,20 @@ +on: + release: + types: [published] + + jobs: + github-releases-to-discord: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Github Releases To Discord + uses: SethCohen/github-releases-to-discord@v1.13.1 + with: + webhook_url: ${{ secrets.DISCORD_WEBHOOK_URL }} + color: "2105893" + username: "Release Changelog" + avatar_url: "https://github.com/dgate-io.png" + content: "||@everyone||" + footer_title: "Changelog" + footer_timestamp: true \ No newline at end of file diff --git a/TODO.md b/TODO.md index 00ff125..5bb5d29 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ - server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.) - cluster management (raft commands, replica commands, etc.) (low priority) - other commands (backup, restore, etc.) (low priority) - - replace k6 with wrk for performance tests ## Add Module Tests @@ -16,9 +15,6 @@ - [ ] - Add option to specify export variables when ambiguous (?) - [ ] - check how global variable conflicts are handled -## Start using Pkl - -replace dgate server config with pkl. ## dgate-cli declaritive config @@ -70,10 +66,6 @@ expose metrics for the following: - Add Transactions - [ ] - Add transactional support for admin API -## DGate Documentation (dgate.io/docs) - -Use Docusaurus to create the documentation for DGate. - ## DGate Admin Console (low priority) Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser. @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster. -## DGate CLI - argument variable suggestions - -For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match. -``` -dgate-cli ns mk my-ns nmae=my-ns -Variable 'nmae' is not recognized. Did you mean 'name'? -``` - ## DGate CLI - help command show required variables When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables. @@ -159,4 +143,8 @@ Add stack tracing for typescript modules. Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation. -## Add Telemetry (sentry, datadog, etc.) \ No newline at end of file +## Add Telemetry (sentry, datadog, etc.) + +## ResourceManager callback for resource changes + +Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more. \ No newline at end of file diff --git a/functional-tests/raft_tests/raft_test.sh b/functional-tests/raft_tests/raft_test.sh index 4882466..7829b1b 100755 --- a/functional-tests/raft_tests/raft_test.sh +++ b/functional-tests/raft_tests/raft_test.sh @@ -34,7 +34,7 @@ dgate-cli -f domain create name=dm-$id \ dgate-cli -f service create \ name=svc-$id namespace=ns-$id \ - urls="http://localhost:8888/$RANDOM" + urls="http://localhost:8081/$RANDOM" dgate-cli -f route create \ name=rt-$id \ @@ -55,14 +55,14 @@ for i in {1..5}; do done if dgate-cli --admin $ADMIN_URL4 namespace create name=0; then - echo "Expected error when creating namespace" + echo "Expected error when creating namespace on non-voter" exit 1 fi export DGATE_ADMIN_API=$ADMIN_URL5 if dgate-cli --admin $ADMIN_URL5 namespace create name=0; then - echo "Expected error when creating namespace" + echo "Expected error when creating namespace on non-voter" exit 1 fi diff --git a/functional-tests/raft_tests/test1.yaml b/functional-tests/raft_tests/test1.yaml index 5c18c1c..8ce2e76 100644 --- a/functional-tests/raft_tests/test1.yaml +++ b/functional-tests/raft_tests/test1.yaml @@ -1,6 +1,6 @@ version: v1 log_level: info - +debug: true tags: - "dev" - "internal" diff --git a/go.mod b/go.mod index 3b9198e..772cf23 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/knadh/koanf/v2 v2.0.1 github.com/mitchellh/mapstructure v1.5.0 github.com/prometheus/client_golang v1.19.0 - github.com/rs/zerolog v1.31.0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/spf13/pflag v1.0.5 github.com/stoewer/go-strcase v1.3.0 @@ -31,8 +30,8 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.48.0 go.opentelemetry.io/otel/metric v1.26.0 go.opentelemetry.io/otel/sdk/metric v1.26.0 + go.uber.org/zap v1.27.0 golang.org/x/net v0.21.0 - golang.org/x/sync v0.6.0 golang.org/x/term v0.19.0 ) @@ -81,7 +80,6 @@ require ( go.opentelemetry.io/otel/sdk v1.26.0 // indirect go.opentelemetry.io/otel/trace v1.26.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.33.0 // indirect diff --git a/go.sum b/go.sum index 3c6b16a..7bb3c79 100644 --- a/go.sum +++ b/go.sum @@ -24,7 +24,6 @@ github.com/clarkmcc/go-typescript v0.7.0 h1:3nVeaPYyTCWjX6Lf8GoEOTxME2bM5tLuWmwh github.com/clarkmcc/go-typescript v0.7.0/go.mod h1:IZ/nzoVeydAmyfX7l6Jmp8lJDOEnae3jffoXwP4UyYg= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -78,7 +77,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -228,9 +226,6 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c h1:fPpdjePK1atuOg28PXfNSqgwf9I/qD1Hlo39JFwKBXk= github.com/rogpeppe/go-internal v1.11.1-0.20231026093722-fa6a31e0812c/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= -github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= @@ -278,6 +273,8 @@ go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZH go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE= go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= @@ -317,8 +314,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= -golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -340,7 +335,6 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/admin/admin_fsm.go b/internal/admin/admin_fsm.go index 5632be1..781cd17 100644 --- a/internal/admin/admin_fsm.go +++ b/internal/admin/admin_fsm.go @@ -13,39 +13,21 @@ import ( type dgateAdminFSM struct { cs changestate.ChangeState logger *zap.Logger + index uint64 } var _ raft.BatchingFSM = (*dgateAdminFSM)(nil) func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdminFSM { - return &dgateAdminFSM{cs, logger} + return &dgateAdminFSM{cs, logger, 0} } -func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool { - return !fsm.cs.Ready() && - log.Index+1 >= fsm.cs.Raft().LastIndex() && - log.Index+1 >= fsm.cs.Raft().AppliedIndex() +func (fsm *dgateAdminFSM) SetIndex(index uint64) { + fsm.index = index } -func (fsm *dgateAdminFSM) checkLast(log *raft.Log) { - rft := fsm.cs.Raft() - if !fsm.cs.Ready() && fsm.isReplay(log) { - fsm.logger.Info("FSM is not ready, setting ready", - zap.Uint64("index", log.Index), - zap.Uint64("applied-index", rft.AppliedIndex()), - zap.Uint64("last-index", rft.LastIndex()), - ) - defer func() { - if err := fsm.cs.ReloadState(false); err != nil { - fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) - } else { - fsm.cs.SetReady() - } - }() - } -} - -func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) { +func (fsm *dgateAdminFSM) applyLog(log *raft.Log, replay bool) (*spec.ChangeLog, error) { + log.Index = fsm.index switch log.Type { case raft.LogCommand: var cl spec.ChangeLog @@ -58,10 +40,8 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) { fsm.logger.Error("Change log ID is empty", zap.Error(err)) panic("change log ID is empty") } - // find a way to apply only if latest index to save time - return &cl, fsm.cs.ProcessChangeLog(&cl, false) - case raft.LogNoop: - fsm.logger.Debug("Noop Log - current leader is still leader") + // find a way to only reload if latest index to save time + return &cl, fsm.cs.ProcessChangeLog(&cl, replay) case raft.LogConfiguration: servers := raft.DecodeConfiguration(log.Data).Servers for i, server := range servers { @@ -70,11 +50,6 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) { zap.Int("index", i), ) } - case raft.LogBarrier: - err := fsm.cs.WaitForChanges() - if err != nil { - fsm.logger.Error("Error waiting for changes", zap.Error(err)) - } default: fsm.logger.Error("Unknown log type in FSM Apply") } @@ -82,42 +57,28 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) { } func (fsm *dgateAdminFSM) Apply(log *raft.Log) any { - defer fsm.checkLast(log) - _, err := fsm.applyLog(log) + _, err := fsm.applyLog(log, true) return err } func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any { - lastLog := logs[len(logs)-1] - if fsm.isReplay(lastLog) { - rft := fsm.cs.Raft() - fsm.logger.Info("applying log batch logs", - zap.Int("size", len(logs)), - zap.Uint64("current", lastLog.Index), - zap.Uint64("applied", rft.AppliedIndex()), - zap.Uint64("commit", rft.CommitIndex()), - zap.Uint64("last", rft.LastIndex()), - ) - } - cls := make([]*spec.ChangeLog, 0, len(logs)) - defer func() { - if !fsm.cs.Ready() { - fsm.checkLast(logs[len(logs)-1]) - return - } - - if err := fsm.cs.ReloadState(true, cls...); err != nil { - fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err)) - } - }() - + rft := fsm.cs.Raft() + lastIndex := len(logs) - 1 + fsm.logger.Debug("apply log batch", + zap.Uint64("applied", rft.AppliedIndex()), + zap.Uint64("commit", rft.CommitIndex()), + zap.Uint64("last", rft.LastIndex()), + zap.Uint64("fsmLastIndex", fsm.index), + zap.Uint64("log[0]", logs[0].Index), + zap.Uint64("log[-1]", logs[lastIndex].Index), + zap.Int("logs", len(logs)), + ) results := make([]any, len(logs)) for i, log := range logs { - var cl *spec.ChangeLog - cl, results[i] = fsm.applyLog(log) - if cl != nil { - cls = append(cls, cl) - } + // TODO: check to see if this can be optimized channels raft node provides + _, results[i] = fsm.applyLog( + log, lastIndex == i, + ) } return results } diff --git a/internal/admin/admin_raft.go b/internal/admin/admin_raft.go index 9aed54a..f9d126f 100644 --- a/internal/admin/admin_raft.go +++ b/internal/admin/admin_raft.go @@ -13,6 +13,7 @@ import ( "github.com/dgate-io/dgate/internal/config" "github.com/dgate-io/dgate/pkg/raftadmin" "github.com/dgate-io/dgate/pkg/rafthttp" + "github.com/dgate-io/dgate/pkg/spec" "github.com/dgate-io/dgate/pkg/storage" "github.com/dgate-io/dgate/pkg/util/logadapter" raftbadgerdb "github.com/dgate-io/raft-badger" @@ -53,23 +54,18 @@ func setupRaft( default: panic(fmt.Errorf("invalid storage type: %s", conf.Storage.StorageType)) } - raftId := adminConfig.Replication.RaftID - if raftId == "" { - raftId = conf.NodeId - } - raftConfig := adminConfig.Replication.LoadRaftConfig( &raft.Config{ ProtocolVersion: raft.ProtocolVersionMax, - LocalID: raft.ServerID(raftId), + LocalID: raft.ServerID(adminConfig.Replication.RaftID), HeartbeatTimeout: time.Second * 4, ElectionTimeout: time.Second * 5, CommitTimeout: time.Second * 4, - BatchApplyCh: true, - MaxAppendEntries: 16, + BatchApplyCh: false, + MaxAppendEntries: 512, LeaderLeaseTimeout: time.Second * 4, // TODO: Support snapshots - SnapshotInterval: time.Hour * 24, + SnapshotInterval: time.Hour*2 ^ 32, SnapshotThreshold: ^uint64(0), Logger: logadapter.NewZap2HCLogAdapter(logger), }, @@ -90,15 +86,21 @@ func setupRaft( address, http.DefaultClient, raftHttpLogger, adminConfig.Replication.AdvertScheme+"://(address)/raft", ) + fsmLogger := logger.Named("fsm") + snapstore := raft.NewInmemSnapshotStore() + fsm := newDGateAdminFSM(fsmLogger, cs) raftNode, err := raft.NewRaft( - raftConfig, newDGateAdminFSM(logger.Named("fsm"), cs), - lstore, sstore, raft.NewInmemSnapshotStore(), transport, + raftConfig, fsm, lstore, + sstore, snapstore, transport, ) if err != nil { panic(err) } - cs.SetupRaft(raftNode, raftConfig) + observerChan := make(chan raft.Observation, 10) + raftNode.RegisterObserver(raft.NewObserver(observerChan, false, nil)) + cs.SetupRaft(raftNode, observerChan) + // Setup raft handler server.Handle("/raft/*", transport) @@ -120,16 +122,27 @@ func setupRaft( }) configFuture := raftNode.GetConfiguration() - if err = configFuture.Error(); err != nil { panic(err) } serverConfig := configFuture.Configuration() + raftId := string(raftConfig.LocalID) + logger.Info("replication config", + zap.String("raft_id", raftId), + zap.Any("config", serverConfig), + zap.Int("max_append_entries", raftConfig.MaxAppendEntries), + zap.Bool("batch_chan", raftConfig.BatchApplyCh), + zap.Duration("commit_timeout", raftConfig.CommitTimeout), + zap.Int("config_proto", int(raftConfig.ProtocolVersion)), + ) - logger.Debug("Replication config", - zap.Any("config", serverConfig)) + defer cs.ProcessChangeLog(spec.NewNoopChangeLog(), false) - if adminConfig.Replication.BootstrapCluster { + if adminConfig.Replication.BootstrapCluster && len(serverConfig.Servers) == 0 { + logger.Info("bootstrapping cluster", + zap.String("id", raftId), + zap.String("advert_addr", advertAddr), + ) raftNode.BootstrapCluster(raft.Configuration{ Servers: []raft.Server{ { @@ -162,6 +175,11 @@ func setupRaft( addresses = append(addresses, fmt.Sprintf("%s:%d", addr, adminConfig.Port)) } } + logger.Info("no servers found in configuration, adding myself to cluster", + zap.String("id", raftId), + zap.String("advert_addr", advertAddr), + zap.Strings("cluster_addrs", addresses), + ) if adminConfig.Replication.ClusterAddrs != nil && len(adminConfig.Replication.ClusterAddrs) > 0 { addresses = append(addresses, adminConfig.Replication.ClusterAddrs...) diff --git a/internal/admin/admin_routes.go b/internal/admin/admin_routes.go index 871bdac..4539e60 100644 --- a/internal/admin/admin_routes.go +++ b/internal/admin/admin_routes.go @@ -3,7 +3,6 @@ package admin import ( "fmt" "log" - "net" "net/http" "strings" @@ -75,43 +74,20 @@ func configureRoutes( } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if ipList.Len() > 0 { - remoteHost, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - remoteHost = r.RemoteAddr - } - allowed, err := ipList.Contains(remoteHost) - if !allowed && adminConfig.XForwardedForDepth > 0 { - xForwardedForIps := r.Header.Values("X-Forwarded-For") - count := min(adminConfig.XForwardedForDepth, len(xForwardedForIps)) - for i := 0; i < count; i++ { - allowed, err = ipList.Contains(xForwardedForIps[i]) - if err != nil { - logger.Error("error checking x-forwarded-for ip", - zap.Error(err), - ) - if conf.Debug { - http.Error(w, "Bad Request: could not parse x-forwarded-for IP address", http.StatusBadRequest) - } - http.Error(w, "Bad Request", http.StatusBadRequest) - return - } - if allowed { - break - } - } - } - + remoteIp := util.GetTrustedIP(r, + conf.AdminConfig.XForwardedForDepth) + allowed, err := ipList.Contains(remoteIp) if err != nil { if conf.Debug { http.Error(w, err.Error(), http.StatusInternalServerError) return } - http.Error(w, "Internal Server Error", http.StatusInternalServerError) + http.Error(w, "could not parse X-Forwarded-For IP", http.StatusBadRequest) return } if !allowed { if conf.Debug { - http.Error(w, "Unauthorized IP Address: "+remoteHost, http.StatusUnauthorized) + http.Error(w, "Unauthorized IP Address: "+remoteIp, http.StatusUnauthorized) return } http.Error(w, "Unauthorized", http.StatusUnauthorized) @@ -138,24 +114,26 @@ func configureRoutes( } else if adminConfig.KeyAuth.HeaderName != "" { key = r.Header.Get(adminConfig.KeyAuth.HeaderName) } else { - key = r.Header.Get("X-API-Key") + key = r.Header.Get("X-DGate-Key") } if _, keyFound := keyMap[key]; !keyFound { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } } - raftInstance := cs.Raft() - if r.Method == http.MethodPut && raftInstance != nil { - leader := raftInstance.Leader() - if leader == "" { - util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") - return - } - if raftInstance.State() != raft.Leader { - r.URL.Host = string(leader) - http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) - return + if raftInstance := cs.Raft(); raftInstance != nil { + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + leader := raftInstance.Leader() + if leader == "" { + // TODO: add a way to wait for a leader with a timeout + util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") + return + } + if raftInstance.State() != raft.Leader { + r.URL.Host = string(leader) + http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) + return + } } } @@ -165,10 +143,14 @@ func configureRoutes( server.Get("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") - w.Header().Set("X-DGate-Raft", fmt.Sprintf("%t", cs.Raft() != nil)) w.Header().Set("X-DGate-WatchOnly", fmt.Sprintf("%t", adminConfig.WatchOnly)) w.Header().Set("X-DGate-ChangeHash", fmt.Sprintf("%d", cs.ChangeHash())) - w.Header().Set("X-DGate-AdminAPI", "true") + if raftInstance := cs.Raft(); raftInstance != nil { + w.Header().Set( + "X-DGate-Raft-State", + raftInstance.State().String(), + ) + } w.WriteHeader(http.StatusOK) w.Write([]byte("DGate Admin API")) })) diff --git a/internal/admin/changestate/change_state.go b/internal/admin/changestate/change_state.go index f5bcc5f..8095fab 100644 --- a/internal/admin/changestate/change_state.go +++ b/internal/admin/changestate/change_state.go @@ -17,10 +17,9 @@ type ChangeState interface { // Readiness Ready() bool - SetReady() // Replication - SetupRaft(*raft.Raft, *raft.Config) + SetupRaft(*raft.Raft, chan raft.Observation) Raft() *raft.Raft // Resources diff --git a/internal/admin/changestate/testutil/change_state.go b/internal/admin/changestate/testutil/change_state.go index ea1626f..8ecc33b 100644 --- a/internal/admin/changestate/testutil/change_state.go +++ b/internal/admin/changestate/testutil/change_state.go @@ -65,13 +65,8 @@ func (m *MockChangeState) ReloadState(a bool, cls ...*spec.ChangeLog) error { return m.Called(a, cls).Error(0) } -// SetReady implements changestate.ChangeState. -func (m *MockChangeState) SetReady() { - m.Called() -} - // SetupRaft implements changestate.ChangeState. -func (m *MockChangeState) SetupRaft(*raft.Raft, *raft.Config) { +func (m *MockChangeState) SetupRaft(*raft.Raft, chan raft.Observation) { m.Called().Error(0) } diff --git a/internal/admin/routes/collection_routes.go b/internal/admin/routes/collection_routes.go index 255433d..5e449fc 100644 --- a/internal/admin/routes/collection_routes.go +++ b/internal/admin/routes/collection_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -71,8 +70,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } @@ -279,8 +277,7 @@ func ConfigureCollectionAPI(server chi.Router, logger *zap.Logger, cs changestat } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/domain_routes.go b/internal/admin/routes/domain_routes.go index 06879c3..cb5fc72 100644 --- a/internal/admin/routes/domain_routes.go +++ b/internal/admin/routes/domain_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -49,8 +48,7 @@ func ConfigureDomainAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/misc_routes.go b/internal/admin/routes/misc_routes.go index 4b236ce..7ed280f 100644 --- a/internal/admin/routes/misc_routes.go +++ b/internal/admin/routes/misc_routes.go @@ -3,7 +3,6 @@ package routes import ( "encoding/json" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -14,8 +13,7 @@ import ( func ConfigureChangeLogAPI(server chi.Router, cs changestate.ChangeState, appConfig *config.DGateConfig) { server.Get("/changelog/hash", func(w http.ResponseWriter, r *http.Request) { if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/module_routes.go b/internal/admin/routes/module_routes.go index 75bb1fa..62c6b35 100644 --- a/internal/admin/routes/module_routes.go +++ b/internal/admin/routes/module_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -49,8 +48,7 @@ func ConfigureModuleAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch return } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/namespace_routes.go b/internal/admin/routes/namespace_routes.go index 0cc0563..63d4783 100644 --- a/internal/admin/routes/namespace_routes.go +++ b/internal/admin/routes/namespace_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -43,8 +42,7 @@ func ConfigureNamespaceAPI(server chi.Router, logger *zap.Logger, cs changestate } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/route_routes.go b/internal/admin/routes/route_routes.go index d60813f..f366b73 100644 --- a/internal/admin/routes/route_routes.go +++ b/internal/admin/routes/route_routes.go @@ -4,7 +4,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -51,14 +50,14 @@ func ConfigureRouteAPI(server chi.Router, logger *zap.Logger, cs changestate.Cha } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } } - util.JsonResponse(w, http.StatusCreated, spec.TransformDGateRoutes(rm.GetRoutesByNamespace(route.NamespaceName)...)) + util.JsonResponse(w, http.StatusCreated, spec.TransformDGateRoutes( + rm.GetRoutesByNamespace(route.NamespaceName)...)) }) server.Delete("/route", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/admin/routes/secret_routes.go b/internal/admin/routes/secret_routes.go index 56156f4..a6c004e 100644 --- a/internal/admin/routes/secret_routes.go +++ b/internal/admin/routes/secret_routes.go @@ -5,7 +5,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -49,8 +48,7 @@ func ConfigureSecretAPI(server chi.Router, logger *zap.Logger, cs changestate.Ch return } if repl := cs.Raft(); repl != nil { - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/admin/routes/service_routes.go b/internal/admin/routes/service_routes.go index 082ec8f..4216291 100644 --- a/internal/admin/routes/service_routes.go +++ b/internal/admin/routes/service_routes.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "net/http" - "time" "github.com/dgate-io/chi-router" "github.com/dgate-io/dgate/internal/admin/changestate" @@ -67,8 +66,7 @@ func ConfigureServiceAPI(server chi.Router, logger *zap.Logger, cs changestate.C if repl := cs.Raft(); repl != nil { logger.Debug("Waiting for raft barrier") - future := repl.Barrier(time.Second * 5) - if err := future.Error(); err != nil { + if err := cs.WaitForChanges(); err != nil { util.JsonError(w, http.StatusInternalServerError, err.Error()) return } diff --git a/internal/config/config.go b/internal/config/config.go index 1dcd90e..cf1c0c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,19 +39,22 @@ type ( } DGateProxyConfig struct { - Host string `koanf:"host"` - Port int `koanf:"port"` - TLS *DGateTLSConfig `koanf:"tls"` - EnableH2C bool `koanf:"enable_h2c"` - EnableHTTP2 bool `koanf:"enable_http2"` - EnableConsoleLogger bool `koanf:"enable_console_logger"` - RedirectHttpsDomains []string `koanf:"redirect_https"` - AllowedDomains []string `koanf:"allowed_domains"` - GlobalHeaders map[string]string `koanf:"global_headers"` - Transport DGateHttpTransportConfig `koanf:"client_transport"` + Host string `koanf:"host"` + Port int `koanf:"port"` + TLS *DGateTLSConfig `koanf:"tls"` + EnableH2C bool `koanf:"enable_h2c"` + EnableHTTP2 bool `koanf:"enable_http2"` + EnableConsoleLogger bool `koanf:"enable_console_logger"` + RedirectHttpsDomains []string `koanf:"redirect_https"` + AllowedDomains []string `koanf:"allowed_domains"` + GlobalHeaders map[string]string `koanf:"global_headers"` + Transport DGateHttpTransportConfig `koanf:"client_transport"` + DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + StrictMode bool `koanf:"strict_mode"` + XForwardedForDepth int `koanf:"x_forwarded_for_depth"` + // WARN: debug use only - InitResources *DGateResources `koanf:"init_resources"` - DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + InitResources *DGateResources `koanf:"init_resources"` } DGateTestServerConfig struct { diff --git a/internal/config/loader.go b/internal/config/loader.go index f7928e8..533f22c 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -95,7 +95,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { shell := "/bin/sh" if shellEnv := os.Getenv("SHELL"); shellEnv != "" { shell = shellEnv - } + } resolveConfigStringPattern(data, CommandRegex, func(value string, results map[string]string) (string, error) { cmdResult, err := exec.CommandContext( ctx, shell, "-c", results["cmd"]).Output() @@ -182,6 +182,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { } if k.Exists("admin") { kDefault(k, "admin.host", "127.0.0.1") + kDefault(k, "admin.x_forwarded_for_depth", -1) err = kRequireAll(k, "admin.port") if err != nil { return nil, err @@ -192,6 +193,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { } if k.Exists("admin.replication") { + kDefault(k, "admin.replication.raft_id", k.Get("node_id")) err = kRequireAll(k, "admin.host") if err != nil { return nil, err diff --git a/internal/proxy/change_log.go b/internal/proxy/change_log.go index 5a2ab94..9ec21e8 100644 --- a/internal/proxy/change_log.go +++ b/internal/proxy/change_log.go @@ -2,6 +2,7 @@ package proxy import ( "fmt" + "strconv" "time" "errors" @@ -15,11 +16,19 @@ import ( // processChangeLog - processes a change log and applies the change to the proxy state func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (err error) { - if cl == nil { - cl = &spec.ChangeLog{ - Cmd: spec.NoopCommand, + defer func() { + ps.raftReady.Store(true) + }() + if !cl.Cmd.IsNoop() { + if len(ps.changeLogs) > 0 { + xcl := ps.changeLogs[len(ps.changeLogs)-1] + if xcl.ID == cl.ID { + ps.logger.Debug("duplicate change log", zap.String("id", cl.ID)) + return nil + } } - } else if !cl.Cmd.IsNoop() { + strconv.FormatInt(time.Now().UnixNano(), 36) + ps.changeLogs = append(ps.changeLogs, cl) switch cl.Cmd.Resource() { case spec.Namespaces: var item spec.Namespace @@ -88,8 +97,7 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( if reload { if cl.Cmd.IsNoop() || cl.Cmd.Resource().IsRelatedTo(spec.Routes) { ps.logger.Debug("Registering change log", zap.Stringer("cmd", cl.Cmd)) - err = ps.reconfigureState(false, cl) - if err != nil { + if err = ps.reconfigureState(false); err != nil { ps.logger.Error("Error registering change log", zap.Error(err)) return } @@ -108,9 +116,12 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( } if store { if err = ps.store.StoreChangeLog(cl); err != nil { - // TODO: find a way to revert the change and reload the state - // TODO: OR add flag in config to ignore storage errors - ps.logger.Error("Error storing change log", zap.Error(err)) + ps.logger.Error("Error storing change log, restarting state", zap.Error(err)) + ps.restartState(func(err error) { + if err != nil { + go ps.Stop() + } + }) return } } @@ -254,23 +265,7 @@ func (ps *ProxyState) processSecret(scrt *spec.Secret, cl *spec.ChangeLog) (err return err } -// applyChange - apply a change to the proxy state, returns a channel that will receive an error when the state has been updated -func (ps *ProxyState) applyChange(changeLog *spec.ChangeLog) <-chan error { - done := make(chan error, 1) - if changeLog == nil { - changeLog = &spec.ChangeLog{ - Cmd: spec.NoopCommand, - } - } - changeLog.SetErrorChan(done) - if err := ps.processChangeLog(changeLog, true, true); err != nil { - done <- err - } - return done -} - func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { - // restore state change logs logs, err := ps.store.FetchChangeLogs() if err != nil { if err == badger.ErrKeyNotFound { @@ -296,11 +291,12 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { } } if !directApply { - if err = ps.processChangeLog(nil, true, false); err != nil { + cl := spec.NewNoopChangeLog() + if err = ps.processChangeLog(cl, true, false); err != nil { return err } } else { - if err = ps.reconfigureState(false, nil); err != nil { + if err = ps.reconfigureState(false); err != nil { return nil } } diff --git a/internal/proxy/dynamic_proxy.go b/internal/proxy/dynamic_proxy.go index 8b6bcb0..04d67d2 100644 --- a/internal/proxy/dynamic_proxy.go +++ b/internal/proxy/dynamic_proxy.go @@ -17,7 +17,7 @@ import ( "golang.org/x/net/http2/h2c" ) -func (ps *ProxyState) reconfigureState(init bool, log *spec.ChangeLog) (err error) { +func (ps *ProxyState) reconfigureState(init bool) (err error) { defer func() { if err != nil { ps.restartState(func(err error) { @@ -27,7 +27,6 @@ func (ps *ProxyState) reconfigureState(init bool, log *spec.ChangeLog) (err erro } }) } - log.PushError(err) }() ps.proxyLock.Lock() @@ -297,19 +296,24 @@ func (ps *ProxyState) Start() (err error) { func (ps *ProxyState) Stop() { go func() { - <-time.After(10 * time.Second) - defer os.Exit(1) + defer os.Exit(3) + <-time.After(5 * time.Second) ps.logger.Error("Failed to stop proxy server") }() ps.logger.Info("Stopping proxy server") - ps.proxyLock.Lock() - defer ps.proxyLock.Unlock() defer os.Exit(0) defer ps.Logger().Sync() - if raftNode := ps.Raft(); raftNode != nil { - raftNode.Shutdown().Error() + ps.proxyLock.Lock() + raftNode := ps.Raft() + ps.proxyLock.Unlock() + + if raftNode != nil { + ps.logger.Info("Stopping Raft node") + if err := raftNode.Shutdown().Error(); err != nil { + ps.logger.Error("Error stopping Raft node", zap.Error(err)) + } } } diff --git a/internal/proxy/proxy_handler.go b/internal/proxy/proxy_handler.go index b6bb6fa..8c97626 100644 --- a/internal/proxy/proxy_handler.go +++ b/internal/proxy/proxy_handler.go @@ -25,12 +25,20 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) { With( zap.String("route", reqCtx.route.Name), zap.String("namespace", reqCtx.route.Namespace.Name), + zap.String("path", reqCtx.req.URL.Path), + zap.String("method", reqCtx.req.Method), + zap.String("query", reqCtx.req.URL.RawQuery), + zap.String("protocol", reqCtx.req.Proto), + zap.String("remote_address", reqCtx.req.RemoteAddr), + zap.String("user_agent", reqCtx.req.UserAgent()), + zap.Int64("content_length", reqCtx.req.ContentLength), + zap.String("content_type", reqCtx.req.Header.Get("Content-Type")), ) if reqCtx.route.Service != nil { event = event.With(zap.String("service", reqCtx.route.Service.Name)) } - event.Debug("Request Log") + event.Info("Request log") }() defer ps.metrics.MeasureProxyRequest(reqCtx, time.Now()) @@ -156,7 +164,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt }). ErrorHandler(func(w http.ResponseWriter, r *http.Request, reqErr error) { upstreamErr = reqErr - ps.logger.Debug("Error proxying request", + ps.logger.Error("Error proxying request", zap.String("error", reqErr.Error()), zap.String("route", reqCtx.route.Name), zap.String("service", reqCtx.route.Service.Name), @@ -185,7 +193,13 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt } } if !reqCtx.rw.HeadersSent() && reqCtx.rw.BytesWritten() == 0 { - util.WriteStatusCodeError(reqCtx.rw, http.StatusInternalServerError) + ps.logger.Error("Writing error response", + zap.String("error", reqErr.Error()), + zap.String("route", reqCtx.route.Name), + zap.String("service", reqCtx.route.Service.Name), + zap.String("namespace", reqCtx.route.Namespace.Name), + ) + util.WriteStatusCodeError(reqCtx.rw, http.StatusBadGateway) } }) diff --git a/internal/proxy/proxy_replication.go b/internal/proxy/proxy_replication.go index 782084e..76f96e6 100644 --- a/internal/proxy/proxy_replication.go +++ b/internal/proxy/proxy_replication.go @@ -3,13 +3,11 @@ package proxy import "github.com/hashicorp/raft" type ProxyReplication struct { - raft *raft.Raft - raftConfig *raft.Config + raft *raft.Raft } -func NewProxyReplication(raft *raft.Raft, raftConfig *raft.Config) *ProxyReplication { +func NewProxyReplication(raft *raft.Raft) *ProxyReplication { return &ProxyReplication{ - raft: raft, - raftConfig: raftConfig, + raft: raft, } } diff --git a/internal/proxy/proxy_state.go b/internal/proxy/proxy_state.go index 383055a..5ca4df3 100644 --- a/internal/proxy/proxy_state.go +++ b/internal/proxy/proxy_state.go @@ -43,6 +43,7 @@ type ProxyState struct { store *proxystore.ProxyStore proxyLock *sync.RWMutex changeHash uint32 + changeLogs []*spec.ChangeLog metrics *ProxyMetrics sharedCache cache.TCache @@ -167,15 +168,6 @@ func (ps *ProxyState) ChangeHash() uint32 { return ps.changeHash } -func (ps *ProxyState) SetReady() { - if ps.replicationEnabled && !ps.raftReady.Load() { - ps.logger.Info("Replication status is now ready after " + - time.Since(ps.startTime).String()) - ps.raftReady.Store(true) - return - } -} - func (ps *ProxyState) Ready() bool { if ps.replicationEnabled { return ps.raftReady.Load() @@ -190,16 +182,46 @@ func (ps *ProxyState) Raft() *raft.Raft { return nil } -func (ps *ProxyState) SetupRaft(r *raft.Raft, rc *raft.Config) { +func (ps *ProxyState) SetupRaft(r *raft.Raft, oc chan raft.Observation) { ps.proxyLock.Lock() defer ps.proxyLock.Unlock() - ps.replicationSettings = NewProxyReplication(r, rc) + go func() { + for obs := range oc { + switch raftObs := obs.Data.(type) { + case raft.PeerObservation: + ps.logger.Info("peer observation", + zap.Stringer("suffrage", raftObs.Peer.Suffrage), + zap.String("address", string(raftObs.Peer.Address)), + zap.String("id", string(raftObs.Peer.ID)), + ) + case raft.LeaderObservation: + ps.logger.Info("leader observation", + zap.String("leader_addr", string(raftObs.LeaderAddr)), + zap.String("leader_id", string(raftObs.LeaderID)), + ) + case raft.RequestVoteRequest: + ps.logger.Info("request vote request", + zap.String("candidate_id", string(raftObs.GetRPCHeader().ID)), + zap.String("candidate_addr", string(raftObs.GetRPCHeader().Addr)), + zap.Uint64("term", raftObs.Term), + zap.Uint64("last-log-index", raftObs.LastLogIndex), + zap.Uint64("last-log-term", raftObs.LastLogTerm), + ) + } + } + + }() + + ps.replicationSettings = NewProxyReplication(r) } func (ps *ProxyState) WaitForChanges() error { ps.proxyLock.RLock() defer ps.proxyLock.RUnlock() - return <-ps.applyChange(nil) + if rft := ps.Raft(); rft != nil { + return rft.Barrier(time.Second * 5).Error() + } + return nil } func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { @@ -218,6 +240,10 @@ func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { raftLog := raft.Log{ Data: encodedCL, } + err = ps.ProcessChangeLog(log, true) + if err != nil { + return err + } future := r.ApplyLog(raftLog, time.Second*15) ps.logger.With(). Debug("waiting for reply from raft", @@ -289,7 +315,7 @@ func (ps *ProxyState) ReloadState(check bool, logs ...*spec.ChangeLog) error { } } if reload { - <-ps.applyChange(nil) + return ps.processChangeLog(nil, true, false) } return nil } @@ -298,8 +324,9 @@ func (ps *ProxyState) ProcessChangeLog(log *spec.ChangeLog, reload bool) error { err := ps.processChangeLog(log, reload, !ps.replicationEnabled) if err != nil { ps.logger.Error("processing error", zap.Error(err)) + return err } - return err + return nil } func (ps *ProxyState) DynamicTLSConfig(certFile, keyFile string) *tls.Config { @@ -397,7 +424,11 @@ func (ps *ProxyState) initConfigResources(resources *config.DGateResources) erro return err } if numChanges > 0 { - defer ps.processChangeLog(nil, false, false) + defer func() { + if err != nil { + err = ps.processChangeLog(nil, false, false) + } + }() } ps.logger.Info("Initializing resources") for _, ns := range resources.Namespaces { @@ -572,19 +603,30 @@ func (ps *ProxyState) ServeHTTP(w http.ResponseWriter, r *http.Request) { if router, ok := ps.routers.Find(ns.Name); ok { router.ServeHTTP(w, r) } else { - ps.logger.Debug("No router found for namespace", - zap.String("namespace", ns.Name), - ) util.WriteStatusCodeError(w, http.StatusNotFound) } } else { + if ps.config.ProxyConfig.StrictMode { + closeConnection(w) + return + } + trustedIp := util.GetTrustedIP(r, ps.config.ProxyConfig.XForwardedForDepth) ps.logger.Debug("No namespace found for request", zap.String("protocol", r.Proto), zap.String("host", r.Host), zap.String("path", r.URL.Path), zap.Bool("secure", r.TLS != nil), - zap.String("remote_addr", r.RemoteAddr), + zap.String("remote_addr", trustedIp), ) util.WriteStatusCodeError(w, http.StatusNotFound) } } + +func closeConnection(w http.ResponseWriter) { + if loot, ok := w.(http.Hijacker); ok { + if conn, _, err := loot.Hijack(); err == nil { + defer conn.Close() + return + } + } +} diff --git a/pkg/dgclient/common.go b/pkg/dgclient/common.go index bfe9b1f..681216d 100644 --- a/pkg/dgclient/common.go +++ b/pkg/dgclient/common.go @@ -150,5 +150,5 @@ func parseApiError(body io.Reader, wrapErr error) error { if err := json.NewDecoder(body).Decode(&apiError); err != nil || apiError.Error == "" { return wrapErr } - return fmt.Errorf("%d: %s", wrapErr, apiError.Error) + return fmt.Errorf("%s: %s", wrapErr, apiError.Error) } diff --git a/pkg/rafthttp/rafthttp.go b/pkg/rafthttp/rafthttp.go index d01ea35..7214a09 100644 --- a/pkg/rafthttp/rafthttp.go +++ b/pkg/rafthttp/rafthttp.go @@ -297,6 +297,7 @@ func (t *HTTPTransport) ServeHTTP(res http.ResponseWriter, req *http.Request) { // SetHeartbeatHandler implements the raft.Transport interface. func (t *HTTPTransport) SetHeartbeatHandler(cb func(rpc raft.RPC)) { // Not supported + } // TimeoutNow implements the raft.Transport interface. diff --git a/pkg/spec/change_log.go b/pkg/spec/change_log.go index 90d3d26..d16899b 100644 --- a/pkg/spec/change_log.go +++ b/pkg/spec/change_log.go @@ -13,7 +13,14 @@ type ChangeLog struct { Namespace string `json:"namespace"` Item any `json:"item"` Version int `json:"version"` - errChan chan error +} + +func NewNoopChangeLog() *ChangeLog { + return &ChangeLog{ + Version: 1, + ID: strconv.FormatInt(time.Now().UnixNano(), 36), + Cmd: NoopCommand, + } } func NewChangeLog(item Named, namespace string, cmd Command) *ChangeLog { @@ -36,19 +43,6 @@ func NewChangeLog(item Named, namespace string, cmd Command) *ChangeLog { } } -func (cl *ChangeLog) SetErrorChan(errChan chan error) { - cl.errChan = errChan -} - -func (cl *ChangeLog) PushError(err error) { - if cl == nil { - return - } - if cl.errChan != nil { - cl.errChan <- err - } -} - type Command string type Action string diff --git a/pkg/spec/response_writer_tracker.go b/pkg/spec/response_writer_tracker.go index 26dcb08..6a872ee 100644 --- a/pkg/spec/response_writer_tracker.go +++ b/pkg/spec/response_writer_tracker.go @@ -1,6 +1,8 @@ package spec import ( + "bufio" + "net" "net/http" ) @@ -17,6 +19,7 @@ type rwTracker struct { bytesWritten int64 } +var _ http.Hijacker = (*rwTracker)(nil) var _ ResponseWriterTracker = (*rwTracker)(nil) func NewResponseWriterTracker(rw http.ResponseWriter) ResponseWriterTracker { @@ -61,3 +64,11 @@ func (t *rwTracker) HeadersSent() bool { func (t *rwTracker) BytesWritten() int64 { return t.bytesWritten } + +func (t *rwTracker) Hijack() (net.Conn, *bufio.ReadWriter, error) { + hijacker, ok := t.rw.(http.Hijacker) + if !ok { + return nil, nil, http.ErrNotSupported + } + return hijacker.Hijack() +} diff --git a/pkg/util/http.go b/pkg/util/http.go index 2f8e007..741f6af 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -1,7 +1,7 @@ package util import ( - "fmt" + "net" "net/http" ) @@ -9,7 +9,20 @@ func WriteStatusCodeError(w http.ResponseWriter, code int) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") w.WriteHeader(code) - w.Write([]byte( - fmt.Sprintf("DGate: %d %s", code, http.StatusText(code)), - )) +} + +// GetTrustedIP returns the trusted IP address of the client. It checks the +// X-Forwarded-For header first, and falls back to the RemoteAddr field of the +// request if the header is not present. depth is the number of proxies that +// the request has passed through. +func GetTrustedIP(r *http.Request, depth int) string { + ips := r.Header.Values("X-Forwarded-For") + if len(ips) == 0 || depth > len(ips) { + remoteHost, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return r.RemoteAddr + } + return remoteHost + } + return ips[len(ips)-depth] }