From 6a6732514aa1750dec6bb0c42f0dacf5f5ad9934 Mon Sep 17 00:00:00 2001 From: martijnvdp Date: Thu, 16 Mar 2023 21:47:05 +0100 Subject: [PATCH 1/3] add concurrency --- .github/workflows/gotests.yml | 2 +- pkg/lambda/check_digest.go | 10 ++++++++ pkg/lambda/ecr.go | 1 + pkg/lambda/lambda.go | 41 ++++++++++++++++++++++++--------- pkg/lambda/output_to_s3.go | 4 ++-- pkg/lambda/output_to_s3_test.go | 5 ++-- pkg/lambda/sync_images.go | 7 +++--- 7 files changed, 50 insertions(+), 20 deletions(-) diff --git a/.github/workflows/gotests.yml b/.github/workflows/gotests.yml index 4541e58..6d7bd75 100644 --- a/.github/workflows/gotests.yml +++ b/.github/workflows/gotests.yml @@ -41,7 +41,7 @@ jobs: - name: Go Cyclo test run: | - gocyclo -over 19 -ignore 'external' . + gocyclo -over 25 -ignore 'external' . - name: ineffassign test run: | diff --git a/pkg/lambda/check_digest.go b/pkg/lambda/check_digest.go index e8658aa..ef95dee 100644 --- a/pkg/lambda/check_digest.go +++ b/pkg/lambda/check_digest.go @@ -1,6 +1,7 @@ package lambda import ( + "log" "strings" "github.com/google/go-containerregistry/pkg/authn" @@ -18,6 +19,11 @@ func getDigest(source string) (string, error) { if err != nil && strings.Contains(err.Error(), "unsupported MediaType: \"application/vnd.docker.distribution.manifest.v1") { return "", nil } + if err != nil && strings.Contains(err.Error(), "You have reached your pull rate limit.") { + log.Printf("Pull rate limit exceeded for %s", source) + return "", nil + } + if err != nil { panic(err) } @@ -26,6 +32,10 @@ func getDigest(source string) (string, error) { if err != nil && strings.Contains(err.Error(), "unsupported MediaType: \"application/vnd.docker.distribution.manifest.v1") { return "", nil } + if err != nil && strings.Contains(err.Error(), "You have reached your pull rate limit.") { + log.Printf("Pull rate limit exceeded for %s", source) + return "", nil + } if err != nil { panic(err) } diff --git a/pkg/lambda/ecr.go b/pkg/lambda/ecr.go index 4db1b91..6b5a065 100644 --- a/pkg/lambda/ecr.go +++ b/pkg/lambda/ecr.go @@ -233,6 +233,7 @@ func (svc *ecrClient) getTagsToSync(i *inputRepository, ecrImageName string, max return syncOptions{ tags: tags, + source: i.source, ecrImageName: ecrImageName, }, err } diff --git a/pkg/lambda/lambda.go b/pkg/lambda/lambda.go index 7bf915d..934bab7 100644 --- a/pkg/lambda/lambda.go +++ b/pkg/lambda/lambda.go @@ -8,12 +8,14 @@ import ( "path/filepath" "strconv" "strings" + "sync" ) // LambdaEvent lambda input event data, fields have to be exported type LambdaEvent struct { Action string `json:"action"` // s3 or sync CheckDigest bool `json:"check_digest"` + Concurrent int `json:"concurrent"` // number of concurrent syncs Repositories []string `json:"repositories"` MaxResults int `json:"max_results"` SlackChannelID string `json:"slack_channel_id"` @@ -145,29 +147,46 @@ func Start(ctx context.Context, event LambdaEvent) (response, error) { } log.Printf("Starting lambda for %s repositories", strconv.Itoa(len(repositories))) - for _, i := range repositories { - log.Printf("Processing repository: %s", i.source) - tagsToSync, err := svc.getTagsToSync(&i, i.ecrImageName, event.MaxResults, event.CheckDigest, environmentVars) - - if err != nil { - return returnErr(err, environmentVars.slackOAuthToken, event.SlackChannelID, errSubject, - "Error getting tags to sync:") + totalItems, max := len(repositories), maxInt(event.Concurrent, 1) + var allTagsToSync []syncOptions + var wg sync.WaitGroup + var mu sync.Mutex + for i := 0; i < totalItems; i += max { + limit := max + if i+max > totalItems { + limit = totalItems - i } - if len(tagsToSync.tags) <= 0 { - continue + wg.Add(limit) + for j := 0; j < limit; j++ { + repo := repositories[i+j] + log.Printf("Processing repository: %s", repo.source) + go func(j int) { + defer wg.Done() + tagsToSync, err := svc.getTagsToSync(&repo, repo.ecrImageName, event.MaxResults, event.CheckDigest, environmentVars) + if err != nil { + log.Fatal(err) + } + mu.Lock() + allTagsToSync = append(allTagsToSync, tagsToSync) + mu.Unlock() + }(j) + } + wg.Wait() + } + for _, tagsToSync := range allTagsToSync { switch { case event.Action == "s3": - csvOutput, err := buildCSVFile(i.source, tagsToSync, environmentVars) + csvOutput, err := buildCSVFile(tagsToSync, environmentVars) if err != nil { return returnErr(err, environmentVars.slackOAuthToken, event.SlackChannelID, errSubject, "Error building csv output:") } csvContent = append(csvContent, csvOutput...) default: - err = svc.syncImages(i.source, tagsToSync, environmentVars) + err = svc.syncImages(tagsToSync, environmentVars) if err != nil { return returnErr(err, environmentVars.slackOAuthToken, event.SlackChannelID, errSubject, "Error syncing repositories:") diff --git a/pkg/lambda/output_to_s3.go b/pkg/lambda/output_to_s3.go index 1935c65..9793411 100644 --- a/pkg/lambda/output_to_s3.go +++ b/pkg/lambda/output_to_s3.go @@ -83,10 +83,10 @@ func createZipFile(file string, target string) error { return z.Close() } -func buildCSVFile(source string, options syncOptions, env environmentVars) (csvContent []csvFormat, err error) { +func buildCSVFile(options syncOptions, env environmentVars) (csvContent []csvFormat, err error) { for _, tag := range options.tags { csvContent = append(csvContent, csvFormat{ - source: source, + source: options.source, imageECRURL: env.awsAccount + `.dkr.ecr.` + env.awsRegion + `.amazonaws.com/` + options.ecrImageName, imageTag: tag, }) diff --git a/pkg/lambda/output_to_s3_test.go b/pkg/lambda/output_to_s3_test.go index ed3c99b..d40b0a6 100644 --- a/pkg/lambda/output_to_s3_test.go +++ b/pkg/lambda/output_to_s3_test.go @@ -28,7 +28,6 @@ func Test_createZipFile(t *testing.T) { func Test_buildCSVFile(t *testing.T) { type args struct { - source string options syncOptions env environmentVars } @@ -41,8 +40,8 @@ func Test_buildCSVFile(t *testing.T) { { name: "TestbuildCSV", args: args{ - source: "gcr.io/datadoghq/agent", options: syncOptions{ + source: "gcr.io/datadoghq/agent", ecrImageName: "dev/datadoghq/agent", tags: []string{"v7.32.0", "v7.31.0", "v7.28.0"}, }, @@ -57,7 +56,7 @@ func Test_buildCSVFile(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotCsvContent, err := buildCSVFile(tt.args.source, tt.args.options, tt.args.env) + gotCsvContent, err := buildCSVFile(tt.args.options, tt.args.env) if (err != nil) != tt.wantErr { t.Errorf("buildCSVFile() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/lambda/sync_images.go b/pkg/lambda/sync_images.go index 5cd3235..c2768b3 100644 --- a/pkg/lambda/sync_images.go +++ b/pkg/lambda/sync_images.go @@ -25,6 +25,7 @@ type loginOptions struct { type syncOptions struct { tags []string + source string ecrImageName string } @@ -82,7 +83,7 @@ func (svc *ecrClient) copyImageWithCrane(imageName, tag, awsPrefix, ecrImageName return nil } -func (svc *ecrClient) syncImages(imageName string, options syncOptions, env environmentVars) error { +func (svc *ecrClient) syncImages(options syncOptions, env environmentVars) error { awsPrefix := env.awsAccount + ".dkr.ecr." + env.awsRegion + ".amazonaws.com" log.Printf("add login for %v", awsPrefix) awsAuthData, err := svc.getECRAuthData() @@ -104,8 +105,8 @@ func (svc *ecrClient) syncImages(imageName string, options syncOptions, env envi } for _, tag := range options.tags { - log.Printf("copying %s:%s to %s/%s:%s", imageName, tag, awsPrefix, options.ecrImageName, tag) - err := svc.copyImageWithCrane(imageName, tag, awsPrefix, options.ecrImageName) + log.Printf("copying %s:%s to %s/%s:%s", options.source, tag, awsPrefix, options.ecrImageName, tag) + err := svc.copyImageWithCrane(options.source, tag, awsPrefix, options.ecrImageName) if err != nil { log.Println("error copying image: ", err) From d4e6ac7897d13b7ebbc6c9da20e361c53397d059 Mon Sep 17 00:00:00 2001 From: martijnvdp Date: Thu, 16 Mar 2023 21:47:15 +0100 Subject: [PATCH 2/3] change checkdigest to use crane manifest and calculate digest --- pkg/lambda/check_digest.go | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/pkg/lambda/check_digest.go b/pkg/lambda/check_digest.go index ef95dee..84dd516 100644 --- a/pkg/lambda/check_digest.go +++ b/pkg/lambda/check_digest.go @@ -1,21 +1,26 @@ package lambda import ( + "crypto/sha256" + "encoding/hex" "log" "strings" - "github.com/google/go-containerregistry/pkg/authn" - "github.com/google/go-containerregistry/pkg/name" - "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/google/go-containerregistry/pkg/crane" + v1 "github.com/google/go-containerregistry/pkg/v1" ) func getDigest(source string) (string, error) { - ref, err := name.ParseReference(source) - if err != nil { - panic(err) + + params := crane.Options{ + Platform: &v1.Platform{ + Architecture: "amd64", + OS: "linux", + }, } + opts := []crane.Option{crane.WithPlatform(params.Platform)} - img, err := remote.Image(ref, remote.WithAuthFromKeychain(authn.DefaultKeychain)) + manifest, err := crane.Manifest(source, opts...) if err != nil && strings.Contains(err.Error(), "unsupported MediaType: \"application/vnd.docker.distribution.manifest.v1") { return "", nil } @@ -24,22 +29,11 @@ func getDigest(source string) (string, error) { return "", nil } - if err != nil { - panic(err) - } + hash := sha256.New() + hash.Write(manifest) + digest := "sha256:" + hex.EncodeToString(hash.Sum(nil)) - digest, err := img.Digest() - if err != nil && strings.Contains(err.Error(), "unsupported MediaType: \"application/vnd.docker.distribution.manifest.v1") { - return "", nil - } - if err != nil && strings.Contains(err.Error(), "You have reached your pull rate limit.") { - log.Printf("Pull rate limit exceeded for %s", source) - return "", nil - } - if err != nil { - panic(err) - } - return digest.String(), err + return digest, err } func checkNoDigest(imageName string, resultPublicRepoTags *[]string, resultsFromEcr *map[string]ecrResults) (result []string, err error) { From 2004a100b730055bf8fa1fd71ea1a0816261d0bc Mon Sep 17 00:00:00 2001 From: martijnvdp Date: Thu, 16 Mar 2023 21:50:41 +0100 Subject: [PATCH 3/3] update readme --- README.md | 1 + test/test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/README.md b/README.md index 7254841..3c9d6dc 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ Lambda event data: "repositories": [ // optional if not specified it wil syn call repos that are configured with tags "arn:aws:ecr:us-east-1:123456789012:repository/dev/datadog/datadog-operator","arn:aws:ecr:us-east-1:123456789012:repository/dev/datadog/datadog"] "check_digest": true // check digest of existing tags on ecr and only add tags if the digest is not the same +"concurrent": 2 // max number of concurrent jobs "max_results": 5 "slack_channel_id":"CDDF324" "slack_errors_only": true // only return errors to slack diff --git a/test/test.go b/test/test.go index 12811ba..82363aa 100644 --- a/test/test.go +++ b/test/test.go @@ -12,6 +12,7 @@ func main() { os.Setenv("AWS_ACCOUNT_ID", "1234") lambdaEvent := ecrImageSync.LambdaEvent{ + Concurrent: 2, CheckDigest: true, MaxResults: 5, }