From 1160a1f132591d22a49fbe00692278400bfffadf Mon Sep 17 00:00:00 2001 From: yeebing Date: Wed, 14 Jun 2023 16:23:37 +0800 Subject: [PATCH] refactor: refactor nacos configstore component. --- .../nacos/change_listener_test.go | 3 +- components/configstores/nacos/config_test.go | 3 +- .../nacos/{nacos.go => configstore.go} | 406 +++++++++--------- .../{nacos_test.go => configstore_test.go} | 15 +- components/configstores/nacos/const.go | 18 +- 5 files changed, 224 insertions(+), 221 deletions(-) rename components/configstores/nacos/{nacos.go => configstore.go} (78%) rename components/configstores/nacos/{nacos_test.go => configstore_test.go} (98%) diff --git a/components/configstores/nacos/change_listener_test.go b/components/configstores/nacos/change_listener_test.go index 7c599e2a16..1ebac80553 100644 --- a/components/configstores/nacos/change_listener_test.go +++ b/components/configstores/nacos/change_listener_test.go @@ -14,8 +14,9 @@ package nacos import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func Test_newSubscriberHolder(t *testing.T) { diff --git a/components/configstores/nacos/config_test.go b/components/configstores/nacos/config_test.go index e561087b2d..390b8e8028 100644 --- a/components/configstores/nacos/config_test.go +++ b/components/configstores/nacos/config_test.go @@ -14,8 +14,9 @@ package nacos import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestParseNacosMetadata(t *testing.T) { diff --git a/components/configstores/nacos/nacos.go b/components/configstores/nacos/configstore.go similarity index 78% rename from components/configstores/nacos/nacos.go rename to components/configstores/nacos/configstore.go index c5a0b255a8..7f6fa02ea2 100644 --- a/components/configstores/nacos/nacos.go +++ b/components/configstores/nacos/configstore.go @@ -17,33 +17,32 @@ import ( "context" "encoding/json" "errors" + "strconv" + "strings" + "time" + "github.com/nacos-group/nacos-sdk-go/v2/clients" "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" "github.com/nacos-group/nacos-sdk-go/v2/common/constant" nacoslog "github.com/nacos-group/nacos-sdk-go/v2/common/logger" "github.com/nacos-group/nacos-sdk-go/v2/vo" - "mosn.io/layotto/components/configstores" "mosn.io/pkg/log" - "strconv" - "strings" - "time" + + "mosn.io/layotto/components/configstores" ) -type NacosConfigStore struct { +type ConfigStore struct { client config_client.IConfigClient - storeName string - appName string - logDir string - cacheDir string - addresses []string - namespaceId string listener *subscriberHolder tagGroup string delimiter string + storeName string + appName string + namespaceId string } func NewStore() configstores.Store { - return &NacosConfigStore{ + return &ConfigStore{ listener: newSubscriberHolder(), delimiter: defaultDelimiter, tagGroup: defaultTagGroup, @@ -51,7 +50,7 @@ func NewStore() configstores.Store { } // Init SetConfig the configuration store. -func (n *NacosConfigStore) Init(config *configstores.StoreConfig) (err error) { +func (n *ConfigStore) Init(config *configstores.StoreConfig) (err error) { // 1.parse the config if config == nil { return errors.New("configuration illegal:no config data") @@ -103,7 +102,6 @@ func (n *NacosConfigStore) Init(config *configstores.StoreConfig) (err error) { } sc := *constant.NewServerConfig(ip, uint64(port)) serverConfigs = append(serverConfigs, sc) - n.addresses = append(n.addresses, v) } // 3.create client config @@ -142,7 +140,7 @@ func (n *NacosConfigStore) Init(config *configstores.StoreConfig) (err error) { } // Get gets configuration from configuration store. -func (n *NacosConfigStore) Get(ctx context.Context, request *configstores.GetRequest) ([]*configstores.ConfigurationItem, error) { +func (n *ConfigStore) Get(ctx context.Context, request *configstores.GetRequest) ([]*configstores.ConfigurationItem, error) { // todo: Nacos supports pagination to obtain data, and we can support this feature through metadata // use the configuration's app_name instead of the app_id in request // 0. check if illegal @@ -172,7 +170,7 @@ func (n *NacosConfigStore) Get(ctx context.Context, request *configstores.GetReq // 4. get keys' tags for i, v := range res { - res[i].Tags, err = n.getKeyTags(ctx, v.Group, v.Label, v.Key) + res[i].Tags, err = n.getConfigTags(ctx, v.Group, v.Label, v.Key) if err != nil { return nil, err } @@ -181,144 +179,7 @@ func (n *NacosConfigStore) Get(ctx context.Context, request *configstores.GetReq return res, nil } -func (n *NacosConfigStore) keyWithLabel(key, label string) string { - return key + n.delimiter + label -} - -// split nacos DataId into key and value -func (n *NacosConfigStore) splitDataId(dataId string) (key string, label string, err error) { - split := strings.Split(dataId, n.delimiter) - if len(split) != 2 { - return "", "", InvalidDataId - } - - return split[0], split[1], nil -} - -func (n *NacosConfigStore) getAllWithAppId(ctx context.Context) ([]*configstores.ConfigurationItem, error) { - values, err := n.client.SearchConfig(vo.SearchConfigParam{ - Search: "accurate", - AppName: n.appName, - }) - if err != nil { - log.DefaultLogger.Errorf("fail get all app_id key-value,err: %+v", err) - return nil, err - } - - res := make([]*configstores.ConfigurationItem, 0, len(values.PageItems)) - for _, v := range values.PageItems { - key, label, err := n.splitDataId(v.DataId) - if err != nil { - log.DefaultLogger.Errorf("invalid data_id:", v.DataId) - continue - } - - config := &configstores.ConfigurationItem{ - Content: v.Content, - Key: key, - Label: label, - Group: v.Group, - } - res = append(res, config) - } - - return res, nil -} - -func (n *NacosConfigStore) getAllWithGroup(ctx context.Context, group string) ([]*configstores.ConfigurationItem, error) { - values, err := n.client.SearchConfig(vo.SearchConfigParam{ - Search: "accurate", - AppName: n.appName, - Group: group, - }) - if err != nil { - log.DefaultLogger.Errorf("fail get all group key-value,err: %+v", err) - return nil, err - } - - res := make([]*configstores.ConfigurationItem, 0, len(values.PageItems)) - for _, v := range values.PageItems { - key, label, err := n.splitDataId(v.DataId) - if err != nil { - log.DefaultLogger.Errorf("invalid data_id:", v.DataId) - continue - } - config := &configstores.ConfigurationItem{ - Content: v.Content, - Key: key, - Label: label, - Group: v.Group, - } - res = append(res, config) - } - - return res, nil -} - -func (n *NacosConfigStore) getAllWithKeys(ctx context.Context, group, label string, keys []string) ([]*configstores.ConfigurationItem, error) { - res := make([]*configstores.ConfigurationItem, 0, len(keys)) - // todo: Use errgroup to get the configurations. Should think about panic - for _, key := range keys { - value, err := n.client.GetConfig(vo.ConfigParam{ - DataId: n.keyWithLabel(key, label), - Group: group, - AppName: n.appName, - }) - if err != nil { - log.DefaultLogger.Errorf("fail get key-value,err: %+v", err) - return nil, err - } - - // config is not exist - // nacos does not support an empty content. - if value == "" { - continue - } - - config := &configstores.ConfigurationItem{ - Content: value, - Key: key, - Group: group, - Label: label, - } - - res = append(res, config) - } - - return res, nil -} - -func (n *NacosConfigStore) tagKey(group, label, key string) string { - return group + n.delimiter + key + n.delimiter + label -} - -func (n *NacosConfigStore) getKeyTags(ctx context.Context, group, label, key string) (map[string]string, error) { - content, err := n.client.GetConfig(vo.ConfigParam{ - AppName: n.appName, - Group: n.tagGroup, - DataId: n.tagKey(group, label, key), - }) - - if err != nil { - log.DefaultLogger.Errorf("fail get key tags,err: %+v", err) - return nil, err - } - - // haven't set config tags. - if content == "" { - return nil, nil - } - - res := make(map[string]string) - err = json.Unmarshal([]byte(content), &res) - if err != nil { - return nil, err - } - - return res, nil -} - -func (n *NacosConfigStore) Set(ctx context.Context, request *configstores.SetRequest) error { +func (n *ConfigStore) Set(ctx context.Context, request *configstores.SetRequest) error { if request.AppId == "" { return errParamsMissingField("AppId") } @@ -337,7 +198,7 @@ func (n *NacosConfigStore) Set(ctx context.Context, request *configstores.SetReq return InvalidGroup } - // check whether key and label include delimiter + // check whether keyWithLabel and label include delimiter if strings.Contains(configItem.Key, n.delimiter) { return InvalidKey } @@ -346,44 +207,43 @@ func (n *NacosConfigStore) Set(ctx context.Context, request *configstores.SetReq } // publish config content + keyWithLabel := n.concatenateKey(configItem.Key, configItem.Label) ok, err := n.client.PublishConfig(vo.ConfigParam{ - DataId: n.keyWithLabel(configItem.Key, configItem.Label), + DataId: keyWithLabel, Group: configItem.Group, AppName: request.AppId, Content: configItem.Content, }) - // If the config does not exist, deleting the config will not result in an error. if err != nil { - log.DefaultLogger.Errorf("set key[%+v] failed with error: %+v", configItem.Key, err) + log.DefaultLogger.Errorf("set keyWithLabel[%+v] failed with error: %+v", configItem.Key, err) return err } - if !ok { return IllegalParam } - // publish tags - if configItem.Tags == nil { - configItem.Tags = make(map[string]string) + // publish config tags + if len(configItem.Tags) == 0 { + continue } + content, err := json.Marshal(configItem.Tags) if err != nil { return err } + keyForTag := n.concatenateKeyForTag(configItem.Group, keyWithLabel) ok, err = n.client.PublishConfig(vo.ConfigParam{ - DataId: n.tagKey(configItem.Group, configItem.Label, configItem.Key), + DataId: keyForTag, Group: n.tagGroup, AppName: request.AppId, Content: string(content), }) - if err != nil { - log.DefaultLogger.Errorf("set key[%+v] tags failed with error: %+v", configItem.Key, err) + log.DefaultLogger.Errorf("set keyWithLabel[%+v] tags failed with error: %+v", configItem.Key, err) return err } - if !ok { return IllegalParam } @@ -392,7 +252,7 @@ func (n *NacosConfigStore) Set(ctx context.Context, request *configstores.SetReq return nil } -func (n *NacosConfigStore) Delete(ctx context.Context, request *configstores.DeleteRequest) error { +func (n *ConfigStore) Delete(ctx context.Context, request *configstores.DeleteRequest) error { if request.AppId == "" { return errParamsMissingField("AppId") } @@ -407,8 +267,9 @@ func (n *NacosConfigStore) Delete(ctx context.Context, request *configstores.Del for _, key := range request.Keys { // delete config + keyWithLabel := n.concatenateKey(key, request.Label) ok, err := n.client.DeleteConfig(vo.ConfigParam{ - DataId: n.keyWithLabel(key, request.Label), + DataId: keyWithLabel, Group: request.Group, AppName: request.AppId, }) @@ -422,8 +283,9 @@ func (n *NacosConfigStore) Delete(ctx context.Context, request *configstores.Del } // delete tags + keyForTag := n.concatenateKeyForTag(request.Group, keyWithLabel) ok, err = n.client.DeleteConfig(vo.ConfigParam{ - DataId: n.tagKey(request.Group, request.Label, key), + DataId: keyForTag, Group: n.tagGroup, AppName: request.AppId, }) @@ -446,7 +308,7 @@ func (n *NacosConfigStore) Delete(ctx context.Context, request *configstores.Del return nil } -func (n *NacosConfigStore) Subscribe(request *configstores.SubscribeReq, ch chan *configstores.SubscribeResp) error { +func (n *ConfigStore) Subscribe(request *configstores.SubscribeReq, ch chan *configstores.SubscribeResp) error { if request.Group == "" && len(request.Keys) > 0 { request.Group = defaultGroup } @@ -477,9 +339,149 @@ func (n *NacosConfigStore) Subscribe(request *configstores.SubscribeReq, ch chan return nil } -func (n *NacosConfigStore) subscribeKey(item *configstores.ConfigurationItem, ch chan *configstores.SubscribeResp) error { +func (n *ConfigStore) StopSubscribe() { + // stop listening all subscribed configs + // grpc server will with the resp channel, so here don't need to close the channel. + subscribeInfos := n.listener.GetSubscriberKey() + for _, info := range subscribeInfos { + keyWithLabel := n.concatenateKey(info.key, info.label) + err := n.client.CancelListenConfig(vo.ConfigParam{ + DataId: keyWithLabel, + Group: info.group, + AppName: n.appName, + }) + + if err != nil { + log.DefaultLogger.Errorf("nacos StopSubscribe key %s-%s-%s failed", n.appName, info.group, keyWithLabel) + return + } + + n.listener.RemoveSubscriberKey(info) + } +} + +func (n ConfigStore) GetDefaultGroup() string { + return defaultGroup +} + +func (n ConfigStore) GetDefaultLabel() string { + return defaultLabel +} + +func (n *ConfigStore) getAllWithAppId(ctx context.Context) ([]*configstores.ConfigurationItem, error) { + values, err := n.client.SearchConfig(vo.SearchConfigParam{ + Search: "accurate", + AppName: n.appName, + }) + if err != nil { + log.DefaultLogger.Errorf("fail get all app_id key-value,err: %+v", err) + return nil, err + } + + res := make([]*configstores.ConfigurationItem, 0, len(values.PageItems)) + for _, v := range values.PageItems { + key, label := n.splitKey(v.DataId) + config := &configstores.ConfigurationItem{ + Content: v.Content, + Key: key, + Label: label, + Group: v.Group, + } + res = append(res, config) + } + + return res, nil +} + +func (n *ConfigStore) getAllWithGroup(ctx context.Context, group string) ([]*configstores.ConfigurationItem, error) { + values, err := n.client.SearchConfig(vo.SearchConfigParam{ + Search: "accurate", + AppName: n.appName, + Group: group, + }) + if err != nil { + log.DefaultLogger.Errorf("fail get all group key-value,err: %+v", err) + return nil, err + } + + res := make([]*configstores.ConfigurationItem, 0, len(values.PageItems)) + for _, v := range values.PageItems { + key, label := n.splitKey(v.DataId) + config := &configstores.ConfigurationItem{ + Content: v.Content, + Key: key, + Label: label, + Group: v.Group, + } + res = append(res, config) + } + + return res, nil +} + +func (n *ConfigStore) getAllWithKeys(ctx context.Context, group, label string, keys []string) ([]*configstores.ConfigurationItem, error) { + res := make([]*configstores.ConfigurationItem, 0, len(keys)) + // todo: Use errgroup to get the configurations. Should think about panic + for _, key := range keys { + value, err := n.client.GetConfig(vo.ConfigParam{ + DataId: n.concatenateKey(key, label), + Group: group, + AppName: n.appName, + }) + if err != nil { + log.DefaultLogger.Errorf("fail get key-value,err: %+v", err) + return nil, err + } + + // config is not exist + // nacos does not support an empty content. + if value == "" { + continue + } + + config := &configstores.ConfigurationItem{ + Content: value, + Key: key, + Group: group, + Label: label, + } + + res = append(res, config) + } + + return res, nil +} + +func (n *ConfigStore) getConfigTags(ctx context.Context, group, label, key string) (map[string]string, error) { + keyForTag := n.concatenateKeyForTag(group, n.concatenateKey(key, label)) + content, err := n.client.GetConfig(vo.ConfigParam{ + AppName: n.appName, + Group: n.tagGroup, + DataId: keyForTag, + }) + + if err != nil { + log.DefaultLogger.Errorf("fail get key tags,err: %+v", err) + return nil, err + } + + // haven't set config tags. + if content == "" { + return nil, nil + } + + res := make(map[string]string) + err = json.Unmarshal([]byte(content), &res) + if err != nil { + return nil, err + } + + return res, nil +} + +func (n *ConfigStore) subscribeKey(item *configstores.ConfigurationItem, ch chan *configstores.SubscribeResp) error { err := n.client.ListenConfig(vo.ConfigParam{ - DataId: n.keyWithLabel(item.Key, item.Label), + DataId: n.concatenateKey(item.Key, item.Label), Group: item.Group, AppName: n.appName, OnChange: subscribeFunc(ch), @@ -504,18 +506,14 @@ func setupSubscribeFunc(fn SubscribeFunc) { subscribeFunc = fn } -func (n *NacosConfigStore) keyWithGroup(group, keyWithLabel string) string { - return group + n.delimiter + keyWithLabel -} - -func (n *NacosConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp) OnChangeFunc { +func (n *ConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp) OnChangeFunc { return func(namespace, group, dataId, data string) { // get tags // If getting tags failed returns the resp without tags and ignore this error. tags := make(map[string]string) config, err := n.client.GetConfig(vo.ConfigParam{ // This key will naturally carry label information, so there is no need to add a label suffix - DataId: n.keyWithGroup(group, dataId), + DataId: n.concatenateKeyForTag(group, dataId), Group: n.tagGroup, AppName: n.appName, }) @@ -525,11 +523,7 @@ func (n *NacosConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp } // split nacos data_id to key and label - id, label, err := n.splitDataId(dataId) - if err != nil { - log.DefaultLogger.Errorf("split nacos data_id [%s] into key and label failed", dataId) - return - } + key, label := n.splitKey(dataId) // package the listening data. resp := &configstores.SubscribeResp{ @@ -537,7 +531,7 @@ func (n *NacosConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp AppId: n.appName, Items: []*configstores.ConfigurationItem{ { - Key: id, + Key: key, Label: label, Content: data, Group: group, @@ -550,31 +544,31 @@ func (n *NacosConfigStore) subscribeOnChange(ch chan *configstores.SubscribeResp } } -func (n *NacosConfigStore) StopSubscribe() { - // stop listening all subscribed configs - // grpc server will with the resp channel, so here don't need to close the channel. - subscribeInfos := n.listener.GetSubscriberKey() - for _, info := range subscribeInfos { - keyWithLabel := n.keyWithLabel(info.key, info.label) - err := n.client.CancelListenConfig(vo.ConfigParam{ - DataId: keyWithLabel, - Group: info.group, - AppName: n.appName, - }) - - if err != nil { - log.DefaultLogger.Errorf("nacos StopSubscribe key %s-%s-%s failed", n.appName, info.group, keyWithLabel) - return - } - - n.listener.RemoveSubscriberKey(info) +func (n *ConfigStore) concatenateKeyForTag(group, keyWithLabel string) string { + if keyWithLabel == "" { + return "" } + return group + n.delimiter + keyWithLabel } -func (n NacosConfigStore) GetDefaultGroup() string { - return defaultGroup +func (n *ConfigStore) concatenateKey(key, label string) string { + if label == "" { + return key + } + return key + n.delimiter + label } -func (n NacosConfigStore) GetDefaultLabel() string { - return defaultLabel +// split nacos DataId into key and value +func (n *ConfigStore) splitKey(keyWithLabel string) (key string, label string) { + if keyWithLabel == "" { + return "", "" + } + + // Users can set config keys without label + split := strings.Split(keyWithLabel, n.delimiter) + if len(split) < 2 { + return split[0], "" + } + + return split[0], split[1] } diff --git a/components/configstores/nacos/nacos_test.go b/components/configstores/nacos/configstore_test.go similarity index 98% rename from components/configstores/nacos/nacos_test.go rename to components/configstores/nacos/configstore_test.go index 2546063398..439d3e6a71 100644 --- a/components/configstores/nacos/nacos_test.go +++ b/components/configstores/nacos/configstore_test.go @@ -16,14 +16,16 @@ package nacos import ( "context" "fmt" + "sync" + "testing" + "github.com/golang/mock/gomock" "github.com/nacos-group/nacos-sdk-go/v2/model" "github.com/nacos-group/nacos-sdk-go/v2/vo" "github.com/stretchr/testify/assert" + "mosn.io/layotto/components/configstores" "mosn.io/layotto/components/pkg/mock" - "sync" - "testing" ) const ( @@ -37,7 +39,7 @@ func getMockNacosClient(t *testing.T) *mock.MockNacosConfigClient { return mock.NewMockNacosConfigClient(ctrl) } -func setup(t *testing.T, client NacosConfigClient) *NacosConfigStore { +func setup(t *testing.T, client NacosConfigClient) *ConfigStore { t.Helper() store := NewStore() // with default namespace and timeout @@ -55,7 +57,7 @@ func setup(t *testing.T, client NacosConfigClient) *NacosConfigStore { return nil } - nacosStore := store.(*NacosConfigStore) + nacosStore := store.(*ConfigStore) if client != nil { nacosStore.client = client } @@ -262,9 +264,8 @@ func TestNacosConfigStore_Init(t *testing.T) { err := store.Init(config) assert.Nil(t, err) // check config params - nacosStore := store.(*NacosConfigStore) + nacosStore := store.(*ConfigStore) assert.EqualValues(t, config.Metadata[namespaceIdKey], nacosStore.namespaceId) - assert.EqualValues(t, config.Address, nacosStore.addresses) assert.EqualValues(t, config.Metadata[appNameKey], nacosStore.appName) assert.EqualValues(t, config.StoreName, nacosStore.storeName) }) @@ -364,8 +365,6 @@ func TestNacosConfigStore_Set(t *testing.T) { } func TestNacosConfigStore_StopSubscribe(t *testing.T) { - // todo test channel closed. - req := &configstores.SubscribeReq{ AppId: appName, Group: "test-stop-subscribe-group", diff --git a/components/configstores/nacos/const.go b/components/configstores/nacos/const.go index 28e8b8a8ef..958a4f4889 100644 --- a/components/configstores/nacos/const.go +++ b/components/configstores/nacos/const.go @@ -1,3 +1,16 @@ +// Copyright 2021 Layotto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package nacos import "errors" @@ -7,11 +20,6 @@ var ( // This error is usually caused by the logic of the parameters, // such as filling in parameters with illegal characters IllegalParam = errors.New("illegal parameter") - - // InvalidDataId - // The nacos data_id cannot be divided into key labels using the delimiter set in store instance. - InvalidDataId = errors.New("invalid data_id") - InvalidKey = errors.New("the key contains delimiter which are not allowed") InvalidLabel = errors.New("the label contains delimiter which are not allowed") InvalidGroup = errors.New("the group is consistent with the group stored in the set tag")