Skip to content

Commit

Permalink
fix: sharding reconfigure (#8640)
Browse files Browse the repository at this point in the history
  • Loading branch information
kizuna-lek authored Dec 16, 2024
1 parent ff9702d commit 81d89da
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 11 deletions.
62 changes: 53 additions & 9 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ func (r *OpsRequest) getConfigMap(ctx context.Context,
return cmObj, nil
}

func (r *OpsRequest) getShardingComponents(ctx context.Context,
k8sClient client.Client,
cluster *Cluster, shardingName string) ([]Component, error) {
compList := &ComponentList{}
ml := client.MatchingLabels{
constant.AppInstanceLabelKey: cluster.Name,
constant.KBAppShardingNameLabelKey: shardingName,
}
if err := k8sClient.List(ctx, compList, client.InNamespace(cluster.Namespace), ml); err != nil {
return nil, err
}
return compList.Items, nil
}

// Validate validates OpsRequest
func (r *OpsRequest) Validate(ctx context.Context,
k8sClient client.Client,
Expand Down Expand Up @@ -375,24 +389,54 @@ func (r *OpsRequest) validateReconfigureParams(ctx context.Context,
k8sClient client.Client,
cluster *Cluster,
reconfigure *Reconfigure) error {
if cluster.Spec.GetComponentByName(reconfigure.ComponentName) == nil {
if cluster.Spec.GetComponentByName(reconfigure.ComponentName) == nil &&
cluster.Spec.GetShardingByName(reconfigure.ComponentName) == nil {
return fmt.Errorf("component %s not found", reconfigure.ComponentName)
}
for _, configuration := range reconfigure.Configurations {
cmObj, err := r.getConfigMap(ctx, k8sClient, fmt.Sprintf("%s-%s-%s", r.Spec.GetClusterName(), reconfigure.ComponentName, configuration.Name))

shardingNameMap := map[string][]Component{}
for _, shardingSpec := range cluster.Spec.ShardingSpecs {
shardingComponents, err := r.getShardingComponents(ctx, k8sClient, cluster, shardingSpec.Name)
if err != nil {
return err
}
for _, key := range configuration.Keys {
// check add file
if _, ok := cmObj.Data[key.Key]; !ok && key.FileContent == "" {
return errors.Errorf("key %s not found in configmap %s", key.Key, configuration.Name)
shardingNameMap[shardingSpec.Name] = shardingComponents
}

validateConfiguration := func(reconfigure *Reconfigure) error {
for _, configuration := range reconfigure.Configurations {
cmObj, err := r.getConfigMap(ctx, k8sClient, fmt.Sprintf("%s-%s-%s", r.Spec.GetClusterName(), reconfigure.ComponentName, configuration.Name))
if err != nil {
return err
}
if key.FileContent == "" && len(key.Parameters) == 0 {
return errors.New("key.fileContent and key.parameters cannot be empty at the same time")
for _, key := range configuration.Keys {
// check add file
if _, ok := cmObj.Data[key.Key]; !ok && key.FileContent == "" {
return errors.Errorf("key %s not found in configmap %s", key.Key, configuration.Name)
}
if key.FileContent == "" && len(key.Parameters) == 0 {
return errors.New("key.fileContent and key.parameters cannot be empty at the same time")
}
}
}
return nil
}

if _, ok := shardingNameMap[reconfigure.ComponentName]; ok {
for _, shardingComponents := range shardingNameMap[reconfigure.ComponentName] {
if err := validateConfiguration(&Reconfigure{
ComponentOps: ComponentOps{
ComponentName: shardingComponents.Labels[constant.KBAppComponentLabelKey],
},
Configurations: reconfigure.Configurations,
}); err != nil {
return err
}
}
} else {
return validateConfiguration(reconfigure)
}

return nil
}

Expand Down
32 changes: 30 additions & 2 deletions controllers/apps/operations/reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/configuration/core"
"github.com/apecloud/kubeblocks/pkg/constant"
configctrl "github.com/apecloud/kubeblocks/pkg/controller/configuration"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)
Expand Down Expand Up @@ -142,10 +143,37 @@ func (r *reconfigureAction) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli c
func fromReconfigureOperations(request appsv1alpha1.OpsRequestSpec, reqCtx intctrlutil.RequestCtx, cli client.Client, resource *OpsResource) (reconfigures []reconfigureParams) {
var operations []appsv1alpha1.Reconfigure

shardingNameMap := map[string][]appsv1alpha1.Component{}
for _, shardingSpec := range resource.Cluster.Spec.ShardingSpecs {
components, err := intctrlutil.ListShardingComponents(reqCtx.Ctx, cli, resource.Cluster, shardingSpec.Name)
if err != nil {
continue
}
shardingNameMap[shardingSpec.Name] = components
}

appendReconfigure := func(reconfigures ...appsv1alpha1.Reconfigure) {
for _, reconfigure := range reconfigures {
// Perform the same reconfigure operation on all shards
if _, ok := shardingNameMap[reconfigure.ComponentName]; ok {
for _, shardingComponents := range shardingNameMap[reconfigure.ComponentName] {
operations = append(operations, appsv1alpha1.Reconfigure{
ComponentOps: appsv1alpha1.ComponentOps{
ComponentName: shardingComponents.Labels[constant.KBAppComponentLabelKey],
},
Configurations: reconfigure.Configurations,
})
}
} else {
operations = append(operations, *request.Reconfigure)
}
}
}

if request.Reconfigure != nil {
operations = append(operations, *request.Reconfigure)
appendReconfigure(*request.Reconfigure)
}
operations = append(operations, request.Reconfigures...)
appendReconfigure(request.Reconfigures...)

for _, reconfigure := range operations {
if len(reconfigure.Configurations) == 0 {
Expand Down

0 comments on commit 81d89da

Please sign in to comment.