Skip to content

Commit

Permalink
update envoy filter
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Zhao <zhaoyu@koderover.com>
  • Loading branch information
PetrusZ committed Dec 5, 2023
1 parent 2240e07 commit 3ad1e98
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func EnableIstioGrayscale(ctx context.Context, envName, productName string) erro
}

// 3. Ensure `EnvoyFilter` in istio namespace.
err = ensureEnvoyFilter(ctx, istioClient, clusterID, istioNamespace, zadigEnvoyFilter)
err = ensureEnvoyFilter(ctx, istioClient, clusterID, istioNamespace, zadigEnvoyFilter, nil)
if err != nil {
return fmt.Errorf("failed to ensure EnvoyFilter in namespace `%s`: %s", istioNamespace, err)
}
Expand Down Expand Up @@ -266,6 +266,18 @@ func SetIstioGrayscaleConfig(ctx context.Context, envName, productName string, r
if err != nil {
return fmt.Errorf("failed to set istio grayscale weight, err: %w", err)
}

headerKeys := []string{}
for _, headerMatchConfig := range req.HeaderMatchConfigs {
for _, headerMatch := range headerMatchConfig.HeaderMatchs {
headerKeys = append(headerKeys, headerMatch.Key)
}
}

err = reGenerateEnvoyFilter(ctx, baseEnv.ClusterID, headerKeys)
if err != nil {
return fmt.Errorf("failed to re-generate envoy filter, err: %w", err)
}
} else {
return fmt.Errorf("unsupported grayscale strategy type: %s", req.GrayscaleStrategy)
}
Expand Down Expand Up @@ -440,3 +452,27 @@ func ensureDisableGrayscaleEnvConfig(ctx context.Context, baseEnv *commonmodels.

return commonrepo.NewProductColl().Update(baseEnv)
}

func reGenerateEnvoyFilter(ctx context.Context, clusterID string, headerKeys []string) error {
restConfig, err := kubeclient.GetRESTConfig(config.HubServerAddress(), clusterID)
if err != nil {
return fmt.Errorf("failed to get rest config: %s", err)
}

istioClient, err := versionedclient.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("failed to new istio client: %s", err)
}

err = deleteEnvoyFilter(ctx, istioClient, istioNamespace, zadigEnvoyFilter)
if err != nil {
return fmt.Errorf("failed to delete EnvoyFilter: %s", err)
}

err = ensureEnvoyFilter(ctx, istioClient, clusterID, istioNamespace, zadigEnvoyFilter, headerKeys)
if err != nil {
return fmt.Errorf("failed to ensure EnvoyFilter in namespace `%s`: %s", istioNamespace, err)
}

return nil
}
142 changes: 135 additions & 7 deletions pkg/microservice/aslan/core/environment/service/share_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func EnableBaseEnv(ctx context.Context, envName, productName string) error {
}

// 4. Ensure `EnvoyFilter` in istio namespace.
err = ensureEnvoyFilter(ctx, istioClient, clusterID, istioNamespace, zadigEnvoyFilter)
err = ensureEnvoyFilter(ctx, istioClient, clusterID, istioNamespace, zadigEnvoyFilter, nil)
if err != nil {
return fmt.Errorf("failed to ensure EnvoyFilter in namespace `%s`: %s", istioNamespace, err)
}
Expand Down Expand Up @@ -576,7 +576,8 @@ func checkVirtualServicesDeployed(ctx context.Context, kclient client.Client, is
return true, nil
}

