Skip to content

Commit 7f2f164

Browse files
committed
[wip] Add initial logic for the client
1 parent 1e4e0f7 commit 7f2f164

File tree

4 files changed

+391
-0
lines changed

4 files changed

+391
-0
lines changed

cmd/sync/client/client.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
7+
"github.com/adobe/cluster-registry/pkg/config"
8+
"github.com/adobe/cluster-registry/pkg/sqs"
9+
client "github.com/adobe/cluster-registry/pkg/sync/client"
10+
awssqs "github.com/aws/aws-sdk-go/service/sqs"
11+
"github.com/davecgh/go-spew/spew"
12+
"github.com/sirupsen/logrus"
13+
log "github.com/sirupsen/logrus"
14+
"github.com/spf13/cobra"
15+
)
16+
17+
var (
18+
logLevel, logFormat string
19+
appConfig *config.AppConfig
20+
namespace string
21+
//clusterName string
22+
//cfgFile string
23+
)
24+
25+
func InitCLI() *cobra.Command {
26+
27+
var rootCmd = &cobra.Command{
28+
Use: "cluster-registry-client-sync",
29+
Short: "Cluster Registry Sync Client is a service that keep in sync the cluster CRD",
30+
Long: "\nCluster Registry Sync Client is a service that creates or updates the cluster CRD based on the messages received from the Cluster Registry Sync manager",
31+
PersistentPreRun: loadAppConfig,
32+
Run: run,
33+
}
34+
35+
initFlags(rootCmd)
36+
37+
return rootCmd
38+
}
39+
40+
func initFlags(rootCmd *cobra.Command) {
41+
42+
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", logrus.DebugLevel.String(), "The verbosity level of the logs, can be [panic|fatal|error|warn|info|debug|trace]")
43+
rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text", "The output format of the logs, can be [text|json]")
44+
//rootCmd.PersistentFlags().StringVar(&cfgFile, "config-file", "", "The path to the configuration file")
45+
rootCmd.PersistentFlags().StringVar(&namespace, "namespace", "cluster-registry", "The namespace where cluster-registry-sync-client will run.")
46+
}
47+
48+
func loadAppConfig(cmd *cobra.Command, args []string) {
49+
50+
client.InitLogger(logLevel, logFormat)
51+
52+
log.Info("Starting the Cluster Registry Sync Client")
53+
54+
log.Info("Loading the configuration")
55+
appConfig, err := config.LoadSyncClientConfig()
56+
if err != nil {
57+
log.Error("Cannot load the cluster-registry-sync-client configuration:", err.Error())
58+
os.Exit(1)
59+
}
60+
log.Info("Config loaded successfully")
61+
log.Info("Cluster (custom resource) to be checked:", appConfig.ClusterName)
62+
}
63+
64+
func run(cmd *cobra.Command, args []string) {
65+
66+
log.Info("Cluster Registry Sync Client is running")
67+
68+
// Consume the messages from the queue using a sync consumer
69+
sqsInstance := sqs.NewSQS(appConfig)
70+
log.Info("Starting the SQS sync consumer")
71+
72+
handler := func(m *awssqs.Message) error {
73+
spew.Dump(m)
74+
// TODO
75+
return nil
76+
}
77+
syncConsumer := sqs.NewSyncConsumer(sqsInstance, appConfig, handler)
78+
go syncConsumer.Consume()
79+
80+
// Block the thread
81+
select {}
82+
}
83+
84+
func main() {
85+
86+
rootCmd := InitCLI()
87+
88+
// Execute the CLI application
89+
if err := rootCmd.Execute(); err != nil {
90+
fmt.Println(err)
91+
os.Exit(1)
92+
}
93+
94+
//TODO
95+
96+
}

