diff --git a/cmd/optimizely/main.go b/cmd/optimizely/main.go index d58f95b4..2ea00f88 100644 --- a/cmd/optimizely/main.go +++ b/cmd/optimizely/main.go @@ -33,6 +33,7 @@ import ( "github.com/spf13/viper" "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/handlers" "github.com/optimizely/agent/pkg/metrics" "github.com/optimizely/agent/pkg/optimizely" "github.com/optimizely/agent/pkg/routers" @@ -266,7 +267,10 @@ func main() { sdkMetricsRegistry := optimizely.NewRegistry(agentMetricsRegistry) ctx, cancel := context.WithCancel(context.Background()) // Create default service context - sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners + defer cancel() + ctx = context.WithValue(ctx, handlers.LoggerKey, &log.Logger) + + sg := server.NewGroup(ctx, conf.Server) // Create a new server group to manage the individual http listeners optlyCache := optimizely.NewCache(ctx, *conf, sdkMetricsRegistry) optlyCache.Init(conf.SDKKeys) @@ -286,7 +290,7 @@ func main() { log.Info().Str("version", conf.Version).Msg("Starting services.") sg.GoListenAndServe("api", conf.API.Port, apiRouter) - sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(optlyCache, conf.Webhook)) + sg.GoListenAndServe("webhook", conf.Webhook.Port, routers.NewWebhookRouter(ctx, optlyCache, *conf)) sg.GoListenAndServe("admin", conf.Admin.Port, adminRouter) // Admin should be added last. // wait for server group to shutdown diff --git a/config.yaml b/config.yaml index 87b872a0..431ddd6c 100644 --- a/config.yaml +++ b/config.yaml @@ -244,16 +244,20 @@ runtime: ## (For n>1 the details of sampling may change.) mutexProfileFraction: 0 -## synchronization should be enabled when multiple replicas of agent is deployed -## if notification synchronization is enabled, then the active notification event-stream API -## will get the notifications from multiple replicas +## synchronization should be enabled when features for multiple nodes like notification streaming are deployed synchronization: pubsub: redis: host: "redis.demo.svc:6379" password: "" database: 0 - channel: "optimizely-sync" + ## if notification synchronization is enabled, then the active notification event-stream API + ## will get the notifications from available replicas notification: enable: false default: "redis" + ## if datafile synchronization is enabled, then for each webhook API call + ## the datafile will be sent to all available replicas to achieve better eventual consistency + datafile: + enable: false + default: "redis" diff --git a/config/config.go b/config/config.go index 428c3992..67234cf8 100644 --- a/config/config.go +++ b/config/config.go @@ -135,7 +135,7 @@ func NewDefaultConfig() *AgentConfig { "channel": "optimizely-notifications", }, }, - Notification: NotificationConfig{ + Notification: FeatureSyncConfig{ Enable: false, Default: "redis", }, @@ -167,11 +167,12 @@ type AgentConfig struct { // SyncConfig contains Synchronization configuration for the multiple Agent nodes type SyncConfig struct { Pubsub map[string]interface{} `json:"pubsub"` - Notification NotificationConfig `json:"notification"` + Notification FeatureSyncConfig `json:"notification"` + Datafile FeatureSyncConfig `json:"datafile"` } -// NotificationConfig contains Notification Synchronization configuration for the multiple Agent nodes -type NotificationConfig struct { +// FeatureSyncConfig contains Notification Synchronization configuration for the multiple Agent nodes +type FeatureSyncConfig struct { Enable bool `json:"enable"` Default string `json:"default"` } diff --git a/pkg/handlers/notification.go b/pkg/handlers/notification.go index 5c391cf8..356a4f9a 100644 --- a/pkg/handlers/notification.go +++ b/pkg/handlers/notification.go @@ -25,7 +25,6 @@ import ( "net/http" "strings" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/middleware" "github.com/optimizely/agent/pkg/syncer" @@ -212,26 +211,22 @@ func DefaultNotificationReceiver(ctx context.Context) (<-chan syncer.Event, erro return messageChan, nil } -func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc { +func SyncedNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc { return func(ctx context.Context) (<-chan syncer.Event, error) { sdkKey, ok := ctx.Value(SDKKey).(string) if !ok || sdkKey == "" { return nil, errors.New("sdk key not found") } - redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, conf, sdkKey) + ncSyncer, err := syncer.NewSyncedNotificationCenter(ctx, sdkKey, conf) if err != nil { return nil, err } - client := redis.NewClient(&redis.Options{ - Addr: redisSyncer.Host, - Password: redisSyncer.Password, - DB: redisSyncer.Database, - }) - - // Subscribe to a Redis channel - pubsub := client.Subscribe(ctx, syncer.GetChannelForSDKKey(redisSyncer.Channel, sdkKey)) + eventCh, err := ncSyncer.Subscribe(ctx, syncer.GetChannelForSDKKey(syncer.PubSubDefaultChan, sdkKey)) + if err != nil { + return nil, err + } dataChan := make(chan syncer.Event) @@ -244,19 +239,12 @@ func RedisNotificationReceiver(conf config.SyncConfig) NotificationReceiverFunc for { select { case <-ctx.Done(): - client.Close() - pubsub.Close() + close(dataChan) logger.Debug().Msg("context canceled, redis notification receiver is closed") return - default: - msg, err := pubsub.ReceiveMessage(ctx) - if err != nil { - logger.Err(err).Msg("failed to receive message from redis") - continue - } - + case msg := <-eventCh: var event syncer.Event - if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil { + if err := json.Unmarshal([]byte(msg), &event); err != nil { logger.Err(err).Msg("failed to unmarshal redis message") continue } diff --git a/pkg/handlers/notification_test.go b/pkg/handlers/notification_test.go index 82d0b95e..b1e62ab5 100644 --- a/pkg/handlers/notification_test.go +++ b/pkg/handlers/notification_test.go @@ -216,7 +216,7 @@ func (suite *NotificationTestSuite) TestTrackAndProjectConfigWithSynchronization "database": 0, }, }, - Notification: config.NotificationConfig{ + Notification: config.FeatureSyncConfig{ Enable: true, Default: "redis", }, @@ -370,7 +370,7 @@ func TestRedisNotificationReceiver(t *testing.T) { "database": 0, }, }, - Notification: config.NotificationConfig{ + Notification: config.FeatureSyncConfig{ Enable: true, Default: "redis", }, @@ -407,7 +407,7 @@ func TestRedisNotificationReceiver(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := RedisNotificationReceiver(tt.args.conf) + got := SyncedNotificationReceiver(tt.args.conf) if reflect.TypeOf(got) != reflect.TypeOf(tt.want) { t.Errorf("RedisNotificationReceiver() = %v, want %v", got, tt.want) } diff --git a/pkg/handlers/webhook.go b/pkg/handlers/webhook.go index 86fc6ade..1e911216 100644 --- a/pkg/handlers/webhook.go +++ b/pkg/handlers/webhook.go @@ -18,11 +18,13 @@ package handlers import ( + "context" "crypto/hmac" "crypto/sha1" "crypto/subtle" "encoding/hex" "encoding/json" + "fmt" "io" "net/http" "strconv" @@ -30,9 +32,11 @@ import ( "github.com/optimizely/agent/config" "github.com/go-chi/render" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/optimizely/agent/pkg/optimizely" + "github.com/optimizely/agent/pkg/syncer" ) const signatureHeader = "X-Hub-Signature" @@ -56,15 +60,19 @@ type OptlyMessage struct { // OptlyWebhookHandler handles incoming messages from Optimizely type OptlyWebhookHandler struct { - optlyCache optimizely.Cache - ProjectMap map[int64]config.WebhookProject + optlyCache optimizely.Cache + ProjectMap map[int64]config.WebhookProject + configSyncer syncer.Syncer + syncEnabled bool } // NewWebhookHandler returns a new instance of OptlyWebhookHandler -func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject) *OptlyWebhookHandler { +func NewWebhookHandler(optlyCache optimizely.Cache, projectMap map[int64]config.WebhookProject, configSyncer syncer.Syncer) *OptlyWebhookHandler { return &OptlyWebhookHandler{ - optlyCache: optlyCache, - ProjectMap: projectMap, + optlyCache: optlyCache, + ProjectMap: projectMap, + syncEnabled: configSyncer != nil, + configSyncer: configSyncer, } } @@ -140,7 +148,47 @@ func (h *OptlyWebhookHandler) HandleWebhook(w http.ResponseWriter, r *http.Reque // Iterate through all SDK keys and update config for _, sdkKey := range webhookConfig.SDKKeys { - h.optlyCache.UpdateConfigs(sdkKey) + if h.syncEnabled { + if err := h.configSyncer.Sync(r.Context(), syncer.GetDatafileSyncChannel(), sdkKey); err != nil { + errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) + log.Error().Msg(errMsg) + h.optlyCache.UpdateConfigs(sdkKey) + } + } else { + h.optlyCache.UpdateConfigs(sdkKey) + } } w.WriteHeader(http.StatusNoContent) } + +func (h *OptlyWebhookHandler) StartSyncer(ctx context.Context) error { + logger, ok := ctx.Value(LoggerKey).(*zerolog.Logger) + if !ok { + logger = &log.Logger + } + + if !h.syncEnabled { + logger.Debug().Msg("datafile syncer is not enabled") + return nil + } + + dataCh, err := h.configSyncer.Subscribe(ctx, syncer.GetDatafileSyncChannel()) + if err != nil { + return err + } + + go func() { + for { + select { + case <-ctx.Done(): + logger.Debug().Msg("context canceled, syncer is stopped") + return + case key := <-dataCh: + h.optlyCache.UpdateConfigs(key) + logger.Info().Msgf("datafile synced successfully for sdkKey: %s", key) + } + } + }() + logger.Debug().Msg("datafile syncer is started") + return nil +} diff --git a/pkg/handlers/webhook_test.go b/pkg/handlers/webhook_test.go index 50eb81c7..221e3e0f 100644 --- a/pkg/handlers/webhook_test.go +++ b/pkg/handlers/webhook_test.go @@ -19,6 +19,7 @@ package handlers import ( "bytes" + "context" "encoding/json" "net/http" "net/http/httptest" @@ -46,6 +47,28 @@ func NewCache() *TestCache { } } +type TestDFSyncer struct { + syncCalled bool + subscribeCalled bool +} + +func NewTestDFSyncer() *TestDFSyncer { + return &TestDFSyncer{ + syncCalled: false, + subscribeCalled: false, + } +} + +func (t *TestDFSyncer) Sync(_ context.Context, _ string, _ string) error { + t.syncCalled = true + return nil +} + +func (t *TestDFSyncer) Subscribe(_ context.Context, _ string) (chan string, error) { + t.subscribeCalled = true + return make(chan string), nil +} + // GetClient returns a default OptlyClient for testing func (tc *TestCache) GetClient(sdkKey string) (*optimizely.OptlyClient, error) { return &optimizely.OptlyClient{ @@ -111,7 +134,7 @@ func TestHandleWebhookValidMessageInvalidSignature(t *testing.T) { Secret: "I am secret", }, } - optlyHandler := NewWebhookHandler(nil, testWebhookConfigs) + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, nil) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -146,7 +169,7 @@ func TestHandleWebhookSkippedCheckInvalidSignature(t *testing.T) { SkipSignatureCheck: true, }, } - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs) + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -181,7 +204,7 @@ func TestHandleWebhookValidMessage(t *testing.T) { Secret: "I am secret", }, } - optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs) + optlyHandler := NewWebhookHandler(testCache, testWebhookConfigs, nil) webhookMsg := OptlyMessage{ ProjectID: 42, Timestamp: 42424242, @@ -208,3 +231,55 @@ func TestHandleWebhookValidMessage(t *testing.T) { assert.Equal(t, http.StatusNoContent, rec.Code) assert.Equal(t, true, testCache.updateConfigsCalled) } + +func TestHandleWebhookWithDatafileSyncer(t *testing.T) { + var testWebhookConfigs = map[int64]config.WebhookProject{ + 42: { + SDKKeys: []string{"myDatafile"}, + Secret: "I am secret", + }, + } + syncer := NewTestDFSyncer() + + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer) + webhookMsg := OptlyMessage{ + ProjectID: 42, + Timestamp: 42424242, + Event: "project.datafile_updated", + Data: DatafileUpdateData{ + Revision: 101, + OriginURL: "origin.optimizely.com/datafiles/myDatafile", + CDNUrl: "cdn.optimizely.com/datafiles/myDatafile", + Environment: "Production", + }, + } + + validWebhookMessage, _ := json.Marshal(webhookMsg) + + req := httptest.NewRequest("POST", "/webhooks/optimizely", bytes.NewBuffer(validWebhookMessage)) + + // This sha1 has been computed from the Optimizely application + req.Header.Set(signatureHeader, "sha1=e0199de63fb7192634f52136d4ceb7dc6f191da3") + + rec := httptest.NewRecorder() + handler := http.HandlerFunc(optlyHandler.HandleWebhook) + handler.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusNoContent, rec.Code) + assert.Equal(t, true, syncer.syncCalled) +} + +func TestWebhookStartSyncer(t *testing.T) { + var testWebhookConfigs = map[int64]config.WebhookProject{ + 42: { + SDKKeys: []string{"myDatafile"}, + Secret: "I am secret", + }, + } + syncer := NewTestDFSyncer() + + optlyHandler := NewWebhookHandler(nil, testWebhookConfigs, syncer) + err := optlyHandler.StartSyncer(context.Background()) + assert.NoError(t, err) + assert.Equal(t, true, syncer.subscribeCalled) +} diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index f1401311..cef2f8f7 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -41,7 +41,6 @@ import ( odpCachePkg "github.com/optimizely/go-sdk/pkg/odp/cache" cmap "github.com/orcaman/concurrent-map" - "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) @@ -252,11 +251,12 @@ func defaultLoader( } if agentConf.Synchronization.Notification.Enable { - redisSyncer, err := syncer.NewRedisSyncer(&zerolog.Logger{}, agentConf.Synchronization, sdkKey) + syncedNC, err := syncer.NewSyncedNotificationCenter(context.Background(), sdkKey, agentConf.Synchronization) if err != nil { - return nil, err + log.Error().Err(err).Msgf("Failed to create SyncedNotificationCenter, reason: %s", err.Error()) + } else { + clientOptions = append(clientOptions, client.WithNotificationCenter(syncedNC)) } - clientOptions = append(clientOptions, client.WithNotificationCenter(redisSyncer)) } var clientUserProfileService decision.UserProfileService diff --git a/pkg/routers/api.go b/pkg/routers/api.go index 7eb15d6e..79df6103 100644 --- a/pkg/routers/api.go +++ b/pkg/routers/api.go @@ -85,7 +85,7 @@ func NewDefaultAPIRouter(optlyCache optimizely.Cache, conf config.AgentConfig, m if conf.API.EnableNotifications { nStreamHandler = handlers.NotificationEventStreamHandler(handlers.DefaultNotificationReceiver) if conf.Synchronization.Notification.Enable { - nStreamHandler = handlers.NotificationEventStreamHandler(handlers.RedisNotificationReceiver(conf.Synchronization)) + nStreamHandler = handlers.NotificationEventStreamHandler(handlers.SyncedNotificationReceiver(conf.Synchronization)) } } diff --git a/pkg/routers/webhook.go b/pkg/routers/webhook.go index 8d0af18a..50bb98ce 100644 --- a/pkg/routers/webhook.go +++ b/pkg/routers/webhook.go @@ -18,8 +18,13 @@ package routers import ( + "context" + "fmt" + "github.com/optimizely/agent/config" "github.com/optimizely/agent/pkg/handlers" + "github.com/optimizely/agent/pkg/syncer" + "github.com/rs/zerolog/log" "github.com/go-chi/chi/v5" chimw "github.com/go-chi/chi/v5/middleware" @@ -29,12 +34,30 @@ import ( ) // NewWebhookRouter returns HTTP API router -func NewWebhookRouter(optlyCache optimizely.Cache, conf config.WebhookConfig) *chi.Mux { +func NewWebhookRouter(ctx context.Context, optlyCache optimizely.Cache, conf config.AgentConfig) *chi.Mux { r := chi.NewRouter() r.Use(chimw.AllowContentType("application/json")) r.Use(render.SetContentType(render.ContentTypeJSON)) - webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Projects) + + var dfSyncer syncer.Syncer + + if conf.Synchronization.Datafile.Enable { + sc, err := syncer.NewDatafileSyncer(conf.Synchronization) + if err != nil { + errMsg := fmt.Sprintf("datafile synced failed. reason: %s", err.Error()) + log.Error().Msg(errMsg) + } else { + dfSyncer = sc + } + } + + webhookAPI := handlers.NewWebhookHandler(optlyCache, conf.Webhook.Projects, dfSyncer) + if conf.Synchronization.Datafile.Enable { + if err := webhookAPI.StartSyncer(ctx); err != nil { + log.Error().Msgf("failed to start datafile syncer: %s", err.Error()) + } + } r.Post("/webhooks/optimizely", webhookAPI.HandleWebhook) return r diff --git a/pkg/routers/webhook_test.go b/pkg/routers/webhook_test.go index 60f4daaa..ac9f3691 100644 --- a/pkg/routers/webhook_test.go +++ b/pkg/routers/webhook_test.go @@ -19,6 +19,7 @@ package routers import ( "bytes" + "context" "net/http" "net/http/httptest" "testing" @@ -29,8 +30,8 @@ import ( func TestWebhookAllowedContentTypeMiddleware(t *testing.T) { - conf := config.WebhookConfig{} - router := NewWebhookRouter(nil, conf) + conf := config.AgentConfig{} + router := NewWebhookRouter(context.Background(), nil, conf) // Testing unsupported content type body := " test@123.com " diff --git a/pkg/syncer/pubsub.go b/pkg/syncer/pubsub.go new file mode 100644 index 00000000..6436c03e --- /dev/null +++ b/pkg/syncer/pubsub.go @@ -0,0 +1,107 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package syncer provides synchronization across Agent nodes +package syncer + +import ( + "context" + "errors" + + "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/syncer/pubsub" +) + +const ( + // PubSubDefaultChan will be used as default pubsub channel name + PubSubDefaultChan = "optimizely-sync" + // PubSubRedis is the name of pubsub type of Redis + PubSubRedis = "redis" +) + +type SycnFeatureFlag string + +const ( + SyncFeatureFlagNotificaiton SycnFeatureFlag = "sync-feature-flag-notification" + SycnFeatureFlagDatafile SycnFeatureFlag = "sync-feature-flag-datafile" +) + +type PubSub interface { + Publish(ctx context.Context, channel string, message interface{}) error + Subscribe(ctx context.Context, channel string) (chan string, error) +} + +func newPubSub(conf config.SyncConfig, featureFlag SycnFeatureFlag) (PubSub, error) { + if featureFlag == SyncFeatureFlagNotificaiton { + if conf.Notification.Default == PubSubRedis { + return getPubSubRedis(conf) + } else { + return nil, errors.New("pubsub type not supported") + } + } else if featureFlag == SycnFeatureFlagDatafile { + if conf.Datafile.Default == PubSubRedis { + return getPubSubRedis(conf) + } else { + return nil, errors.New("pubsub type not supported") + } + } + return nil, errors.New("provided feature flag not supported") +} + +func getPubSubRedis(conf config.SyncConfig) (PubSub, error) { + pubsubConf, found := conf.Pubsub[PubSubRedis] + if !found { + return nil, errors.New("pubsub redis config not found") + } + + redisConf, ok := pubsubConf.(map[string]interface{}) + if !ok { + return nil, errors.New("pubsub redis config not valid") + } + + hostVal, found := redisConf["host"] + if !found { + return nil, errors.New("pubsub redis host not found") + } + host, ok := hostVal.(string) + if !ok { + return nil, errors.New("pubsub redis host not valid, host must be string") + } + + passwordVal, found := redisConf["password"] + if !found { + return nil, errors.New("pubsub redis password not found") + } + password, ok := passwordVal.(string) + if !ok { + return nil, errors.New("pubsub redis password not valid, password must be string") + } + + databaseVal, found := redisConf["database"] + if !found { + return nil, errors.New("pubsub redis database not found") + } + database, ok := databaseVal.(int) + if !ok { + return nil, errors.New("pubsub redis database not valid, database must be int") + } + + return &pubsub.Redis{ + Host: host, + Password: password, + Database: database, + }, nil +} diff --git a/pkg/syncer/pubsub/redis.go b/pkg/syncer/pubsub/redis.go new file mode 100644 index 00000000..c9251791 --- /dev/null +++ b/pkg/syncer/pubsub/redis.go @@ -0,0 +1,75 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package pubsub provides pubsub functionality for the agent syncer +package pubsub + +import ( + "context" + + "github.com/go-redis/redis/v8" +) + +type Redis struct { + Host string + Password string + Database int +} + +func (r *Redis) Publish(ctx context.Context, channel string, message interface{}) error { + client := redis.NewClient(&redis.Options{ + Addr: r.Host, + Password: r.Password, + DB: r.Database, + }) + defer client.Close() + + return client.Publish(ctx, channel, message).Err() +} + +func (r *Redis) Subscribe(ctx context.Context, channel string) (chan string, error) { + client := redis.NewClient(&redis.Options{ + Addr: r.Host, + Password: r.Password, + DB: r.Database, + }) + + // Subscribe to a Redis channel + pubsub := client.Subscribe(ctx, channel) + + ch := make(chan string) + + go func() { + for { + select { + case <-ctx.Done(): + pubsub.Close() + client.Close() + close(ch) + return + default: + msg, err := pubsub.ReceiveMessage(ctx) + if err != nil { + continue + } + + ch <- msg.Payload + + } + } + }() + return ch, nil +} diff --git a/pkg/syncer/pubsub_test.go b/pkg/syncer/pubsub_test.go new file mode 100644 index 00000000..31b3dc1d --- /dev/null +++ b/pkg/syncer/pubsub_test.go @@ -0,0 +1,276 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package syncer provides synchronization across Agent nodes +package syncer + +import ( + "reflect" + "testing" + + "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/syncer/pubsub" +) + +func TestNewPubSub(t *testing.T) { + type args struct { + conf config.SyncConfig + flag SycnFeatureFlag + } + tests := []struct { + name string + args args + want PubSub + wantErr bool + }{ + { + name: "Test with valid config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, + }, + wantErr: false, + }, + { + name: "Test with valid config for datafile", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Datafile: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SycnFeatureFlagDatafile, + }, + want: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, + }, + wantErr: false, + }, + { + name: "Test with invalid config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "nopt-redis": map[string]interface{}{}, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with nil config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": nil, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with empty config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": nil, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": 123, + "password": "", + "database": "invalid-db", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config without host", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config without password", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config without db", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config with invalid password", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": 1234, + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with invalid redis config with invalid database", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": "invalid-db", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + flag: SyncFeatureFlagNotificaiton, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := newPubSub(tt.args.conf, tt.args.flag) + if (err != nil) != tt.wantErr { + t.Errorf("NewPubSub() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewPubSub() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index aa102188..93ad6fb4 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -20,120 +20,88 @@ package syncer import ( "context" "encoding/json" - "errors" "fmt" "sync" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/optimizely/go-sdk/pkg/notification" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" ) const ( - // PubSubDefaultChan will be used as default pubsub channel name - PubSubDefaultChan = "optimizely-sync" - // PubSubRedis is the name of pubsub type of Redis - PubSubRedis = "redis" + LoggerCtxKey = "syncer-logger" ) var ( - ncCache = make(map[string]*RedisSyncer) + ncCache = make(map[string]NotificationSyncer) mutexLock = &sync.Mutex{} ) +type NotificationSyncer interface { + notification.Center + Subscribe(ctx context.Context, channel string) (chan string, error) +} + +type Syncer interface { + Sync(ctx context.Context, channel string, sdkKey string) error + Subscribe(ctx context.Context, channel string) (chan string, error) +} + +// RedisSyncer defines Redis pubsub configuration +type SyncedNotificationCenter struct { + ctx context.Context + logger *zerolog.Logger + sdkKey string + pubsub PubSub +} + // Event holds the notification event with it's type type Event struct { Type notification.Type `json:"type"` Message interface{} `json:"message"` } -// RedisSyncer defines Redis pubsub configuration -type RedisSyncer struct { - ctx context.Context - Host string - Password string - Database int - Channel string - logger *zerolog.Logger - sdkKey string -} - -// NewRedisSyncer returns an instance of RedisNotificationSyncer -func NewRedisSyncer(logger *zerolog.Logger, conf config.SyncConfig, sdkKey string) (*RedisSyncer, error) { +func NewSyncedNotificationCenter(ctx context.Context, sdkKey string, conf config.SyncConfig) (NotificationSyncer, error) { mutexLock.Lock() defer mutexLock.Unlock() - if nc, found := ncCache[sdkKey]; found { + if nc, ok := ncCache[sdkKey]; ok { return nc, nil } - if !conf.Notification.Enable { - return nil, errors.New("notification syncer is not enabled") - } - if conf.Notification.Default != PubSubRedis { - return nil, errors.New("redis syncer is not set as default") - } - if conf.Pubsub == nil { - return nil, errors.New("redis config is not given") - } - - redisConfig, found := conf.Pubsub[PubSubRedis].(map[string]interface{}) - if !found { - return nil, errors.New("redis pubsub config not found") - } - - host, ok := redisConfig["host"].(string) - if !ok { - return nil, errors.New("redis host not provided in correct format") - } - password, ok := redisConfig["password"].(string) - if !ok { - return nil, errors.New("redis password not provider in correct format") - } - database, ok := redisConfig["database"].(int) - if !ok { - return nil, errors.New("redis database not provided in correct format") - } - channel, ok := redisConfig["channel"].(string) + logger, ok := ctx.Value(LoggerCtxKey).(*zerolog.Logger) if !ok { - channel = PubSubDefaultChan + logger = &log.Logger } - if logger == nil { - logger = &zerolog.Logger{} + pubsub, err := newPubSub(conf, SyncFeatureFlagNotificaiton) + if err != nil { + return nil, err } - nc := &RedisSyncer{ - ctx: context.Background(), - Host: host, - Password: password, - Database: database, - Channel: channel, - logger: logger, - sdkKey: sdkKey, + nc := &SyncedNotificationCenter{ + ctx: ctx, + logger: logger, + sdkKey: sdkKey, + pubsub: pubsub, } ncCache[sdkKey] = nc return nc, nil } -func (r *RedisSyncer) WithContext(ctx context.Context) *RedisSyncer { - r.ctx = ctx - return r -} - // AddHandler is empty but needed to implement notification.Center interface -func (r *RedisSyncer) AddHandler(_ notification.Type, _ func(interface{})) (int, error) { +func (r *SyncedNotificationCenter) AddHandler(_ notification.Type, _ func(interface{})) (int, error) { return 0, nil } // RemoveHandler is empty but needed to implement notification.Center interface -func (r *RedisSyncer) RemoveHandler(_ int, t notification.Type) error { +func (r *SyncedNotificationCenter) RemoveHandler(_ int, t notification.Type) error { return nil } // Send will send the notification to the specified channel in the Redis pubsub -func (r *RedisSyncer) Send(t notification.Type, n interface{}) error { +func (r *SyncedNotificationCenter) Send(t notification.Type, n interface{}) error { event := Event{ Type: t, Message: n, @@ -144,21 +112,40 @@ func (r *RedisSyncer) Send(t notification.Type, n interface{}) error { return err } - client := redis.NewClient(&redis.Options{ - Addr: r.Host, - Password: r.Password, - DB: r.Database, - }) - defer client.Close() - channel := GetChannelForSDKKey(r.Channel, r.sdkKey) + return r.pubsub.Publish(r.ctx, GetChannelForSDKKey(PubSubDefaultChan, r.sdkKey), jsonEvent) +} - if err := client.Publish(r.ctx, channel, jsonEvent).Err(); err != nil { - r.logger.Err(err).Msg("failed to publish json event to pub/sub") - return err - } - return nil +func (r *SyncedNotificationCenter) Subscribe(ctx context.Context, channel string) (chan string, error) { + return r.pubsub.Subscribe(ctx, channel) +} + +func GetDatafileSyncChannel() string { + return fmt.Sprintf("%s-datafile", PubSubDefaultChan) } func GetChannelForSDKKey(channel, key string) string { return fmt.Sprintf("%s-%s", channel, key) } + +type DatafileSyncer struct { + pubsub PubSub +} + +func NewDatafileSyncer(conf config.SyncConfig) (*DatafileSyncer, error) { + pubsub, err := newPubSub(conf, SycnFeatureFlagDatafile) + if err != nil { + return nil, err + } + + return &DatafileSyncer{ + pubsub: pubsub, + }, nil +} + +func (r *DatafileSyncer) Sync(ctx context.Context, channel string, sdkKey string) error { + return r.pubsub.Publish(ctx, channel, sdkKey) +} + +func (r *DatafileSyncer) Subscribe(ctx context.Context, channel string) (chan string, error) { + return r.pubsub.Subscribe(ctx, channel) +} diff --git a/pkg/syncer/syncer_test.go b/pkg/syncer/syncer_test.go new file mode 100644 index 00000000..c03d6553 --- /dev/null +++ b/pkg/syncer/syncer_test.go @@ -0,0 +1,386 @@ +/**************************************************************************** + * Copyright 2023 Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package syncer provides synchronization across Agent nodes +package syncer + +import ( + "context" + "reflect" + "testing" + + "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/syncer/pubsub" + "github.com/optimizely/go-sdk/pkg/notification" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/stretchr/testify/assert" +) + +type testPubSub struct { + publishCalled bool + subscribeCalled bool +} + +func (r *testPubSub) Publish(ctx context.Context, channel string, message interface{}) error { + r.publishCalled = true + return nil +} + +func (r *testPubSub) Subscribe(ctx context.Context, channel string) (chan string, error) { + r.subscribeCalled = true + return nil, nil +} + +func TestNewSyncedNotificationCenter(t *testing.T) { + type args struct { + ctx context.Context + sdkKey string + conf config.SyncConfig + } + tests := []struct { + name string + args args + want NotificationSyncer + wantErr bool + }{ + { + name: "Test with valid config", + args: args{ + ctx: context.Background(), + sdkKey: "123", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: &SyncedNotificationCenter{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "123", + pubsub: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, + }, + }, + wantErr: false, + }, + { + name: "Test with invalid sync config", + args: args{ + ctx: context.Background(), + sdkKey: "1234", + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "not-redis": map[string]interface{}{ + "host": "invalid host", + }, + }, + Notification: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "Test with empty sync config", + args: args{ + ctx: context.Background(), + sdkKey: "1234", + conf: config.SyncConfig{}, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewSyncedNotificationCenter(tt.args.ctx, tt.args.sdkKey, tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("NewSyncedNotificationCenter() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewSyncedNotificationCenter() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewDatafileSyncer(t *testing.T) { + type args struct { + conf config.SyncConfig + } + tests := []struct { + name string + args args + want *DatafileSyncer + wantErr bool + }{ + { + name: "Test with valid config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + "database": 0, + }, + }, + Datafile: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: &DatafileSyncer{ + pubsub: &pubsub.Redis{ + Host: "localhost:6379", + Password: "", + Database: 0, + }, + }, + wantErr: false, + }, + { + name: "Test with invalid sync config", + args: args{ + conf: config.SyncConfig{ + Pubsub: map[string]interface{}{ + "not-redis": map[string]interface{}{ + "host": "invalid host", + }, + }, + Datafile: config.FeatureSyncConfig{ + Default: "redis", + Enable: true, + }, + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewDatafileSyncer(tt.args.conf) + if (err != nil) != tt.wantErr { + t.Errorf("NewDatafileSyncer() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewDatafileSyncer() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDatafileSyncer_Sync(t *testing.T) { + type fields struct { + pubsub PubSub + } + type args struct { + ctx context.Context + channel string + sdkKey string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test datafile sync", + fields: fields{ + pubsub: &testPubSub{}, + }, + args: args{ + ctx: context.Background(), + channel: "test-ch", + sdkKey: "123", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &DatafileSyncer{ + pubsub: tt.fields.pubsub, + } + if err := r.Sync(tt.args.ctx, tt.args.channel, tt.args.sdkKey); (err != nil) != tt.wantErr { + t.Errorf("DatafileSyncer.Sync() error = %v, wantErr %v", err, tt.wantErr) + } + + assert.True(t, tt.fields.pubsub.(*testPubSub).publishCalled) + }) + } +} + +func TestDatafileSyncer_Subscribe(t *testing.T) { + type fields struct { + pubsub PubSub + } + type args struct { + ctx context.Context + channel string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test datafile sync", + fields: fields{ + pubsub: &testPubSub{}, + }, + args: args{ + ctx: context.Background(), + channel: "test-ch", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &DatafileSyncer{ + pubsub: tt.fields.pubsub, + } + _, err := r.Subscribe(tt.args.ctx, tt.args.channel) + if (err != nil) != tt.wantErr { + t.Errorf("DatafileSyncer.Subscribe() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.True(t, tt.fields.pubsub.(*testPubSub).subscribeCalled) + }) + } +} + +func TestSyncedNotificationCenter_Send(t *testing.T) { + type fields struct { + ctx context.Context + logger *zerolog.Logger + sdkKey string + pubsub PubSub + } + type args struct { + t notification.Type + n interface{} + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test notification send", + fields: fields{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "123", + pubsub: &testPubSub{}, + }, + args: args{ + t: notification.Decision, + n: "test", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &SyncedNotificationCenter{ + ctx: tt.fields.ctx, + logger: tt.fields.logger, + sdkKey: tt.fields.sdkKey, + pubsub: tt.fields.pubsub, + } + if err := r.Send(tt.args.t, tt.args.n); (err != nil) != tt.wantErr { + t.Errorf("SyncedNotificationCenter.Send() error = %v, wantErr %v", err, tt.wantErr) + } + + assert.True(t, tt.fields.pubsub.(*testPubSub).publishCalled) + }) + } +} + +func TestSyncedNotificationCenter_Subscribe(t *testing.T) { + type fields struct { + ctx context.Context + logger *zerolog.Logger + sdkKey string + pubsub PubSub + } + type args struct { + ctx context.Context + channel string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Test notification send", + fields: fields{ + ctx: context.Background(), + logger: &log.Logger, + sdkKey: "123", + pubsub: &testPubSub{}, + }, + args: args{ + ctx: context.Background(), + channel: "test-ch", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &SyncedNotificationCenter{ + ctx: tt.fields.ctx, + logger: tt.fields.logger, + sdkKey: tt.fields.sdkKey, + pubsub: tt.fields.pubsub, + } + _, err := r.Subscribe(tt.args.ctx, tt.args.channel) + if (err != nil) != tt.wantErr { + t.Errorf("SyncedNotificationCenter.Subscribe() error = %v, wantErr %v", err, tt.wantErr) + return + } + + assert.True(t, tt.fields.pubsub.(*testPubSub).subscribeCalled) + }) + } +}