func ensureEnvoyFilter(ctx context.Context, istioClient versionedclient.Interface, clusterID, ns, name string) error {
func ensureEnvoyFilter(ctx context.Context, istioClient versionedclient.Interface, clusterID, ns, name string, headerKeys []string) error {
headerKeySet := sets.NewString(headerKeys...)
envoyFilterObj, err := istioClient.NetworkingV1alpha3().EnvoyFilters(ns).Get(ctx, name, metav1.GetOptions{})
if err == nil {
log.Infof("Has found EnvoyFilter `%s` in ns `%s` and don't recreate.", name, istioNamespace)
Expand All @@ -587,12 +588,12 @@ func ensureEnvoyFilter(ctx context.Context, istioClient versionedclient.Interfac
return fmt.Errorf("failed to query EnvoyFilter `%s` in ns `%s`: %s", name, istioNamespace, err)
}

storeCacheOperation, err := buildEnvoyStoreCacheOperation()
storeCacheOperation, err := buildEnvoyStoreCacheOperation(headerKeySet.List())
if err != nil {
return fmt.Errorf("failed to build envoy operation of storing cache: %s", err)
}

getCacheOperation, err := buildEnvoyGetCacheOperation()
getCacheOperation, err := buildEnvoyGetCacheOperation(headerKeySet.List())
if err != nil {
return fmt.Errorf("failed to build envoy operation of getting cache: %s", err)
}
Expand Down Expand Up @@ -680,8 +681,7 @@ func ensureEnvoyFilter(ctx context.Context, istioClient versionedclient.Interfac
_, err = istioClient.NetworkingV1alpha3().EnvoyFilters(ns).Create(ctx, envoyFilterObj, metav1.CreateOptions{})
return err
}

func buildEnvoyStoreCacheOperation() (*types.Struct, error) {
func buildEnvoyStoreCacheOperation1(keys []string) (*types.Struct, error) {
inlineCode := `function envoy_on_request(request_handle)
function split_str(s, delimiter)
res = {}
Expand Down Expand Up @@ -727,7 +727,7 @@ end
return buildEnvoyPatchValue(data)
}

func buildEnvoyGetCacheOperation() (*types.Struct, error) {
func buildEnvoyGetCacheOperation1(keys []string) (*types.Struct, error) {
inlineCode := `function envoy_on_request(request_handle)
function split_str(s, delimiter)
res = {}
Expand Down Expand Up @@ -774,6 +774,134 @@ end
return buildEnvoyPatchValue(data)
}

func buildEnvoyStoreCacheOperation(headerKeys []string) (*types.Struct, error) {
inlineCodeStart := `function envoy_on_request(request_handle)
function split_str(s, delimiter)
res = {}
for match in (s..delimiter):gmatch("(.-)"..delimiter) do
table.insert(res, match)
end
return res
end
local traceid = request_handle:headers():get("sw8")
if traceid then
arr = split_str(traceid, "-")
traceid = arr[2]
else
traceid = request_handle:headers():get("x-request-id")
if not traceid then
traceid = request_handle:headers():get("x-b3-traceid")
end
end
local env = request_handle:headers():get("x-env")
local headers, body = request_handle:httpCall(
"cache",
{
[":method"] = "POST",
[":path"] = string.format("/api/cache/%s/%s", traceid, env),
[":authority"] = "cache",
},
"",
5000
)
`
inlineCodeMid := ``
for _, headerKey := range headerKeys {
tmpInlineCodeMid := `
local key = request_handle:headers():get("%s")
local headers, body = request_handle:httpCall(
"cache",
{
[":method"] = "POST",
[":path"] = string.format("/api/cache/%%s/%%s", traceid, key),
[":authority"] = "cache",
},
"",
5000
)
`
tmpInlineCodeMid = fmt.Sprintf(tmpInlineCodeMid, headerKey)
inlineCodeMid += tmpInlineCodeMid
}

inlineCodeEnd := `end
`
inlineCode := fmt.Sprintf(`%s%s%s`, inlineCodeStart, inlineCodeMid, inlineCodeEnd)

data := map[string]interface{}{
"name": "envoy.lua",
"typed_config": map[string]string{
"@type": envoyFilterLua,
"inlineCode": inlineCode,
},
}

return buildEnvoyPatchValue(data)
}

func buildEnvoyGetCacheOperation(headerKeys []string) (*types.Struct, error) {
inlineCodeStart := `function envoy_on_request(request_handle)
function split_str(s, delimiter)
res = {}
for match in (s..delimiter):gmatch("(.-)"..delimiter) do
table.insert(res, match)
end
return res
end
local traceid = request_handle:headers():get("sw8")
if traceid then
arr = split_str(traceid, "-")
traceid = arr[2]
else
traceid = request_handle:headers():get("x-request-id")
if not traceid then
traceid = request_handle:headers():get("x-b3-traceid")
end
end
local headers, body = request_handle:httpCall(
"cache",
{
[":method"] = "GET",
[":path"] = string.format("/api/cache/%s", traceid),
[":authority"] = "cache",
},
"",
5000
)
request_handle:headers():add("x-env", headers["x-data"]);
`

inlineCodeMid := ``
for _, headerKey := range headerKeys {
tmpInlineCodeMid := `
request_handle:headers():add("%s", headers["x-data"]);
`
tmpInlineCodeMid = fmt.Sprintf(tmpInlineCodeMid, headerKey)
inlineCodeMid += tmpInlineCodeMid
}

inlineCodeEnd := `end
`
inlineCode := fmt.Sprintf(`%s%s%s`, inlineCodeStart, inlineCodeMid, inlineCodeEnd)

data := map[string]interface{}{
"name": "envoy.lua",
"typed_config": map[string]string{
"@type": envoyFilterLua,
"inlineCode": inlineCode,
},
}

return buildEnvoyPatchValue(data)
}

func buildEnvoyClusterConfig(cacheAddr string, port int) (*types.Struct, error) {
data := map[string]interface{}{
"name": "cache",
Expand Down

0 comments on commit 3ad1e98

Please sign in to comment.