pkg/sqs/synconsumer.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
Copyright 2024 Adobe. All rights reserved.
3+
This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License. You may obtain a copy
5+
of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under
8+
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
OF ANY KIND, either express or implied. See the License for the specific language
10+
governing permissions and limitations under the License.
11+
*/
12+
13+
package sqs
14+
15+
import (
16+
"sync"
17+
"time"
18+
19+
"github.com/aws/aws-sdk-go/aws"
20+
"github.com/aws/aws-sdk-go/service/sqs"
21+
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
22+
"github.com/labstack/gommon/log"
23+
24+
"github.com/adobe/cluster-registry/pkg/config"
25+
monitoring "github.com/adobe/cluster-registry/pkg/monitoring/apiserver"
26+
)
27+
28+
// consumer struct
29+
type synconsumer struct {
30+
sqs sqsiface.SQSAPI
31+
queueURL string
32+
workerPool int
33+
maxMessages int64
34+
pollWaitSeconds int64
35+
retrySeconds int
36+
messageHandler func(*sqs.Message) error
37+
}
38+
39+
// NewSyncConsumer - creates a new SQS message queue consumer
40+
// used by the sync consumer service
41+
// TODO: add metrics later
42+
func NewSyncConsumer(sqsSvc sqsiface.SQSAPI, appConfig *config.AppConfig, h func(*sqs.Message) error) Consumer {
43+
44+
urlResult, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{
45+
QueueName: &appConfig.SqsQueueName,
46+
})
47+
if err != nil {
48+
log.Fatal(err.Error())
49+
}
50+
51+
return &synconsumer{
52+
sqs: sqsSvc,
53+
queueURL: *urlResult.QueueUrl,
54+
workerPool: 10,
55+
maxMessages: 1,
56+
pollWaitSeconds: 1,
57+
retrySeconds: 5,
58+
messageHandler: h,
59+
}
60+
}
61+
62+
// Status verifies the status/connectivity of the sqs service
63+
func (c *synconsumer) Status(appConfig *config.AppConfig, m monitoring.MetricsI) error {
64+
_, err := c.sqs.GetQueueUrl(&sqs.GetQueueUrlInput{
65+
QueueName: &appConfig.SqsQueueName,
66+
})
67+
68+
if err != nil {
69+
log.Error(err.Error())
70+
}
71+
72+
return err
73+
}
74+
75+
// Consume - long pooling
76+
func (c *synconsumer) Consume() {
77+
var wg sync.WaitGroup
78+
79+
for w := 1; w <= c.workerPool; w++ {
80+
wg.Add(1)
81+
go func(w int) {
82+
defer wg.Done()
83+
c.worker(w)
84+
}(w)
85+
}
86+
wg.Wait()
87+
}
88+
89+
func (c *synconsumer) worker(id int) {
90+
for {
91+
output, err := c.sqs.ReceiveMessage((&sqs.ReceiveMessageInput{
92+
QueueUrl: &c.queueURL,
93+
AttributeNames: aws.StringSlice([]string{
94+
"ClusterName", "SentTimestamp",
95+
}),
96+
MaxNumberOfMessages: aws.Int64(c.maxMessages),
97+
WaitTimeSeconds: aws.Int64(c.pollWaitSeconds),
98+
}))
99+
100+
if err != nil {
101+
log.Error(err.Error())
102+
log.Info("Retrying in", c.retrySeconds, " seconds")
103+
time.Sleep(time.Duration(c.retrySeconds) * time.Second)
104+
continue
105+
}
106+
107+
for _, m := range output.Messages {
108+
log.Debug("Messsage ID: ", *m.MessageId)
109+
log.Debug("Message Body: ", *m.Body)
110+
111+
err := c.processMessage(m)
112+
113+
if err != nil {
114+
log.Error(err.Error())
115+
continue
116+
}
117+
err = c.delete(m)
118+
if err != nil {
119+
log.Error(err.Error())
120+
}
121+
}
122+
}
123+
}
124+
125+
// processMessage - process the recieved message
126+
func (c *synconsumer) processMessage(m *sqs.Message) error {
127+
128+
err := c.messageHandler(m)
129+
return err
130+
}
131+
132+
func (c *synconsumer) delete(m *sqs.Message) error {
133+
134+
_, err := c.sqs.DeleteMessage(
135+
&sqs.DeleteMessageInput{QueueUrl: &c.queueURL, ReceiptHandle: m.ReceiptHandle})
136+
137+
if err != nil {
138+
log.Error(err.Error())
139+
}
140+
return err
141+
}

