diff --git a/.github/workflows/discord.yml b/.github/workflows/discord.yml new file mode 100644 index 0000000..a9fff71 --- /dev/null +++ b/.github/workflows/discord.yml @@ -0,0 +1,21 @@ +on: + release: + types: [published] + +jobs: + github-releases-to-discord: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Github Releases To Discord + uses: SethCohen/github-releases-to-discord@v1.13.1 + with: + webhook_url: ${{ secrets.DISCORD_WEBHOOK_URL }} + color: "2105893" + username: "Release Changelog" + avatar_url: "https://github.com/dgate-io.png" + content: "||@everyone||" + footer_title: "Changelog" + footer_icon_url: "https://cdn.discordapp.com/avatars/487431320314576937/bd64361e4ba6313d561d54e78c9e7171.png" + footer_timestamp: true \ No newline at end of file diff --git a/TODO.md b/TODO.md index 00ff125..5bb5d29 100644 --- a/TODO.md +++ b/TODO.md @@ -7,7 +7,6 @@ - server management (start-proxy, stop-proxy, restart, status, logs, stats, etc.) - cluster management (raft commands, replica commands, etc.) (low priority) - other commands (backup, restore, etc.) (low priority) - - replace k6 with wrk for performance tests ## Add Module Tests @@ -16,9 +15,6 @@ - [ ] - Add option to specify export variables when ambiguous (?) - [ ] - check how global variable conflicts are handled -## Start using Pkl - -replace dgate server config with pkl. ## dgate-cli declaritive config @@ -70,10 +66,6 @@ expose metrics for the following: - Add Transactions - [ ] - Add transactional support for admin API -## DGate Documentation (dgate.io/docs) - -Use Docusaurus to create the documentation for DGate. - ## DGate Admin Console (low priority) Admin Console is a web-based interface that can be used to manage the state of the cluster. Manage resource, view logs, stats, and more. It can also be used to develop and test modules directly in the browser. @@ -136,14 +128,6 @@ A good example of a bundle would be a bundle that adds support for OAuth2 authen Differing from common resource versioning, modules can have multiple versions that can be used at the same time. This can be used to test new versions of modules before deploying them to the cluster. -## DGate CLI - argument variable suggestions - -For example, if the user types an argument that is not recognized, the CLI can suggest the correct argument by search the available arguments and finding the closest match. -``` -dgate-cli ns mk my-ns nmae=my-ns -Variable 'nmae' is not recognized. Did you mean 'name'? -``` - ## DGate CLI - help command show required variables When the user runs the help command, the CLI should show the required variables for the command. For example, if the user runs `dgate-cli ns mk --help`, the CLI should show the required variables for the `ns mk` command. `name` is a required variable for the `ns mk` command. Also, the CLI should show non-required variables. @@ -159,4 +143,8 @@ Add stack tracing for typescript modules. Currently, Raft Implementation is tightly coupled with the Admin API. This makes it difficult to change the Raft Implementation without changing the Admin API. Decouple the Raft Implementation from the Admin API to make it easier to change the Raft Implementation. -## Add Telemetry (sentry, datadog, etc.) \ No newline at end of file +## Add Telemetry (sentry, datadog, etc.) + +## ResourceManager callback for resource changes + +Add a callback to the ResourceManager that is called when a resource is changed. This can be used to invalidate caches, update modules, and more. \ No newline at end of file diff --git a/functional-tests/raft_tests/raft_test.sh b/functional-tests/raft_tests/raft_test.sh index 4882466..00afa47 100755 --- a/functional-tests/raft_tests/raft_test.sh +++ b/functional-tests/raft_tests/raft_test.sh @@ -34,7 +34,7 @@ dgate-cli -f domain create name=dm-$id \ dgate-cli -f service create \ name=svc-$id namespace=ns-$id \ - urls="http://localhost:8888/$RANDOM" + urls="http://localhost:8081/$RANDOM" dgate-cli -f route create \ name=rt-$id \ diff --git a/functional-tests/raft_tests/test1.yaml b/functional-tests/raft_tests/test1.yaml index 5c18c1c..a3160d2 100644 --- a/functional-tests/raft_tests/test1.yaml +++ b/functional-tests/raft_tests/test1.yaml @@ -1,5 +1,6 @@ version: v1 -log_level: info +debug: true +log_level: debug tags: - "dev" diff --git a/go.mod b/go.mod index 3b9198e..bb0b98d 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,8 @@ require ( github.com/dop251/goja v0.0.0-20240220182346-e401ed450204 github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c github.com/google/uuid v1.3.1 - github.com/hashicorp/go-hclog v1.6.2 - github.com/hashicorp/raft v1.6.0 + github.com/hashicorp/go-hclog v1.6.3 + github.com/hashicorp/raft v1.7.0 github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/toml v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 @@ -46,7 +46,7 @@ require ( github.com/dlclark/regexp2 v1.10.0 // indirect github.com/dop251/base64dec v0.0.0-20231022112746-c6c9f9a96217 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/fatih/color v1.14.1 // indirect + github.com/fatih/color v1.17.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -59,12 +59,12 @@ require ( github.com/google/flatbuffers v1.12.1 // indirect github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect - github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect - github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect + github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pelletier/go-toml v1.9.5 // indirect @@ -82,7 +82,7 @@ require ( go.opentelemetry.io/otel/trace v1.26.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/sys v0.19.0 // indirect + golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 3c6b16a..bd76858 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= +github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= +github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -125,19 +127,27 @@ github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I= github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I= github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4= +github.com/hashicorp/go-msgpack/v2 v2.1.2 h1:4Ee8FTp834e+ewB71RDrQ0VKpyFdrKOjvYtnQ/ltVj0= +github.com/hashicorp/go-msgpack/v2 v2.1.2/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= +github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/raft v1.6.0 h1:tkIAORZy2GbJ2Trp5eUSggLXDPOJLXC+JJLNMMqtgtM= github.com/hashicorp/raft v1.6.0/go.mod h1:Xil5pDgeGwRWuX4uPUmwa+7Vagg4N804dz6mhNi6S7o= +github.com/hashicorp/raft v1.7.0 h1:4u24Qn6lQ6uwziM++UgsyiT64Q8GyRn43CV41qPiz1o= +github.com/hashicorp/raft v1.7.0/go.mod h1:N1sKh6Vn47mrWvEArQgILTyng8GoDRNYlgKyK7PMjs0= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -182,6 +192,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= @@ -343,6 +355,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= diff --git a/internal/admin/admin_fsm.go b/internal/admin/admin_fsm.go index 5632be1..8f22399 100644 --- a/internal/admin/admin_fsm.go +++ b/internal/admin/admin_fsm.go @@ -21,27 +21,17 @@ func newDGateAdminFSM(logger *zap.Logger, cs changestate.ChangeState) *dgateAdmi return &dgateAdminFSM{cs, logger} } -func (fsm *dgateAdminFSM) isReplay(log *raft.Log) bool { - return !fsm.cs.Ready() && - log.Index+1 >= fsm.cs.Raft().LastIndex() && - log.Index+1 >= fsm.cs.Raft().AppliedIndex() +func (fsm *dgateAdminFSM) isLatestLog(log *raft.Log) bool { + rft := fsm.cs.Raft() + return log.Index == rft.CommitIndex() || + log.Index+1 == rft.CommitIndex() } -func (fsm *dgateAdminFSM) checkLast(log *raft.Log) { - rft := fsm.cs.Raft() - if !fsm.cs.Ready() && fsm.isReplay(log) { - fsm.logger.Info("FSM is not ready, setting ready", - zap.Uint64("index", log.Index), - zap.Uint64("applied-index", rft.AppliedIndex()), - zap.Uint64("last-index", rft.LastIndex()), - ) - defer func() { - if err := fsm.cs.ReloadState(false); err != nil { - fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) - } else { - fsm.cs.SetReady() - } - }() +func (fsm *dgateAdminFSM) reload(cls ...*spec.ChangeLog) { + if err := fsm.cs.ReloadState(false, cls...); err != nil { + fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) + } else { + fsm.cs.SetReady() } } @@ -82,43 +72,56 @@ func (fsm *dgateAdminFSM) applyLog(log *raft.Log) (*spec.ChangeLog, error) { } func (fsm *dgateAdminFSM) Apply(log *raft.Log) any { - defer fsm.checkLast(log) - _, err := fsm.applyLog(log) + rft := fsm.cs.Raft() + fsm.logger.Debug("applying log", + zap.Uint64("current", log.Index), + zap.Uint64("applied", rft.AppliedIndex()), + zap.Uint64("commit", rft.CommitIndex()), + zap.Uint64("last", rft.LastIndex()), + ) + cl, err := fsm.applyLog(log) + if err != nil && !fsm.cs.Ready() { + fsm.reload(cl) + } else { + fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) + } return err } func (fsm *dgateAdminFSM) ApplyBatch(logs []*raft.Log) []any { - lastLog := logs[len(logs)-1] - if fsm.isReplay(lastLog) { - rft := fsm.cs.Raft() - fsm.logger.Info("applying log batch logs", - zap.Int("size", len(logs)), - zap.Uint64("current", lastLog.Index), - zap.Uint64("applied", rft.AppliedIndex()), - zap.Uint64("commit", rft.CommitIndex()), - zap.Uint64("last", rft.LastIndex()), - ) + if len(logs) == 0 || logs == nil { + fsm.logger.Warn("No logs to apply in ApplyBatch") + return nil } - cls := make([]*spec.ChangeLog, 0, len(logs)) - defer func() { - if !fsm.cs.Ready() { - fsm.checkLast(logs[len(logs)-1]) - return - } - - if err := fsm.cs.ReloadState(true, cls...); err != nil { - fsm.logger.Error("Error reloading state @ FSM ApplyBatch", zap.Error(err)) - } - }() + lastLog := logs[len(logs)-1] + rft := fsm.cs.Raft() + fsm.logger.Info("applying batch logs", + zap.Int("size", len(logs)), + zap.Uint64("current", lastLog.Index), + zap.Uint64("applied", rft.AppliedIndex()), + zap.Uint64("commit", rft.CommitIndex()), + zap.Uint64("last", rft.LastIndex()), + ) + cls := make([]*spec.ChangeLog, 0, len(logs)) results := make([]any, len(logs)) for i, log := range logs { - var cl *spec.ChangeLog - cl, results[i] = fsm.applyLog(log) - if cl != nil { + var ( + cl *spec.ChangeLog + err error + ) + if cl, err = fsm.applyLog(log); err != nil { + results[i] = err + fsm.logger.Error("Error processing change log in FSM", zap.Error(err)) + } else { cls = append(cls, cl) } } + + if fsm.cs.Ready() || fsm.isLatestLog(lastLog) { + fsm.reload(cls...) + } + return results } diff --git a/internal/admin/admin_routes.go b/internal/admin/admin_routes.go index 871bdac..2f51cce 100644 --- a/internal/admin/admin_routes.go +++ b/internal/admin/admin_routes.go @@ -3,7 +3,6 @@ package admin import ( "fmt" "log" - "net" "net/http" "strings" @@ -75,43 +74,20 @@ func configureRoutes( } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if ipList.Len() > 0 { - remoteHost, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - remoteHost = r.RemoteAddr - } - allowed, err := ipList.Contains(remoteHost) - if !allowed && adminConfig.XForwardedForDepth > 0 { - xForwardedForIps := r.Header.Values("X-Forwarded-For") - count := min(adminConfig.XForwardedForDepth, len(xForwardedForIps)) - for i := 0; i < count; i++ { - allowed, err = ipList.Contains(xForwardedForIps[i]) - if err != nil { - logger.Error("error checking x-forwarded-for ip", - zap.Error(err), - ) - if conf.Debug { - http.Error(w, "Bad Request: could not parse x-forwarded-for IP address", http.StatusBadRequest) - } - http.Error(w, "Bad Request", http.StatusBadRequest) - return - } - if allowed { - break - } - } - } - + remoteIp := util.GetTrustedIP(r, + conf.AdminConfig.XForwardedForDepth) + allowed, err := ipList.Contains(remoteIp) if err != nil { if conf.Debug { http.Error(w, err.Error(), http.StatusInternalServerError) return } - http.Error(w, "Internal Server Error", http.StatusInternalServerError) + http.Error(w, "could not parse X-Forwarded-For IP", http.StatusBadRequest) return } if !allowed { if conf.Debug { - http.Error(w, "Unauthorized IP Address: "+remoteHost, http.StatusUnauthorized) + http.Error(w, "Unauthorized IP Address: "+remoteIp, http.StatusUnauthorized) return } http.Error(w, "Unauthorized", http.StatusUnauthorized) @@ -138,24 +114,25 @@ func configureRoutes( } else if adminConfig.KeyAuth.HeaderName != "" { key = r.Header.Get(adminConfig.KeyAuth.HeaderName) } else { - key = r.Header.Get("X-API-Key") + key = r.Header.Get("X-DGate-Key") } if _, keyFound := keyMap[key]; !keyFound { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } } - raftInstance := cs.Raft() - if r.Method == http.MethodPut && raftInstance != nil { - leader := raftInstance.Leader() - if leader == "" { - util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") - return - } - if raftInstance.State() != raft.Leader { - r.URL.Host = string(leader) - http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) - return + if raftInstance := cs.Raft(); raftInstance != nil { + if r.Method == http.MethodPut || r.Method == http.MethodDelete { + leader := raftInstance.Leader() + if leader == "" { + util.JsonError(w, http.StatusServiceUnavailable, "raft: no leader") + return + } + if raftInstance.State() != raft.Leader { + r.URL.Host = string(leader) + http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect) + return + } } } @@ -165,10 +142,14 @@ func configureRoutes( server.Get("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") - w.Header().Set("X-DGate-Raft", fmt.Sprintf("%t", cs.Raft() != nil)) w.Header().Set("X-DGate-WatchOnly", fmt.Sprintf("%t", adminConfig.WatchOnly)) w.Header().Set("X-DGate-ChangeHash", fmt.Sprintf("%d", cs.ChangeHash())) - w.Header().Set("X-DGate-AdminAPI", "true") + if raftInstance := cs.Raft(); raftInstance != nil { + w.Header().Set( + "X-DGate-Raft-State", + raftInstance.State().String(), + ) + } w.WriteHeader(http.StatusOK) w.Write([]byte("DGate Admin API")) })) diff --git a/internal/admin/changestate/change_state.go b/internal/admin/changestate/change_state.go index f5bcc5f..6723e7f 100644 --- a/internal/admin/changestate/change_state.go +++ b/internal/admin/changestate/change_state.go @@ -10,7 +10,7 @@ import ( type ChangeState interface { // Change state ApplyChangeLog(cl *spec.ChangeLog) error - ProcessChangeLog(*spec.ChangeLog, bool) error + ProcessChangeLog(cl *spec.ChangeLog, reload bool) error WaitForChanges() error ReloadState(bool, ...*spec.ChangeLog) error ChangeHash() uint32 diff --git a/internal/config/config.go b/internal/config/config.go index 1dcd90e..cf1c0c5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -39,19 +39,22 @@ type ( } DGateProxyConfig struct { - Host string `koanf:"host"` - Port int `koanf:"port"` - TLS *DGateTLSConfig `koanf:"tls"` - EnableH2C bool `koanf:"enable_h2c"` - EnableHTTP2 bool `koanf:"enable_http2"` - EnableConsoleLogger bool `koanf:"enable_console_logger"` - RedirectHttpsDomains []string `koanf:"redirect_https"` - AllowedDomains []string `koanf:"allowed_domains"` - GlobalHeaders map[string]string `koanf:"global_headers"` - Transport DGateHttpTransportConfig `koanf:"client_transport"` + Host string `koanf:"host"` + Port int `koanf:"port"` + TLS *DGateTLSConfig `koanf:"tls"` + EnableH2C bool `koanf:"enable_h2c"` + EnableHTTP2 bool `koanf:"enable_http2"` + EnableConsoleLogger bool `koanf:"enable_console_logger"` + RedirectHttpsDomains []string `koanf:"redirect_https"` + AllowedDomains []string `koanf:"allowed_domains"` + GlobalHeaders map[string]string `koanf:"global_headers"` + Transport DGateHttpTransportConfig `koanf:"client_transport"` + DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + StrictMode bool `koanf:"strict_mode"` + XForwardedForDepth int `koanf:"x_forwarded_for_depth"` + // WARN: debug use only - InitResources *DGateResources `koanf:"init_resources"` - DisableXForwardedHeaders bool `koanf:"disable_x_forwarded_headers"` + InitResources *DGateResources `koanf:"init_resources"` } DGateTestServerConfig struct { diff --git a/internal/config/loader.go b/internal/config/loader.go index f7928e8..8e41595 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -95,7 +95,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { shell := "/bin/sh" if shellEnv := os.Getenv("SHELL"); shellEnv != "" { shell = shellEnv - } + } resolveConfigStringPattern(data, CommandRegex, func(value string, results map[string]string) (string, error) { cmdResult, err := exec.CommandContext( ctx, shell, "-c", results["cmd"]).Output() @@ -182,6 +182,7 @@ func LoadConfig(dgateConfigPath string) (*DGateConfig, error) { } if k.Exists("admin") { kDefault(k, "admin.host", "127.0.0.1") + kDefault(k, "admin.x_forwarded_for_depth", -1) err = kRequireAll(k, "admin.port") if err != nil { return nil, err diff --git a/internal/proxy/change_log.go b/internal/proxy/change_log.go index 5a2ab94..0e7537a 100644 --- a/internal/proxy/change_log.go +++ b/internal/proxy/change_log.go @@ -14,12 +14,13 @@ import ( ) // processChangeLog - processes a change log and applies the change to the proxy state -func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) (err error) { +func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) error { if cl == nil { cl = &spec.ChangeLog{ Cmd: spec.NoopCommand, } } else if !cl.Cmd.IsNoop() { + var err error switch cl.Cmd.Resource() { case spec.Namespaces: var item spec.Namespace @@ -81,37 +82,39 @@ func (ps *ProxyState) processChangeLog(cl *spec.ChangeLog, reload, store bool) ( err = fmt.Errorf("unknown command: %s", cl.Cmd) } if err != nil { - ps.logger.Error("decoding or processing change log", zap.Error(err)) - return + ps.logger.Error("error processing change log", zap.Error(err)) + return err } } if reload { if cl.Cmd.IsNoop() || cl.Cmd.Resource().IsRelatedTo(spec.Routes) { ps.logger.Debug("Registering change log", zap.Stringer("cmd", cl.Cmd)) - err = ps.reconfigureState(false, cl) + err := ps.reconfigureState(false, cl) if err != nil { ps.logger.Error("Error registering change log", zap.Error(err)) - return + return err } // update change log hash only when the change is successfully applied // even if the change is a noop, we still need to update the hash changeHash, err := HashAny(ps.changeHash, cl) if err != nil { - if !ps.config.Debug { - return err - } ps.logger.Error("error updating change log hash", zap.Error(err)) + return err } else { ps.changeHash = changeHash } } } if store { - if err = ps.store.StoreChangeLog(cl); err != nil { + if cl.Cmd.IsNoop() { + ps.logger.Debug("Noop change log, skipping storage") + return nil + } + if err := ps.store.StoreChangeLog(cl); err != nil { // TODO: find a way to revert the change and reload the state // TODO: OR add flag in config to ignore storage errors ps.logger.Error("Error storing change log", zap.Error(err)) - return + return err } } @@ -254,25 +257,8 @@ func (ps *ProxyState) processSecret(scrt *spec.Secret, cl *spec.ChangeLog) (err return err } -// applyChange - apply a change to the proxy state, returns a channel that will receive an error when the state has been updated -func (ps *ProxyState) applyChange(changeLog *spec.ChangeLog) <-chan error { - done := make(chan error, 1) - if changeLog == nil { - changeLog = &spec.ChangeLog{ - Cmd: spec.NoopCommand, - } - } - changeLog.SetErrorChan(done) - if err := ps.processChangeLog(changeLog, true, true); err != nil { - done <- err - } - return done -} - func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { - // restore state change logs - logs, err := ps.store.FetchChangeLogs() - if err != nil { + if logs, err := ps.store.FetchChangeLogs(); err != nil { if err == badger.ErrKeyNotFound { ps.logger.Debug("no state change logs found in storage") } else { @@ -295,17 +281,12 @@ func (ps *ProxyState) restoreFromChangeLogs(directApply bool) error { return err } } - if !directApply { - if err = ps.processChangeLog(nil, true, false); err != nil { - return err - } - } else { + if directApply { if err = ps.reconfigureState(false, nil); err != nil { - return nil + return err } } - // TODO: optionally compact change logs through a flag in config? if len(logs) > 1 { removed, err := ps.compactChangeLogs(logs) if err != nil { diff --git a/internal/proxy/proxy_handler.go b/internal/proxy/proxy_handler.go index b6bb6fa..2b2806b 100644 --- a/internal/proxy/proxy_handler.go +++ b/internal/proxy/proxy_handler.go @@ -25,12 +25,24 @@ func proxyHandler(ps *ProxyState, reqCtx *RequestContext) { With( zap.String("route", reqCtx.route.Name), zap.String("namespace", reqCtx.route.Namespace.Name), + zap.String("path", reqCtx.req.URL.Path), + zap.String("method", reqCtx.req.Method), + zap.String("query", reqCtx.req.URL.RawQuery), + zap.String("protocol", reqCtx.req.Proto), + zap.String("remote_address", reqCtx.req.RemoteAddr), + zap.String("user_agent", reqCtx.req.UserAgent()), + zap.Int64("content_length", reqCtx.req.ContentLength), + zap.String("content_type", reqCtx.req.Header.Get("Content-Type")), ) if reqCtx.route.Service != nil { event = event.With(zap.String("service", reqCtx.route.Service.Name)) } - event.Debug("Request Log") + if ps.config.ProxyConfig.StrictMode { + event.Info("Request log") + } else { + event.Debug("Request log") + } }() defer ps.metrics.MeasureProxyRequest(reqCtx, time.Now()) @@ -156,7 +168,7 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt }). ErrorHandler(func(w http.ResponseWriter, r *http.Request, reqErr error) { upstreamErr = reqErr - ps.logger.Debug("Error proxying request", + ps.logger.Error("Error proxying request", zap.String("error", reqErr.Error()), zap.String("route", reqCtx.route.Name), zap.String("service", reqCtx.route.Service.Name), @@ -185,6 +197,12 @@ func handleServiceProxy(ps *ProxyState, reqCtx *RequestContext, modExt ModuleExt } } if !reqCtx.rw.HeadersSent() && reqCtx.rw.BytesWritten() == 0 { + ps.logger.Error("Writing error response", + zap.String("error", reqErr.Error()), + zap.String("route", reqCtx.route.Name), + zap.String("service", reqCtx.route.Service.Name), + zap.String("namespace", reqCtx.route.Namespace.Name), + ) util.WriteStatusCodeError(reqCtx.rw, http.StatusInternalServerError) } }) diff --git a/internal/proxy/proxy_state.go b/internal/proxy/proxy_state.go index 383055a..84a5223 100644 --- a/internal/proxy/proxy_state.go +++ b/internal/proxy/proxy_state.go @@ -169,8 +169,7 @@ func (ps *ProxyState) ChangeHash() uint32 { func (ps *ProxyState) SetReady() { if ps.replicationEnabled && !ps.raftReady.Load() { - ps.logger.Info("Replication status is now ready after " + - time.Since(ps.startTime).String()) + ps.logger.Info("Replication status is now ready") ps.raftReady.Store(true) return } @@ -199,32 +198,59 @@ func (ps *ProxyState) SetupRaft(r *raft.Raft, rc *raft.Config) { func (ps *ProxyState) WaitForChanges() error { ps.proxyLock.RLock() defer ps.proxyLock.RUnlock() - return <-ps.applyChange(nil) + if rft := ps.Raft(); rft != nil { + return rft.Barrier(time.Second * 5).Error() + } + return nil } func (ps *ProxyState) ApplyChangeLog(log *spec.ChangeLog) error { + ps.logger.Debug("applying change log", + zap.Bool("replication", ps.replicationEnabled), + ) if ps.replicationEnabled { - if log.Cmd.IsNoop() { - return ps.processChangeLog(log, true, false) - } - r := ps.replicationSettings.raft + r := ps.Raft() if r.State() != raft.Leader { return raft.ErrNotLeader } - encodedCL, err := json.Marshal(log) - if err != nil { + if ecl, err := json.Marshal(log); err != nil { return err + } else { + restartCallback := func(err error) { + if err != nil { + ps.logger.Error("Error restarting state", zap.Error(err)) + go ps.Stop() + } else { + ps.logger.Info("State successfully restarted") + } + } + raftLog := raft.Log{Data: ecl} + future := r.ApplyLog(raftLog, time.Second*15) + if err := future.Error(); err != nil { + ps.restartState(restartCallback) + return err + } + resp := future.Response() + if resp != nil { + ps.restartState(restartCallback) + switch val := resp.(type) { + case error: + return val + default: + ps.logger.Error("unexpected response from raft", + zap.Any("response", val), + ) + return errors.New("unexpected response from raft") + } + } + ps.logger.With(). + Info("waiting for reply from raft", + zap.String("id", log.ID), + zap.Stringer("command", log.Cmd), + zap.Error(err), + ) + return nil } - raftLog := raft.Log{ - Data: encodedCL, - } - future := r.ApplyLog(raftLog, time.Second*15) - ps.logger.With(). - Debug("waiting for reply from raft", - zap.String("id", log.ID), - zap.Stringer("command", log.Cmd), - ) - return future.Error() } else { return ps.processChangeLog(log, true, true) } @@ -289,7 +315,7 @@ func (ps *ProxyState) ReloadState(check bool, logs ...*spec.ChangeLog) error { } } if reload { - <-ps.applyChange(nil) + return ps.processChangeLog(nil, true, false) } return nil } @@ -298,8 +324,9 @@ func (ps *ProxyState) ProcessChangeLog(log *spec.ChangeLog, reload bool) error { err := ps.processChangeLog(log, reload, !ps.replicationEnabled) if err != nil { ps.logger.Error("processing error", zap.Error(err)) + return err } - return err + return nil } func (ps *ProxyState) DynamicTLSConfig(certFile, keyFile string) *tls.Config { @@ -397,7 +424,11 @@ func (ps *ProxyState) initConfigResources(resources *config.DGateResources) erro return err } if numChanges > 0 { - defer ps.processChangeLog(nil, false, false) + defer func() { + if err != nil { + err = ps.processChangeLog(nil, false, false) + } + }() } ps.logger.Info("Initializing resources") for _, ns := range resources.Namespaces { @@ -572,19 +603,30 @@ func (ps *ProxyState) ServeHTTP(w http.ResponseWriter, r *http.Request) { if router, ok := ps.routers.Find(ns.Name); ok { router.ServeHTTP(w, r) } else { - ps.logger.Debug("No router found for namespace", - zap.String("namespace", ns.Name), - ) util.WriteStatusCodeError(w, http.StatusNotFound) } } else { + if ps.config.ProxyConfig.StrictMode { + closeConnection(w) + return + } + trustedIp := util.GetTrustedIP(r, ps.config.ProxyConfig.XForwardedForDepth) ps.logger.Debug("No namespace found for request", zap.String("protocol", r.Proto), zap.String("host", r.Host), zap.String("path", r.URL.Path), zap.Bool("secure", r.TLS != nil), - zap.String("remote_addr", r.RemoteAddr), + zap.String("remote_addr", trustedIp), ) util.WriteStatusCodeError(w, http.StatusNotFound) } } + +func closeConnection(w http.ResponseWriter) { + if loot, ok := w.(http.Hijacker); ok { + if conn, _, err := loot.Hijack(); err == nil { + defer conn.Close() + return + } + } +} diff --git a/pkg/dgclient/common.go b/pkg/dgclient/common.go index bfe9b1f..681216d 100644 --- a/pkg/dgclient/common.go +++ b/pkg/dgclient/common.go @@ -150,5 +150,5 @@ func parseApiError(body io.Reader, wrapErr error) error { if err := json.NewDecoder(body).Decode(&apiError); err != nil || apiError.Error == "" { return wrapErr } - return fmt.Errorf("%d: %s", wrapErr, apiError.Error) + return fmt.Errorf("%s: %s", wrapErr, apiError.Error) } diff --git a/pkg/spec/change_log.go b/pkg/spec/change_log.go index 90d3d26..6501a26 100644 --- a/pkg/spec/change_log.go +++ b/pkg/spec/change_log.go @@ -1,6 +1,7 @@ package spec import ( + "encoding/json" "strconv" "strings" "time" @@ -16,6 +17,14 @@ type ChangeLog struct { errChan chan error } +func NewNoopChangeLog() *ChangeLog { + return &ChangeLog{ + Version: 1, + ID: strconv.FormatInt(time.Now().UnixNano(), 36), + Cmd: NoopCommand, + } +} + func NewChangeLog(item Named, namespace string, cmd Command) *ChangeLog { if item == nil { panic("item cannot be nil") @@ -49,6 +58,10 @@ func (cl *ChangeLog) PushError(err error) { } } +func (cl *ChangeLog) JSONBytes() ([]byte, error) { + return json.Marshal(cl) +} + type Command string type Action string diff --git a/pkg/spec/response_writer_tracker.go b/pkg/spec/response_writer_tracker.go index 26dcb08..6a872ee 100644 --- a/pkg/spec/response_writer_tracker.go +++ b/pkg/spec/response_writer_tracker.go @@ -1,6 +1,8 @@ package spec import ( + "bufio" + "net" "net/http" ) @@ -17,6 +19,7 @@ type rwTracker struct { bytesWritten int64 } +var _ http.Hijacker = (*rwTracker)(nil) var _ ResponseWriterTracker = (*rwTracker)(nil) func NewResponseWriterTracker(rw http.ResponseWriter) ResponseWriterTracker { @@ -61,3 +64,11 @@ func (t *rwTracker) HeadersSent() bool { func (t *rwTracker) BytesWritten() int64 { return t.bytesWritten } + +func (t *rwTracker) Hijack() (net.Conn, *bufio.ReadWriter, error) { + hijacker, ok := t.rw.(http.Hijacker) + if !ok { + return nil, nil, http.ErrNotSupported + } + return hijacker.Hijack() +} diff --git a/pkg/util/http.go b/pkg/util/http.go index 2f8e007..741f6af 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -1,7 +1,7 @@ package util import ( - "fmt" + "net" "net/http" ) @@ -9,7 +9,20 @@ func WriteStatusCodeError(w http.ResponseWriter, code int) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("X-Content-Type-Options", "nosniff") w.WriteHeader(code) - w.Write([]byte( - fmt.Sprintf("DGate: %d %s", code, http.StatusText(code)), - )) +} + +// GetTrustedIP returns the trusted IP address of the client. It checks the +// X-Forwarded-For header first, and falls back to the RemoteAddr field of the +// request if the header is not present. depth is the number of proxies that +// the request has passed through. +func GetTrustedIP(r *http.Request, depth int) string { + ips := r.Header.Values("X-Forwarded-For") + if len(ips) == 0 || depth > len(ips) { + remoteHost, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return r.RemoteAddr + } + return remoteHost + } + return ips[len(ips)-depth] }