Skip to content

Commit

Permalink
Fix Segfault with MSK new 3.7.x version candidates
Browse files Browse the repository at this point in the history
## Why
AWS [changed their version naming convention on
MSK](https://docs.aws.amazon.com/msk/latest/developerguide/supported-kafka-versions.html#3.7.kraft)
leading to a new 3.7.x version candidate for upgrade (automatic patch
update handled by AWS)

This is generating Segfault in this application with the
go-version/semver library not parsing it correctly.

We fixed this issue updating MSK handler to replace those 'x' version
part by a valid number.

Because AWS is handling all patch version, replacing by `0` should
produce the expected result we want on the upgrade manager service.

## How
- Update MSK source version connector to handle semver containing 'x'
- Update the semver Sort function to return proper error instead of
crashing with segfault
- The codebase already [expose a metric with the number of
processingError](https://github.com/qonto/upgrade-manager/blob/main/internal/app/app.go#L281).
Alert can be built on this metric to be notified of future version issue
without having the application crashing and failing to produce report
for all other monitored source
  • Loading branch information
Jumanjii committed Jul 15, 2024
1 parent 86aec00 commit 9190def
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 35 deletions.
6 changes: 5 additions & 1 deletion internal/app/calculators/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func (c *DefaultCalculator) CalculateObsolescenceScore(s *soft.Software) error {
c.log.Debug(fmt.Sprintf("Total of %d softwares to compute in order to compute software %s's total score", len(softwaresToCalculate), s.Name))
topLevelScore := 0
for _, software := range softwaresToCalculate {
semver.Sort(software.VersionCandidates)
err := semver.Sort(software.VersionCandidates)
if err != nil {
return fmt.Errorf("failed to sort versions candidate for software %s, %w", software.Name, err)
}

// Retrieve semantic versions
lv, err := goversion.NewSemver(software.VersionCandidates[0].Version)
if err != nil {
Expand Down
26 changes: 23 additions & 3 deletions internal/app/semver/semver.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
package semver

import (
"errors"
"fmt"
"sort"

goversion "github.com/hashicorp/go-version"
"github.com/qonto/upgrade-manager/internal/app/core/software"
)

func Sort(versions []software.Version) {
var ErrorInSemverSortFunction = errors.New("cannot sort software semver versions")

func Sort(versions []software.Version) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%w %v", ErrorInSemverSortFunction, r)
}
}()

sort.Slice(versions, func(i, j int) bool {
iVersion, _ := goversion.NewSemver(versions[i].Version)
jVersion, _ := goversion.NewSemver(versions[j].Version)
iVersion, err := goversion.NewSemver(versions[i].Version)
if err != nil {
panic(fmt.Errorf("cannot sort software %s versions: %w", versions[i].Name, err))
}

jVersion, err := goversion.NewSemver(versions[j].Version)
if err != nil {
panic(fmt.Errorf("cannot sort software %s versions: %w", versions[i].Name, err))
}

// Filtering out versions older than current version
return iVersion.Core().Compare(jVersion.Core()) == 1
})

return nil
}

func ExtractFromString(rawString string) (string, error) {
Expand Down
31 changes: 29 additions & 2 deletions internal/app/semver/semver_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package semver

import (
"errors"
"testing"

"github.com/qonto/upgrade-manager/internal/app/core/software"
Expand All @@ -10,6 +11,7 @@ func TestSortSoftwareVersions(t *testing.T) {
testCases := []struct {
versions []software.Version
expected string
error error
}{
{
versions: []software.Version{
Expand All @@ -24,6 +26,7 @@ func TestSortSoftwareVersions(t *testing.T) {
},
},
expected: "7.0.0",
error: nil,
},
{
versions: []software.Version{
Expand All @@ -38,11 +41,35 @@ func TestSortSoftwareVersions(t *testing.T) {
},
},
expected: "7.0.0",
error: nil,
},
{
versions: []software.Version{
{
Name: "test-soft",
Version: "5.0.0",
},
{
Name: "test-soft",
Version: "6.x.x",
},
{
Name: "test-soft",
Version: "7.0.0",
},
},
expected: "",
error: ErrorInSemverSortFunction,
},
}
for idx, testCase := range testCases {
Sort(testCase.versions)
if testCase.versions[0].Version != testCase.expected {
err := Sort(testCase.versions)

if err != testCase.error && !errors.Is(err, testCase.error) {

Check failure on line 68 in internal/app/semver/semver_test.go

View workflow job for this annotation

GitHub Actions / lint

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
t.Errorf("Case %d, error returned is not error expected. Expected %s, got: %s", idx+1, testCase.error, err)
}

if testCase.error == nil && testCase.versions[0].Version != testCase.expected {
t.Errorf("Case %d, wrong first element in sorted slice. Expected %s, got: %s", idx+1, testCase.expected, testCase.versions[0].Version)
}
}
Expand Down
24 changes: 23 additions & 1 deletion internal/app/sources/aws/msk/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package msk
import (
"context"
"log/slog"
"regexp"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/service/kafka"

Check failure on line 11 in internal/app/sources/aws/msk/source.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
"github.com/qonto/upgrade-manager/internal/app/core/software"
"github.com/qonto/upgrade-manager/internal/app/filters"
"github.com/qonto/upgrade-manager/internal/infra/aws"
Expand All @@ -22,6 +24,12 @@ type Source struct {
const (
MskCluster software.SoftwareType = "msk cluster"
DefaultTimeout time.Duration = time.Second * 15

// From version 3.7.0, AWS start using 3.7.x version with automated patch update manage by themselves.
// See https://docs.aws.amazon.com/msk/latest/developerguide/supported-kafka-versions.html#3.7.kraft
//
// When a candidate with x appears, we replace them with a 0 version to be still able to process them.
SemVerWithX = `(\d+|x)\.(\d+|x)\.(\d+|x)`
)

func (s *Source) Name() string {
Expand Down Expand Up @@ -49,16 +57,18 @@ func (s *Source) Load() ([]*software.Software, error) {
if err != nil {
return nil, err
}

for _, cluster := range res.ClusterInfoList {
res, err := s.api.GetCompatibleKafkaVersions(context.TODO(), &kafka.GetCompatibleKafkaVersionsInput{
ClusterArn: cluster.ClusterArn,
})
if err != nil {
return nil, err
}

versionCandidates := []software.Version{}
for _, v := range res.CompatibleKafkaVersions[0].TargetVersions {
versionCandidate := strings.ReplaceAll(v, ".tiered", "")
versionCandidate := cleanMSKVersionSpecials(v)
versionCandidates = append(versionCandidates, software.Version{Version: versionCandidate})
}
s := &software.Software{
Expand All @@ -73,3 +83,15 @@ func (s *Source) Load() ([]*software.Software, error) {

return softwares, nil
}

func cleanMSKVersionSpecials(version string) string {
semverXRegexp := regexp.MustCompile(SemVerWithX)

// Clean tiered version
versionCandidate := strings.ReplaceAll(version, ".tiered", "")

// Clean .x version to be valid semver
return semverXRegexp.ReplaceAllStringFunc(versionCandidate, func(m string) string {
return regexp.MustCompile(`x`).ReplaceAllString(m, "0")
})
}
147 changes: 119 additions & 28 deletions internal/app/sources/aws/msk/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"log/slog"
"testing"

"github.com/qonto/upgrade-manager/internal/app/core/software"

Check failure on line 7 in internal/app/sources/aws/msk/source_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)

"github.com/aws/aws-sdk-go-v2/service/kafka"
"github.com/aws/aws-sdk-go-v2/service/kafka/types"

Check failure on line 10 in internal/app/sources/aws/msk/source_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
"github.com/qonto/upgrade-manager/internal/app/sources/utils"
Expand All @@ -12,38 +14,127 @@ import (
)

func TestLoad(t *testing.T) {
api := new(aws.MockMSKApi)
api.On("ListClustersV2", mock.Anything).Return(
&kafka.ListClustersV2Output{
ClusterInfoList: []types.Cluster{
{
ClusterName: utils.Ptr("mycluster"),
ClusterArn: utils.Ptr("arn:myclusterarn"),
Provisioned: &types.Provisioned{
CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{
KafkaVersion: utils.Ptr("2.0.0"),
testCases := []struct {
name string
initFunc func(*aws.MockMSKApi)
expectedError bool
expectedClusterCount int
expectedVersionCandidates []string
}{
{
name: "happy path",
initFunc: func(api *aws.MockMSKApi) {
api.On("ListClustersV2", mock.Anything).Return(
&kafka.ListClustersV2Output{
ClusterInfoList: []types.Cluster{
{
ClusterName: utils.Ptr("mycluster"),
ClusterArn: utils.Ptr("arn:myclusterarn"),
Provisioned: &types.Provisioned{
CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{
KafkaVersion: utils.Ptr("2.0.0"),
},
},
},
},
})

api.On("GetCompatibleKafkaVersions", mock.Anything).Return(
&kafka.GetCompatibleKafkaVersionsOutput{
CompatibleKafkaVersions: []types.CompatibleKafkaVersion{
{
TargetVersions: []string{
"2.2.3",
"2.3.4",
},
},
},
},
},
})
},
})
api.On("GetCompatibleKafkaVersions", mock.Anything).Return(
&kafka.GetCompatibleKafkaVersionsOutput{
CompatibleKafkaVersions: []types.CompatibleKafkaVersion{
{
TargetVersions: []string{
"2.2.3",
"2.3.4.tiered",
},
},
expectedError: false,
expectedClusterCount: 1,
expectedVersionCandidates: []string{
"2.2.3",
"2.3.4",
},
},
{
name: "msk special versions",
initFunc: func(api *aws.MockMSKApi) {
api.On("ListClustersV2", mock.Anything).Return(
&kafka.ListClustersV2Output{
ClusterInfoList: []types.Cluster{
{
ClusterName: utils.Ptr("mycluster"),
ClusterArn: utils.Ptr("arn:myclusterarn"),
Provisioned: &types.Provisioned{
CurrentBrokerSoftwareInfo: &types.BrokerSoftwareInfo{
KafkaVersion: utils.Ptr("2.0.0"),
},
},
},
},
})

api.On("GetCompatibleKafkaVersions", mock.Anything).Return(
&kafka.GetCompatibleKafkaVersionsOutput{
CompatibleKafkaVersions: []types.CompatibleKafkaVersion{
{
TargetVersions: []string{
"2.2.3",
"2.3.4.tiered",
"3.7.x",
},
},
},
})
},
expectedError: false,
expectedClusterCount: 1,
expectedVersionCandidates: []string{
"2.2.3",
"2.3.4",
"3.7.0",
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
api := new(aws.MockMSKApi)
tc.initFunc(api)

source, err := NewSource(api, slog.Default(), &Config{})
if err != nil {
t.Error(err)
}
softwares, err := source.Load()
if err != nil {
t.Error(err)
}

if len(softwares) != tc.expectedClusterCount {
t.Errorf("expected %d cluster", tc.expectedClusterCount)
}

if len(softwares[0].VersionCandidates) != len(tc.expectedVersionCandidates) {
t.Errorf("expected %d version candidates, got %d", len(tc.expectedVersionCandidates), len(softwares[0].VersionCandidates))
}

for _, expectedCandidate := range tc.expectedVersionCandidates {
if !contains(softwares[0].VersionCandidates, expectedCandidate) {
t.Errorf("does not find version %s in result", expectedCandidate)
}
}
})
source, err := NewSource(api, slog.Default(), &Config{})
if err != nil {
t.Error(err)
}
_, err = source.Load()
if err != nil {
t.Error(err)
}

func contains(slice []software.Version, value string) bool {
for _, v := range slice {
if v.Version == value {
return true
}
}
return false
}

0 comments on commit 9190def

Please sign in to comment.