Skip to content

Commit

Permalink
Shard ID on the secret must be int (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
dee-kryvenko authored Feb 14, 2025
1 parent b6a7bf5 commit 6710c2c
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 29 deletions.
4 changes: 2 additions & 2 deletions api/autoscaler/common/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// Replica is a representation of the replica for sharding
type Replica struct {
// ID of the replica, starting from 0 and onward.
ID string `json:"id,omitempty"`
ID int32 `json:"id,omitempty"`
// LoadIndexes shards assigned to this replica wrapped into their load index.
// +kubebuilder:validation:Required
LoadIndexes []LoadIndex `json:"loadIndexes,omitempty"`
Expand All @@ -47,7 +47,7 @@ type ReplicaList []Replica
// Regardless of the order of the replicas, the serialized string will be the same.
func (list ReplicaList) SerializeToString() string {

m := map[string]string{}
m := map[string]int32{}
for _, replica := range list {
for _, loadIndex := range replica.LoadIndexes {
m[string(loadIndex.Shard.UID)] = replica.ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica wrapped
into their load index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica
wrapped into their load index.
Expand Down Expand Up @@ -280,7 +281,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica wrapped
into their load index.
Expand Down Expand Up @@ -375,7 +377,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica wrapped
into their load index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica wrapped
into their load index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica wrapped
into their load index.
Expand Down Expand Up @@ -263,7 +264,8 @@ spec:
properties:
id:
description: ID of the replica, starting from 0 and onward.
type: string
format: int32
type: integer
loadIndexes:
description: LoadIndexes shards assigned to this replica wrapped
into their load index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package autoscaler
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -203,12 +204,12 @@ func (r *LongestProcessingTimePartitionReconciler) Reconcile(ctx context.Context
li.Shard.Namespace,
li.Shard.Name,
li.Shard.Server,
replica.ID,
strconv.Itoa(int(replica.ID)),
).Set(1)
}
longestProcessingTimePartitionReplicasTotalLoadGauge.WithLabelValues(
req.NamespacedName.String(),
replica.ID,
strconv.Itoa(int(replica.ID)),
).Set(replica.TotalLoad.AsApproximateFloat64())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package autoscaler
import (
"context"
"errors"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -230,7 +229,7 @@ var _ = Describe("LongestProcessingTimePartition Controller", func() {
replicas := common.ReplicaList{}
for i, shard := range shards {
replicas = append(replicas, common.Replica{
ID: fmt.Sprintf("%d", i),
ID: int32(i),
LoadIndexes: []common.LoadIndex{shard},
TotalLoad: shard.Value,
TotalLoadDisplayValue: shard.DisplayValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package autoscaler
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -283,12 +284,12 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
li.Shard.Namespace,
li.Shard.Name,
li.Shard.Server,
replica.ID,
strconv.Itoa(int(replica.ID)),
).Set(1)
}
mostWantedTwoPhaseHysteresisEvaluationProjectedReplicasTotalLoadGauge.WithLabelValues(
req.NamespacedName.String(),
replica.ID,
strconv.Itoa(int(replica.ID)),
).Set(replica.TotalLoad.AsApproximateFloat64())
}

Expand All @@ -314,12 +315,12 @@ func (r *MostWantedTwoPhaseHysteresisEvaluationReconciler) Reconcile(ctx context
li.Shard.Namespace,
li.Shard.Name,
li.Shard.Server,
replica.ID,
strconv.Itoa(int(replica.ID)),
).Set(1)
}
mostWantedTwoPhaseHysteresisEvaluationReplicasTotalLoadGauge.WithLabelValues(
req.NamespacedName.String(),
replica.ID,
strconv.Itoa(int(replica.ID)),
).Set(replica.TotalLoad.AsApproximateFloat64())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ var _ = Describe("MostWantedTwoPhaseHysteresisEvaluation Controller", func() {

samplePartition.Object().Status.Replicas = common.ReplicaList{
{
ID: "0",
ID: int32(0),
LoadIndexes: []common.LoadIndex{
{
Shard: common.Shard{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ var _ = Describe("ReplicaSetScaler Controller", func() {

samplePartition.Object().Status.Replicas = common.ReplicaList{
{
ID: "0",
ID: int32(0),
LoadIndexes: []common.LoadIndex{
{
Shard: common.Shard{
Expand All @@ -237,7 +237,7 @@ var _ = Describe("ReplicaSetScaler Controller", func() {
TotalLoadDisplayValue: "1",
},
{
ID: "1",
ID: int32(1),
LoadIndexes: []common.LoadIndex{
{
Shard: common.Shard{
Expand All @@ -255,7 +255,7 @@ var _ = Describe("ReplicaSetScaler Controller", func() {
TotalLoadDisplayValue: "1",
},
{
ID: "2",
ID: int32(2),
LoadIndexes: []common.LoadIndex{
{
Shard: common.Shard{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package autoscaler
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -194,18 +196,34 @@ func (r *SecretTypeClusterShardManagerReconciler) Reconcile(ctx context.Context,
shards = append(shards, shard)

actualReplicaBytes, actualReplicaSet := secret.Data["shard"]
var actualReplica string
var actualReplica int
if actualReplicaSet {
actualReplica = string(actualReplicaBytes)
log.V(2).Info("Shard had prior assignment", "shard", shard.ID, "replica", actualReplica)
_actualReplica, err := strconv.Atoi(string(actualReplicaBytes))
if err != nil {
log.Error(err, "Failed to read actual shard from the secret",
"shard", string(actualReplicaBytes), "secret", secret.Name)
meta.SetStatusCondition(&manager.Status.Conditions, metav1.Condition{
Type: StatusTypeReady,
Status: metav1.ConditionFalse,
Reason: "FailedToReadShardFromSecret",
Message: err.Error(),
})
if err := r.Status().Update(ctx, manager); err != nil {
log.V(1).Info("Failed to update resource status", "err", err)
return ctrl.Result{}, err
}
// Resource is malformed, no point in retrying
return ctrl.Result{}, nil
}
actualReplica = _actualReplica
}
desiredReplica, desiredReplicaSet := replicasByUID[secret.GetUID()]
if !desiredReplicaSet {
log.V(2).Info("Shard had desired replica", "shard", shard.ID, "desired", desiredReplica.ID)
}
if desiredReplicaSet && (!actualReplicaSet || desiredReplica.ID != actualReplica) {
if desiredReplicaSet && (!actualReplicaSet || desiredReplica.ID != int32(actualReplica)) {
log.V(1).Info("Secret is outdated", "secret", secret.Name, "actual", actualReplica, "desired", desiredReplica.ID)
secret.Data["shard"] = []byte(desiredReplica.ID)
secret.Data["shard"] = []byte(fmt.Sprintf("%d", desiredReplica.ID))
secretsToUpdate = append(secretsToUpdate, secret)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ var _ = Describe("SecretTypeClusterShardManager Controller", func() {
run.Container().Object().Spec.ShardManagerSpec.Replicas = append(
run.Container().Object().Spec.ShardManagerSpec.Replicas,
common.Replica{
ID: fmt.Sprintf("%d", i),
ID: int32(i),
LoadIndexes: []common.LoadIndex{
{
Shard: common.Shard{
Expand Down Expand Up @@ -349,7 +349,7 @@ var _ = Describe("SecretTypeClusterShardManager Controller", func() {
for _, replica := range run.Container().Object().Spec.ShardManagerSpec.Replicas {
secret, ok := shardSecretsByUID[replica.LoadIndexes[0].Shard.UID]
Expect(ok).To(BeTrue())
Expect(secret.Data["shard"]).To(Equal([]byte(replica.ID)))
Expect(secret.Data["shard"]).To(Equal([]byte(fmt.Sprintf("%d", replica.ID))))
}
},
).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (r *PartitionerImpl) Partition(ctx context.Context,
// Create a new replica for this shard.
replicaCount++
newReplica := common.Replica{
ID: strconv.FormatInt(int64(replicaCount-1), 10),
ID: replicaCount - 1,
LoadIndexes: []common.LoadIndex{shard},
TotalLoad: shard.Value,
TotalLoadDisplayValue: strconv.FormatFloat(shard.Value.AsApproximateFloat64(), 'f', -1, 64),
Expand Down

0 comments on commit 6710c2c

Please sign in to comment.