pkg/sync/client/k8s.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
Copyright 2024 Adobe. All rights reserved.
3+
This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License. You may obtain a copy
5+
of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under
8+
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
OF ANY KIND, either express or implied. See the License for the specific language
10+
governing permissions and limitations under the License.
11+
*/
12+
13+
package client
14+
15+
import (
16+
"k8s.io/client-go/dynamic"
17+
"k8s.io/client-go/kubernetes"
18+
"sigs.k8s.io/controller-runtime/pkg/client/config"
19+
)
20+
21+
func getClientSet() (*kubernetes.Clientset, error) {
22+
cfg, err := config.GetConfig()
23+
if err != nil {
24+
return nil, err
25+
}
26+
27+
clientSet, err := kubernetes.NewForConfig(cfg)
28+
if err != nil {
29+
return nil, err
30+
}
31+
32+
return clientSet, nil
33+
}
34+
35+
func getDynamicClientSet() (*dynamic.DynamicClient, error) {
36+
cfg, err := config.GetConfig()
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
dnc, err := dynamic.NewForConfig(cfg)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
return dnc, nil
47+
}

pkg/sync/client/utils.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
Copyright 2024 Adobe. All rights reserved.
3+
This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License. You may obtain a copy
5+
of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
Unless required by applicable law or agreed to in writing, software distributed under
8+
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
OF ANY KIND, either express or implied. See the License for the specific language
10+
governing permissions and limitations under the License.
11+
*/
12+
13+
package client
14+
15+
import (
16+
"encoding/json"
17+
"io"
18+
"log"
19+
"os"
20+
"strings"
21+
22+
registryv1 "github.com/adobe/cluster-registry/pkg/api/registry/v1"
23+
"github.com/sirupsen/logrus"
24+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
25+
"sigs.k8s.io/yaml"
26+
)
27+
28+
func ReadFile(patchFilePath string) ([]byte, error) {
29+
patchFile, err := os.Open(patchFilePath)
30+
if err != nil {
31+
log.Fatalf("Error opening patch YAML file: %v", err)
32+
}
33+
defer patchFile.Close()
34+
35+
patchData, err := io.ReadAll(patchFile)
36+
if err != nil {
37+
log.Fatalf("Error reading patch YAML file: %v", err)
38+
}
39+
return patchData, nil
40+
}
41+
42+
func UnmarshalYaml(data []byte, cluster *[]registryv1.Cluster) error {
43+
err := yaml.Unmarshal(data, cluster)
44+
if err != nil {
45+
log.Panicf("Error while trying to unmarshal yaml data: %v", err.Error())
46+
}
47+
48+
return err
49+
}
50+
51+
func UnmarshalJSON(data []byte, cluster *[]registryv1.Cluster) error {
52+
err := json.Unmarshal(data, cluster)
53+
if err != nil {
54+
log.Panicf("Error while trying to unmarshal json data: %v", err.Error())
55+
}
56+
57+
return err
58+
}
59+
60+
func MarshalJson(patch map[string]interface{}) ([]byte, error) {
61+
jsonData, err := json.Marshal(patch)
62+
if err != nil {
63+
log.Panicf("Error while trying to marshal json data: %v", err.Error())
64+
}
65+
66+
return jsonData, err
67+
}
68+
69+
// toUnstructured converts a Cluster struct to an unstructured.Unstructured object
70+
func toUnstructured(obj interface{}) (*unstructured.Unstructured, error) {
71+
data, err := json.Marshal(obj)
72+
if err != nil {
73+
return nil, err
74+
}
75+
u := &unstructured.Unstructured{}
76+
if err := u.UnmarshalJSON(data); err != nil {
77+
return nil, err
78+
}
79+
return u, nil
80+
}
81+
82+
// TODO; check if there is an utils func - see if no need
83+
// unstructuredToJSON converts an unstructured.Unstructured object to a JSON string
84+
func unstructuredToJSON(obj *unstructured.Unstructured) ([]byte, error) {
85+
return obj.MarshalJSON()
86+
}
87+
88+
func InitLogger(logLevel string, logFormat string) {
89+
90+
level, err := logrus.ParseLevel(logLevel)
91+
if err != nil {
92+
level = logrus.DebugLevel
93+
}
94+
logrus.SetLevel(level)
95+
96+
logFormat = strings.ToLower(logFormat)
97+
if logFormat == "text" {
98+
logrus.SetFormatter(&logrus.TextFormatter{
99+
FullTimestamp: true,
100+
ForceColors: true,
101+
})
102+
} else {
103+
logrus.SetFormatter(&logrus.JSONFormatter{
104+
TimestampFormat: "2006-01-02 15:04:05",
105+
})
106+
}
107+
}

0 commit comments

Comments
 (0)