From 06dc622835e8577d5c4deca75973294d8de6fe64 Mon Sep 17 00:00:00 2001 From: Michal Budzyn Date: Sun, 9 Feb 2025 21:25:30 +0100 Subject: [PATCH] lint fixes --- Makefile | 9 ++- cmd/kafka-proxy/server.go | 4 +- cmd/plugin-auth-ldap/main.go | 5 +- cmd/plugin-auth-user/main.go | 5 +- config/config.go | 86 +++++++++++------------ main.go | 2 +- pkg/libs/googleid-info/factory.go | 5 +- pkg/libs/googleid-info/ticker.go | 6 +- pkg/libs/googleid-provider/factory.go | 5 +- pkg/libs/googleid-provider/plugin.go | 5 +- pkg/libs/googleid-provider/ticker.go | 5 +- pkg/libs/googleid/service_account_test.go | 5 +- pkg/libs/oidc-provider/factory.go | 6 +- pkg/libs/oidc-provider/plugin.go | 29 ++------ pkg/libs/oidc-provider/plugin_test.go | 2 + pkg/libs/oidc-provider/ticker.go | 6 +- pkg/libs/oidc/service_account_test.go | 3 +- pkg/libs/util/watcher.go | 14 ++-- proxy/clientcertvalidate/errors.go | 1 - proxy/common.go | 1 + proxy/processor.go | 6 +- proxy/protocol/encoder_decoder_test.go | 2 +- proxy/protocol/response_header.go | 4 +- proxy/protocol/response_header_v1.go | 4 +- proxy/protocol/responses_test.go | 70 +++++++++--------- proxy/sasl_gssapi.go | 2 +- proxy/tls_test.go | 10 +-- proxy/util_test.go | 5 +- 28 files changed, 155 insertions(+), 152 deletions(-) diff --git a/Makefile b/Makefile index d2c6a9b7..f2fd0378 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,8 @@ PROTOC_VERSION ?= 22.2 PROTOC_BIN_DIR := .tools PROTOC := $(PROTOC_BIN_DIR)/protoc +GOLANGCI_LINT = go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.63.4 + default: build test.race: @@ -33,10 +35,13 @@ test: fmt: go fmt $(GOPKGS) -check: - golint $(GOPKGS) +check: lint-code go vet $(GOPKGS) +.PHONY: lint-code +lint-code: + $(GOLANGCI_LINT) run --timeout 5m + .PHONY: build build: build/$(BINARY) diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index 2b214e8e..452c4f56 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -450,7 +450,7 @@ func Run(_ *cobra.Command, _ []string) { func NewHTTPHandler() http.Handler { m := http.NewServeMux() m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte( + _, _ = w.Write([]byte( ` kafka-proxy service @@ -462,7 +462,7 @@ func NewHTTPHandler() http.Handler { `)) }) m.HandleFunc(c.Http.HealthPath, func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(`OK`)) + _, _ = w.Write([]byte(`OK`)) }) m.Handle(c.Http.MetricsPath, promhttp.Handler()) diff --git a/cmd/plugin-auth-ldap/main.go b/cmd/plugin-auth-ldap/main.go index 0cdeb3a3..0ec86df8 100644 --- a/cmd/plugin-auth-ldap/main.go +++ b/cmd/plugin-auth-ldap/main.go @@ -257,7 +257,10 @@ func main() { pluginMeta := &pluginMeta{} flags := pluginMeta.flagSet() - flags.Parse(os.Args[1:]) + if err := flags.Parse(os.Args[1:]); err != nil { + logrus.Errorf("error parsing flags: %v", err) + os.Exit(1) + } urls, err := pluginMeta.getUrls() if err != nil { diff --git a/cmd/plugin-auth-user/main.go b/cmd/plugin-auth-user/main.go index e91a4852..1ba85301 100644 --- a/cmd/plugin-auth-user/main.go +++ b/cmd/plugin-auth-user/main.go @@ -31,7 +31,10 @@ func (f *PasswordAuthenticator) flagSet() *flag.FlagSet { func main() { passwordAuthenticator := &PasswordAuthenticator{} flags := passwordAuthenticator.flagSet() - flags.Parse(os.Args[1:]) + if err := flags.Parse(os.Args[1:]); err != nil { + logrus.Errorf("error parsing flags: %v", err) + os.Exit(1) + } if passwordAuthenticator.Password == "" { passwordAuthenticator.Password = os.Getenv(EnvSaslPassword) diff --git a/config/config.go b/config/config.go index 51b90805..f9529c91 100644 --- a/config/config.go +++ b/config/config.go @@ -215,60 +215,56 @@ func (c *Config) InitSASLCredentials() (err error) { return nil } func getDialAddressMappings(dialMapping []string) ([]DialAddressMapping, error) { - dialMappings := make([]DialAddressMapping, 0) - if dialMapping != nil { - for _, v := range dialMapping { - pair := strings.Split(v, ",") - if len(pair) != 2 { - return nil, errors.New("dial-mapping must be in form 'srchost:srcport,dsthost:dstport'") - } - srcHost, srcPort, err := util.SplitHostPort(pair[0]) - if err != nil { - return nil, err - } - dstHost, dstPort, err := util.SplitHostPort(pair[1]) - if err != nil { - return nil, err - } - dialMapping := DialAddressMapping{ - SourceAddress: net.JoinHostPort(srcHost, fmt.Sprint(srcPort)), - DestinationAddress: net.JoinHostPort(dstHost, fmt.Sprint(dstPort))} - dialMappings = append(dialMappings, dialMapping) + dialMappings := make([]DialAddressMapping, 0, len(dialMapping)) + for _, v := range dialMapping { + pair := strings.Split(v, ",") + if len(pair) != 2 { + return nil, errors.New("dial-mapping must be in form 'srchost:srcport,dsthost:dstport'") + } + srcHost, srcPort, err := util.SplitHostPort(pair[0]) + if err != nil { + return nil, err } + dstHost, dstPort, err := util.SplitHostPort(pair[1]) + if err != nil { + return nil, err + } + dialAddressMapping := DialAddressMapping{ + SourceAddress: net.JoinHostPort(srcHost, fmt.Sprint(srcPort)), + DestinationAddress: net.JoinHostPort(dstHost, fmt.Sprint(dstPort))} + dialMappings = append(dialMappings, dialAddressMapping) } return dialMappings, nil } func getListenerConfigs(serversMapping []string) ([]ListenerConfig, error) { - listenerConfigs := make([]ListenerConfig, 0) - if serversMapping != nil { - for _, v := range serversMapping { - pair := strings.Split(v, ",") - if len(pair) != 2 && len(pair) != 3 { - return nil, errors.New("server-mapping must be in form 'remotehost:remoteport,localhost:localport(,advhost:advport)'") - } - remoteHost, remotePort, err := util.SplitHostPort(pair[0]) - if err != nil { - return nil, err - } - localHost, localPort, err := util.SplitHostPort(pair[1]) + listenerConfigs := make([]ListenerConfig, 0, len(serversMapping)) + for _, v := range serversMapping { + pair := strings.Split(v, ",") + if len(pair) != 2 && len(pair) != 3 { + return nil, errors.New("server-mapping must be in form 'remotehost:remoteport,localhost:localport(,advhost:advport)'") + } + remoteHost, remotePort, err := util.SplitHostPort(pair[0]) + if err != nil { + return nil, err + } + localHost, localPort, err := util.SplitHostPort(pair[1]) + if err != nil { + return nil, err + } + advertisedHost, advertisedPort := localHost, localPort + if len(pair) == 3 { + advertisedHost, advertisedPort, err = util.SplitHostPort(pair[2]) if err != nil { return nil, err } - advertisedHost, advertisedPort := localHost, localPort - if len(pair) == 3 { - advertisedHost, advertisedPort, err = util.SplitHostPort(pair[2]) - if err != nil { - return nil, err - } - } - - listenerConfig := ListenerConfig{ - BrokerAddress: net.JoinHostPort(remoteHost, fmt.Sprint(remotePort)), - ListenerAddress: net.JoinHostPort(localHost, fmt.Sprint(localPort)), - AdvertisedAddress: net.JoinHostPort(advertisedHost, fmt.Sprint(advertisedPort))} - listenerConfigs = append(listenerConfigs, listenerConfig) } + + listenerConfig := ListenerConfig{ + BrokerAddress: net.JoinHostPort(remoteHost, fmt.Sprint(remotePort)), + ListenerAddress: net.JoinHostPort(localHost, fmt.Sprint(localPort)), + AdvertisedAddress: net.JoinHostPort(advertisedHost, fmt.Sprint(advertisedPort))} + listenerConfigs = append(listenerConfigs, listenerConfig) } return listenerConfigs, nil } @@ -361,7 +357,7 @@ func (c *Config) Validate() error { return errors.New("MaxOpenRequests must be greater than 0") } // proxy - if c.Proxy.BootstrapServers == nil || len(c.Proxy.BootstrapServers) == 0 { + if len(c.Proxy.BootstrapServers) == 0 { return errors.New("list of bootstrap-server-mapping must not be empty") } if c.Proxy.DefaultListenerIP == "" { diff --git a/main.go b/main.go index 3e5534dc..06db1a04 100644 --- a/main.go +++ b/main.go @@ -13,7 +13,7 @@ var RootCmd = &cobra.Command{ Short: "Server that proxies requests to Kafka brokers", Long: ``, Run: func(cmd *cobra.Command, args []string) { - cmd.Help() + _ = cmd.Help() os.Exit(1) }, } diff --git a/pkg/libs/googleid-info/factory.go b/pkg/libs/googleid-info/factory.go index 4890bdaa..dd232484 100644 --- a/pkg/libs/googleid-info/factory.go +++ b/pkg/libs/googleid-info/factory.go @@ -36,7 +36,10 @@ func (t *Factory) New(params []string) (apis.TokenInfo, error) { fs.Var(&pluginMeta.audience, "audience", "The audience of a token") fs.Var(&pluginMeta.emailsRegex, "email-regex", "Regex of the email claim") - fs.Parse(params) + err := fs.Parse(params) + if err != nil { + return nil, err + } opts := TokenInfoOptions{ Timeout: pluginMeta.timeout, diff --git a/pkg/libs/googleid-info/ticker.go b/pkg/libs/googleid-info/ticker.go index 9b1c3fc1..f265887f 100644 --- a/pkg/libs/googleid-info/ticker.go +++ b/pkg/libs/googleid-info/ticker.go @@ -43,7 +43,7 @@ func (p *certsRefresher) refreshLoop() { } } -func (p *certsRefresher) refreshTick() error { +func (p *certsRefresher) refreshTick() { op := func() error { return p.tokenInfo.refreshCerts() } @@ -52,10 +52,10 @@ func (p *certsRefresher) refreshTick() error { backOff.MaxInterval = 2 * time.Minute err := backoff.Retry(op, backOff) if err != nil { - return err + logrus.Errorf("Certs refresh failed : %v", err) + return } kids := p.tokenInfo.getPublicKeyIDs() sort.Strings(kids) logrus.Infof("Refreshed certs Key IDs: %v", kids) - return nil } diff --git a/pkg/libs/googleid-provider/factory.go b/pkg/libs/googleid-provider/factory.go index 5ae99151..00947cc4 100644 --- a/pkg/libs/googleid-provider/factory.go +++ b/pkg/libs/googleid-provider/factory.go @@ -38,7 +38,10 @@ func (t *Factory) New(params []string) (apis.TokenProvider, error) { fs.BoolVar(&pluginMeta.credentialsWatch, "credentials-watch", true, "Watch credential for reload") fs.StringVar(&pluginMeta.targetAudience, "target-audience", "", "URI of audience claim") - fs.Parse(params) + err := fs.Parse(params) + if err != nil { + return nil, err + } options := TokenProviderOptions{ Timeout: pluginMeta.timeout, diff --git a/pkg/libs/googleid-provider/plugin.go b/pkg/libs/googleid-provider/plugin.go index 2a65bac0..529b280c 100644 --- a/pkg/libs/googleid-provider/plugin.go +++ b/pkg/libs/googleid-provider/plugin.go @@ -146,10 +146,7 @@ func renewLatest(token *googleid.Token) bool { } // renew before expiry advExp := token.ClaimSet.Exp - int64(clockSkew.Seconds()) - if nowFn().Unix() > advExp { - return true - } - return false + return nowFn().Unix() > advExp } // GetToken implements apis.TokenProvider.GetToken method diff --git a/pkg/libs/googleid-provider/ticker.go b/pkg/libs/googleid-provider/ticker.go index 2672b14a..60f2fbef 100644 --- a/pkg/libs/googleid-provider/ticker.go +++ b/pkg/libs/googleid-provider/ticker.go @@ -87,8 +87,5 @@ func renewEarliest(claimSet *googleid.ClaimSet) bool { refreshAfter := int64(validity / 2) refreshTime := claimSet.Exp - int64(clockSkew.Seconds()) - refreshAfter // logrus.Debugf("New refresh time %d (%v)", refreshTime, time.Unix(refreshTime, 0)) - if nowFn().Unix() > refreshTime { - return true - } - return false + return nowFn().Unix() > refreshTime } diff --git a/pkg/libs/googleid/service_account_test.go b/pkg/libs/googleid/service_account_test.go index df4e3575..2269f147 100644 --- a/pkg/libs/googleid/service_account_test.go +++ b/pkg/libs/googleid/service_account_test.go @@ -11,14 +11,15 @@ import ( func TestGetServiceAccountIDToken(t *testing.T) { t.Skip() // Uncomment to execute - + a := assert.New(t) credentialsFile := filepath.Join(os.Getenv("HOME"), "kafka-gateway-service-account.json") src, err := NewServiceAccountTokenSource(credentialsFile, "tcp://kafka-gateway.grepplabs.com") + a.Nil(err) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() token, err := src.GetIDToken(ctx) - a := assert.New(t) a.Nil(err) a.NotEmpty(token) } diff --git a/pkg/libs/oidc-provider/factory.go b/pkg/libs/oidc-provider/factory.go index fd715ffd..2d709666 100644 --- a/pkg/libs/oidc-provider/factory.go +++ b/pkg/libs/oidc-provider/factory.go @@ -38,8 +38,10 @@ func (t *Factory) New(params []string) (apis.TokenProvider, error) { fs.BoolVar(&pluginMeta.credentialsWatch, "credentials-watch", true, "Watch credential for reload") fs.StringVar(&pluginMeta.targetAudience, "target-audience", "", "URI of audience claim") - fs.Parse(params) - + err := fs.Parse(params) + if err != nil { + return nil, err + } options := TokenProviderOptions{ Timeout: pluginMeta.timeout, CredentialsWatch: pluginMeta.credentialsWatch, diff --git a/pkg/libs/oidc-provider/plugin.go b/pkg/libs/oidc-provider/plugin.go index 0483e293..0cf89c8c 100644 --- a/pkg/libs/oidc-provider/plugin.go +++ b/pkg/libs/oidc-provider/plugin.go @@ -162,12 +162,7 @@ func renewLatest(token *oidc.Token) bool { } // renew before expiry advExp := token.ClaimSet.Exp - int64(clockSkew.Seconds()) - - if nowFn().Unix() > advExp { - return true - } - - return false + return nowFn().Unix() > advExp } // GetToken implements apis.TokenProvider.GetToken method @@ -225,7 +220,7 @@ func getTokenSource(credentialsFilePath string, targetAud string) (idTokenSource switch grantType.Name { case "password": - passwordGrantSource, err := NewPasswordGrantSource( + passwordGrantSource, err := newPasswordGrantSource( credentialsFilePath, targetAud) @@ -235,7 +230,7 @@ func getTokenSource(credentialsFilePath string, targetAud string) (idTokenSource return passwordGrantSource, nil default: - serviceAccountSource, err := NewServiceAccountSource( + serviceAccountSource, err := newServiceAccountSource( credentialsFilePath, targetAud) @@ -263,14 +258,7 @@ func (p *serviceAccountSource) getServiceAccountTokenSource() *oidc.ServiceAccou return p.source } -func (p *serviceAccountSource) setServiceAccountTokenSource(source *oidc.ServiceAccountTokenSource) { - p.l.Lock() - defer p.l.Unlock() - - p.source = source -} - -func NewServiceAccountSource(credentialsFile string, targetAudience string) (*serviceAccountSource, error) { +func newServiceAccountSource(credentialsFile string, targetAudience string) (*serviceAccountSource, error) { source, err := oidc.NewServiceAccountTokenSource(credentialsFile, targetAudience) if err != nil { @@ -298,14 +286,7 @@ func (p *passwordGrantSource) getPasswordGrantTokenSource() *oidc.PasswordGrantT return p.source } -func (p *passwordGrantSource) setPasswordGrantTokenSource(source *oidc.PasswordGrantTokenSource) { - p.l.Lock() - defer p.l.Unlock() - - p.source = source -} - -func NewPasswordGrantSource(credentialsFile string, targetAudience string) (*passwordGrantSource, error) { +func newPasswordGrantSource(credentialsFile string, targetAudience string) (*passwordGrantSource, error) { source, err := oidc.NewPasswordGrantTokenSource(credentialsFile, targetAudience) if err != nil { diff --git a/pkg/libs/oidc-provider/plugin_test.go b/pkg/libs/oidc-provider/plugin_test.go index 3418c0f5..1c9ad6d5 100644 --- a/pkg/libs/oidc-provider/plugin_test.go +++ b/pkg/libs/oidc-provider/plugin_test.go @@ -112,6 +112,7 @@ func TestGetToken(t *testing.T) { actual, err := prov.GetToken(context.Background(), apis.TokenRequest{}) a.NotNil(token) + a.Nil(err) exp := apis.TokenResponse{Success: true, Status: int32(StatusOK), Token: testToken} a.Equal(exp, actual) @@ -120,6 +121,7 @@ func TestGetToken(t *testing.T) { actual, err = prov.GetToken(context.Background(), apis.TokenRequest{}) a.NotNil(token) + a.Nil(err) exp = apis.TokenResponse{Success: true, Status: int32(StatusOK), Token: testToken} a.Equal(exp, actual) diff --git a/pkg/libs/oidc-provider/ticker.go b/pkg/libs/oidc-provider/ticker.go index 37544293..82ad3f22 100644 --- a/pkg/libs/oidc-provider/ticker.go +++ b/pkg/libs/oidc-provider/ticker.go @@ -109,9 +109,5 @@ func renewEarliest(claimSet *oidc.ClaimSet) bool { refreshAfter := int64(validity / 2) refreshTime := claimSet.Exp - int64(clockSkew.Seconds()) - refreshAfter // logrus.Debugf("New refresh time %d (%v)", refreshTime, time.Unix(refreshTime, 0)) - if nowFn().Unix() > refreshTime { - return true - } - - return false + return nowFn().Unix() > refreshTime } diff --git a/pkg/libs/oidc/service_account_test.go b/pkg/libs/oidc/service_account_test.go index 144fa0aa..73c649c8 100644 --- a/pkg/libs/oidc/service_account_test.go +++ b/pkg/libs/oidc/service_account_test.go @@ -12,14 +12,15 @@ import ( func TestGetServiceAccountIDToken(t *testing.T) { t.Skip() // Uncomment to execute + a := assert.New(t) credentialsFile := filepath.Join(os.Getenv("HOME"), "kafka-gateway-service-account.json") src, err := NewServiceAccountTokenSource(credentialsFile, "tcp://kafka-gateway.grepplabs.com") + a.Nil(err) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() token, err := src.GetIDToken(ctx) - a := assert.New(t) a.Nil(err) a.NotEmpty(token) } diff --git a/pkg/libs/util/watcher.go b/pkg/libs/util/watcher.go index 740984c1..3330de6a 100644 --- a/pkg/libs/util/watcher.go +++ b/pkg/libs/util/watcher.go @@ -70,13 +70,16 @@ func watchFileForUpdates(filename string, done <-chan bool, action func()) error done: for { select { - case _ = <-done: + case <-done: logrus.Printf("Shutting down watcher for: %s", filename) break done case event := <-watcher.Events: if event.Op&(fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod) != 0 { logrus.Debugf("watching interrupted on event: %s", event) - watcher.Remove(filename) + err := watcher.Remove(filename) + if err != nil { + logrus.Debugf("failed to remove %s: %s", filename, err) + } waitForReplacement(filename, event.Op, watcher) } logrus.Infof("execute action after event %s on %s ", event.Op, event.Name) @@ -114,13 +117,16 @@ func watchLinkForUpdates(filename string, done <-chan bool, action func()) error done: for { select { - case _ = <-done: + case <-done: logrus.Printf("Shutting down watcher for: %s", dirname) break done case event := <-watcher.Events: if event.Op&(fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod) != 0 { logrus.Debugf("watching interrupted on event: %s", event) - watcher.Remove(dirname) + err := watcher.Remove(dirname) + if err != nil { + logrus.Debugf("failed to remove %s: %s", dirname, err) + } waitForReplacement(dirname, event.Op, watcher) } if event.Op&(fsnotify.Remove) != 0 { diff --git a/proxy/clientcertvalidate/errors.go b/proxy/clientcertvalidate/errors.go index ef047931..3a3db5a8 100644 --- a/proxy/clientcertvalidate/errors.go +++ b/proxy/clientcertvalidate/errors.go @@ -7,7 +7,6 @@ import ( ) var errUnexpectedPattern = errors.New("errUnexpectedPattern") -var errNestedPattern = errors.New("errNestedPattern") // ClientCertificateRejectedError contains the details of the certificate rejection reason. type ClientCertificateRejectedError struct { diff --git a/proxy/common.go b/proxy/common.go index af6d3cb3..406d641d 100644 --- a/proxy/common.go +++ b/proxy/common.go @@ -36,6 +36,7 @@ type DeadlineReaderWriter interface { // myCopy is similar to io.Copy, but reports whether the returned error was due // to a bad read or write. The returned error will never be nil +// nolint:unused func myCopy(dst io.Writer, src io.Reader) (readErr bool, err error) { buf := make([]byte, 4096) for { diff --git a/proxy/processor.go b/proxy/processor.go index 80ec71df..d7ae6156 100644 --- a/proxy/processor.go +++ b/proxy/processor.go @@ -117,8 +117,10 @@ func (p *processor) RequestsLoop(dst DeadlineWriter, src DeadlineReaderWriter) ( return true, err } } - src.SetDeadline(time.Time{}) - + err = src.SetDeadline(time.Time{}) + if err != nil { + return false, err + } ctx := &RequestsLoopContext{ openRequestsChannel: p.openRequestsChannel, nextRequestHandlerChannel: p.nextRequestHandlerChannel, diff --git a/proxy/protocol/encoder_decoder_test.go b/proxy/protocol/encoder_decoder_test.go index 0ca0216b..cb3e9b76 100644 --- a/proxy/protocol/encoder_decoder_test.go +++ b/proxy/protocol/encoder_decoder_test.go @@ -53,7 +53,7 @@ func TestEncodeDecodeCompactBytes(t *testing.T) { t.Fatalf("Values array lengths differ: expected %v, actual %v", request.values, response.values) } for i := range request.values { - if bytes.Compare(request.values[i], response.values[i]) != 0 { + if !bytes.Equal(request.values[i], response.values[i]) { t.Fatalf("Values differ: index %d, expected %v, actual %v", i, request.values[i], response.values[i]) } } diff --git a/proxy/protocol/response_header.go b/proxy/protocol/response_header.go index 55bd71c9..0f1cd2da 100644 --- a/proxy/protocol/response_header.go +++ b/proxy/protocol/response_header.go @@ -72,9 +72,7 @@ func (r *ResponseHeaderTaggedFields) MaybeRead(reader io.Reader) ([]byte, error) if err != nil { return nil, errors.Wrap(err, "error while reading tagged field size") } - if size < 0 { - return nil, errors.New("negative size of tagged field data") - } else if size == 0 { + if size == 0 { continue } else { if _, err := io.CopyN(io.Discard, reader, int64(size)); err != nil { diff --git a/proxy/protocol/response_header_v1.go b/proxy/protocol/response_header_v1.go index 67d3b4b4..c5dcf9a6 100644 --- a/proxy/protocol/response_header_v1.go +++ b/proxy/protocol/response_header_v1.go @@ -19,7 +19,9 @@ func (r *ResponseHeaderV1) decode(pd packetDecoder) (err error) { return PacketDecodingError{fmt.Sprintf("message of length %d too small", r.Length)} } r.CorrelationID, err = pd.getInt32() - + if err != nil { + return err + } tf := &SchemaTaggedFields{} taggedFields, err := tf.decode(pd) if err != nil { diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index 549e9737..1ba49d41 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -263,7 +263,7 @@ func TestEmptyMetadataResponseV0(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -336,7 +336,7 @@ func TestMetadataResponseV0(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -388,7 +388,7 @@ func TestMetadataResponseV0(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -499,7 +499,7 @@ func TestMetadataResponseV1(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -556,7 +556,7 @@ func TestMetadataResponseV1(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -684,7 +684,7 @@ func TestMetadataResponseV2(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -743,7 +743,7 @@ func TestMetadataResponseV2(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -875,7 +875,7 @@ func TestMetadataResponseV3(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -935,7 +935,7 @@ func TestMetadataResponseV3(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1068,7 +1068,7 @@ func TestMetadataResponseV4(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1128,7 +1128,7 @@ func TestMetadataResponseV4(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1261,7 +1261,7 @@ func TestMetadataResponseV5(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1325,7 +1325,7 @@ func TestMetadataResponseV5(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1463,7 +1463,7 @@ func TestMetadataResponseV6(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1527,7 +1527,7 @@ func TestMetadataResponseV6(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1667,7 +1667,7 @@ func TestMetadataResponseV7(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1732,7 +1732,7 @@ func TestMetadataResponseV7(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1879,7 +1879,7 @@ func TestMetadataResponseV8(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -1947,7 +1947,7 @@ func TestMetadataResponseV8(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2448,7 +2448,7 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2472,7 +2472,7 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2504,7 +2504,7 @@ func TestFindCoordinatorResponseV0(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2528,7 +2528,7 @@ func TestFindCoordinatorResponseV0(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2572,7 +2572,7 @@ func TestFindCoordinatorResponseV1(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2598,7 +2598,7 @@ func TestFindCoordinatorResponseV1(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2644,7 +2644,7 @@ func TestFindCoordinatorResponseV2(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2670,7 +2670,7 @@ func TestFindCoordinatorResponseV2(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2701,7 +2701,7 @@ func TestFindCoordinatorResponseV3(t *testing.T) { s, err := DecodeSchema(bytes, schema) a.Nil(err) - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2728,7 +2728,7 @@ func TestFindCoordinatorResponseV3(t *testing.T) { a.Nil(err) s, err = DecodeSchema(resp, schema) a.Nil(err) - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2870,7 +2870,7 @@ func TestMetadataResponses(t *testing.T) { if err != nil { t.Fatal(err) } - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2897,7 +2897,7 @@ func TestMetadataResponses(t *testing.T) { if err != nil { t.Fatal(err) } - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -2982,7 +2982,7 @@ func TestFindCoordinatorResponse(t *testing.T) { if err != nil { t.Fatal(err) } - dc := NewDecodeCheck() + dc := newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -3009,7 +3009,7 @@ func TestFindCoordinatorResponse(t *testing.T) { if err != nil { t.Fatal(err) } - dc = NewDecodeCheck() + dc = newDecodeCheck() err = dc.Traverse(s) if err != nil { t.Fatal(err) @@ -3022,12 +3022,12 @@ type decodeCheck struct { attrValues []string } -func NewDecodeCheck() *decodeCheck { +func newDecodeCheck() *decodeCheck { return &decodeCheck{attrValues: make([]string, 0)} } func (t *decodeCheck) Traverse(s *Struct) error { - for i, _ := range s.GetSchema().GetFields() { + for i := range s.GetSchema().GetFields() { arg := s.Values[i] if err := t.value(s, arg, i); err != nil { return err diff --git a/proxy/sasl_gssapi.go b/proxy/sasl_gssapi.go index 116187d6..12e28430 100644 --- a/proxy/sasl_gssapi.go +++ b/proxy/sasl_gssapi.go @@ -79,7 +79,7 @@ func (g *SASLGSSAPIAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter, broke logrus.Errorf("Failed to send GSSAPI AP_REQ1") return err } - var receivedBytes []byte = nil + var receivedBytes []byte bytesLen := 0 receivedBytes, bytesLen, err = g.readPackage(conn) requestLatency := time.Since(requestTime) diff --git a/proxy/tls_test.go b/proxy/tls_test.go index cfd710b8..06223bfd 100644 --- a/proxy/tls_test.go +++ b/proxy/tls_test.go @@ -476,9 +476,10 @@ func pingPong(t *testing.T, c1, c2 net.Conn) { clientResult := make(chan error, 1) go func() { // send "ping" - c1.SetDeadline(time.Now().Add(2 * time.Second)) + err := c1.SetDeadline(time.Now().Add(2 * time.Second)) + a.Nil(err) request := bytes.NewBuffer(ping) - _, err := io.Copy(c1, request) + _, err = io.Copy(c1, request) if err != nil { clientResult <- err return @@ -496,9 +497,10 @@ func pingPong(t *testing.T, c1, c2 net.Conn) { clientResult <- nil }() - c2.SetDeadline(time.Now().Add(2 * time.Second)) + err := c2.SetDeadline(time.Now().Add(2 * time.Second)) + a.Nil(err) request := make([]byte, len(ping)) - _, err := io.ReadFull(c2, request) + _, err = io.ReadFull(c2, request) a.Nil(err) a.Equal("ping", string(request)) diff --git a/proxy/util_test.go b/proxy/util_test.go index 5598019f..eaf0bec1 100644 --- a/proxy/util_test.go +++ b/proxy/util_test.go @@ -430,6 +430,9 @@ func generateCertWithSubject(catls *tls.Certificate, certFile *os.File, keyFile } // Private key err = pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)}) + if err != nil { + return err + } err = keyFile.Sync() if err != nil { return err @@ -491,7 +494,7 @@ func generateCA(certFile *os.File, keyFile *os.File) (*tls.Certificate, error) { return nil, err } - ca, err = x509.ParseCertificate(catls.Certificate[0]) + _, err = x509.ParseCertificate(catls.Certificate[0]) if err != nil { return nil, err }