Skip to content

Commit

Permalink
Change Name to VolumeName and cache VolumeName in controller client
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Weber <eric.weber@suse.com>
  • Loading branch information
ejweber authored and PhanLe1010 committed Aug 4, 2023
1 parent 4767149 commit 4d54af9
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 47 deletions.
2 changes: 2 additions & 0 deletions pkg/controller/client/controller_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (c ControllerServiceContext) Close() error {

type ControllerClient struct {
serviceURL string
VolumeName string
ControllerServiceContext
}

Expand Down Expand Up @@ -63,6 +64,7 @@ func NewControllerClient(address, volumeName, instanceName string) (*ControllerC

return &ControllerClient{
serviceURL: serviceURL,
VolumeName: volumeName,
ControllerServiceContext: serviceContext,
}, nil
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

type Controller struct {
sync.RWMutex
Name string
VolumeName string
size int64
sectorSize int64
replicas []types.Replica
Expand Down Expand Up @@ -70,7 +70,7 @@ func NewController(name string, factory types.BackendFactory, frontend types.Fro
iscsiTargetRequestTimeout, engineReplicaTimeout time.Duration, dataServerProtocol types.DataServerProtocol, fileSyncHTTPClientTimeout int) *Controller {
c := &Controller{
factory: factory,
Name: name,
VolumeName: name,
frontend: frontend,
metrics: &types.Metrics{},
latestMetrics: &types.Metrics{},
Expand Down Expand Up @@ -164,7 +164,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
return err
}

newBackend, err := c.factory.Create(c.Name, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
if err != nil {
return err
}
Expand All @@ -183,7 +183,7 @@ func (c *Controller) addReplica(address string, snapshotRequired bool, mode type
// Snapshot will try to freeze the filesystem of the volume if possible
// and will fallback to a system level sync in all other cases
func (c *Controller) Snapshot(name string, labels map[string]string) (string, error) {
log := logrus.WithFields(logrus.Fields{"volume": c.Name, "snapshot": name})
log := logrus.WithFields(logrus.Fields{"volume": c.VolumeName, "snapshot": name})
log.Info("Starting snapshot")
if ne, err := iutil.NewNamespaceExecutor(util.GetInitiatorNS()); err != nil {
log.WithError(err).Errorf("WARNING: continue to snapshot for %v, but cannot sync due to cannot get the namespace executor", name)
Expand Down Expand Up @@ -237,11 +237,11 @@ func (c *Controller) Expand(size int64) error {
// We perform a system level sync without the lock. Cannot block read/write
// Can be improved to only sync the filesystem on the block device later
if ne, err := iutil.NewNamespaceExecutor(util.GetInitiatorNS()); err != nil {
logrus.WithError(err).Errorf("WARNING: continue to expand to size %v for %v, but cannot sync due to cannot get the namespace executor", size, c.Name)
logrus.WithError(err).Errorf("WARNING: continue to expand to size %v for %v, but cannot sync due to cannot get the namespace executor", size, c.VolumeName)
} else {
if _, err := ne.ExecuteWithTimeout(syncTimeout, "sync", []string{}); err != nil {
// sync should never fail though, so it more like due to the nsenter
logrus.WithError(err).Errorf("WARNING: continue to expand to size %v for %v, but sync failed", size, c.Name)
logrus.WithError(err).Errorf("WARNING: continue to expand to size %v for %v, but sync failed", size, c.VolumeName)
}
}

Expand Down Expand Up @@ -298,7 +298,7 @@ func (c *Controller) startExpansion(size int64) (err error) {
return fmt.Errorf("requested expansion size %v not multiple of volume sector size %v", size, diskutil.VolumeSectorSize)
}
if c.size == size {
logrus.Infof("controller %v is already expanded to size %v", c.Name, size)
logrus.Infof("controller %v is already expanded to size %v", c.VolumeName, size)
return nil
}
if c.size > size {
Expand Down Expand Up @@ -449,13 +449,13 @@ func (c *Controller) startFrontend() error {
if len(c.replicas) > 0 && c.frontend != nil {
if c.isUpgrade {
logrus.Info("Upgrading frontend")
if err := c.frontend.Upgrade(c.Name, c.size, c.sectorSize, c); err != nil {
if err := c.frontend.Upgrade(c.VolumeName, c.size, c.sectorSize, c); err != nil {
logrus.WithError(err).Error("Failed to upgrade frontend")
return errors.Wrap(err, "failed to upgrade frontend")
}
return nil
}
if err := c.frontend.Init(c.Name, c.size, c.sectorSize); err != nil {
if err := c.frontend.Init(c.VolumeName, c.size, c.sectorSize); err != nil {
logrus.WithError(err).Error("Failed to init frontend")
return errors.Wrap(err, "failed to init frontend")
}
Expand Down Expand Up @@ -728,7 +728,7 @@ func (c *Controller) Start(volumeSize, volumeCurrentSize int64, addresses ...str
errorCodes := map[string]codes.Code{}
first := true
for _, address := range addresses {
newBackend, err := c.factory.Create(c.Name, address, c.DataServerProtocol, c.engineReplicaTimeout)
newBackend, err := c.factory.Create(c.VolumeName, address, c.DataServerProtocol, c.engineReplicaTimeout)
if err != nil {
if strings.Contains(err.Error(), "rpc error: code = Unavailable") {
errorCodes[address] = codes.Unavailable
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,9 @@ func (s *TestSuite) TestWriteInWOMode(c *C) {
readSource := makeByteSliceWithInitialData(dataLength, readSourceInitVal)
writeSource := makeByteSliceWithInitialData(dataLength, writeSourceInitVal)
controller := Controller{
Name: "test-controller",
replicas: []types.Replica{types.Replica{Address: "0.0.0.0", Mode: types.WO}},
backend: newMockReplicator(readSource, writeSource),
VolumeName: "test-controller",
replicas: []types.Replica{types.Replica{Address: "0.0.0.0", Mode: types.WO}},
backend: newMockReplicator(readSource, writeSource),
}

for _, t := range testsets {
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (c *Controller) VerifyRebuildReplica(address, instanceName string) error {
return fmt.Errorf("invalid mode %v for replica %v to check", replica.Mode, address)
}

fromDisks, _, err := GetReplicaDisksAndHead(rwReplica.Address, c.Name, "")
fromDisks, _, err := GetReplicaDisksAndHead(rwReplica.Address, c.VolumeName, "")
if err != nil {
return err
}

toDisks, _, err := GetReplicaDisksAndHead(address, c.Name, instanceName)
toDisks, _, err := GetReplicaDisksAndHead(address, c.VolumeName, instanceName)
if err != nil {
return err
}
Expand Down Expand Up @@ -146,12 +146,12 @@ func (c *Controller) PrepareRebuildReplica(address, instanceName string) ([]type
return nil, fmt.Errorf("invalid mode %v for replica %v to prepare rebuild", replica.Mode, address)
}

fromDisks, fromHead, err := GetReplicaDisksAndHead(rwReplica.Address, c.Name, "")
fromDisks, fromHead, err := GetReplicaDisksAndHead(rwReplica.Address, c.VolumeName, "")
if err != nil {
return nil, err
}

toDisks, toHead, err := GetReplicaDisksAndHead(address, c.Name, instanceName)
toDisks, toHead, err := GetReplicaDisksAndHead(address, c.VolumeName, instanceName)
if err != nil {
return nil, err
}
Expand All @@ -178,12 +178,12 @@ func (c *Controller) PrepareRebuildReplica(address, instanceName string) ([]type
delete(extraDisks, diskName)
}

if err := removeExtraDisks(extraDisks, address, c.Name, instanceName); err != nil {
if err := removeExtraDisks(extraDisks, address, c.VolumeName, instanceName); err != nil {
return nil, err
}

// The lock will block the read/write for this head file sync
if err := syncFile(fromHead+".meta", toHead+".meta", rwReplica.Address, address, c.Name, instanceName,
if err := syncFile(fromHead+".meta", toHead+".meta", rwReplica.Address, address, c.VolumeName, instanceName,
c.fileSyncHTTPClientTimeout, false); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/revert.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Controller) Revert(name string) error {
continue
}

disks, _, err := GetReplicaDisksAndHead(r.Address, c.Name, "")
disks, _, err := GetReplicaDisksAndHead(r.Address, c.VolumeName, "")
if err != nil {
return err
}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (c *Controller) clientsAndSnapshot(name string) (map[string]*client.Replica
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err = client.NewReplicaClient(replica.Address, c.Name, "")
repClient, err = client.NewReplicaClient(replica.Address, c.VolumeName, "")
if err != nil {
return nil, "", err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (cs *ControllerServer) syncFileInfoToControllerFormat(info types.SyncFileIn
func (cs *ControllerServer) getVolume() *ptypes.Volume {
lastExpansionError, lastExpansionFailedAt := cs.c.GetExpansionErrorInfo()
return &ptypes.Volume{
Name: cs.c.Name,
Name: cs.c.VolumeName,
Size: cs.c.Size(),
ReplicaCount: int32(len(cs.c.ListReplicas())),
Endpoint: cs.c.Endpoint(),
Expand Down
10 changes: 5 additions & 5 deletions pkg/sync/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *Task) createBackup(replicaInController *types.ControllerReplicaInfo, ba
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (t *Task) RestoreBackup(backup string, credential map[string]string, concur
}
}

snapshots, err := GetSnapshotsInfo(replicas, t.volumeName)
snapshots, err := GetSnapshotsInfo(replicas, t.client.VolumeName)
if err != nil {
return errors.Wrapf(err, "failed to get snapshot info before the incremental restore")
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func (t *Task) restoreBackup(replicaInController *types.ControllerReplicaInfo, b
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return err
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func (t *Task) Reset() error {

for _, replica := range replicas {
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replica.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replica.Address, t.client.VolumeName, "")
if err != nil {
logrus.WithError(err).Errorf("Failed to get a replica client for %v", replica.Address)
return err
Expand Down Expand Up @@ -348,7 +348,7 @@ func (t *Task) RestoreStatus() (map[string]*RestoreStatus, error) {
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replica.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replica.Address, t.client.VolumeName, "")
if err != nil {
return nil, err
}
Expand Down
38 changes: 18 additions & 20 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
)

type Task struct {
client *client.ControllerClient
volumeName string
client *client.ControllerClient
}

type TaskError struct {
Expand Down Expand Up @@ -118,8 +117,7 @@ func NewTask(ctx context.Context, controllerAddress, volumeName, controllerInsta
}()

return &Task{
client: controllerClient,
volumeName: volumeName,
client: controllerClient,
}, nil
}

Expand Down Expand Up @@ -189,7 +187,7 @@ func (t *Task) PurgeSnapshots(skip bool) error {
defer wg.Done()

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(rep.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(rep.Address, t.client.VolumeName, "")
if err != nil {
errorMap.Store(rep.Address, errors.Wrapf(err, "failed to get replica client %v before purging", rep.Address))
return
Expand Down Expand Up @@ -243,7 +241,7 @@ func (t *Task) PurgeSnapshotStatus() (map[string]*SnapshotPurgeStatus, error) {
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, "")
if err != nil {
return nil, err
}
Expand All @@ -269,7 +267,7 @@ func (t *Task) PurgeSnapshotStatus() (map[string]*SnapshotPurgeStatus, error) {

func (t *Task) isRebuilding(replicaInController *types.ControllerReplicaInfo) (bool, error) {
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return false, err
}
Expand All @@ -285,7 +283,7 @@ func (t *Task) isRebuilding(replicaInController *types.ControllerReplicaInfo) (b

func (t *Task) isHashingSnapshot(replicaInController *types.ControllerReplicaInfo) (bool, error) {
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return false, err
}
Expand All @@ -301,7 +299,7 @@ func (t *Task) isHashingSnapshot(replicaInController *types.ControllerReplicaInf

func (t *Task) isPurging(replicaInController *types.ControllerReplicaInfo) (bool, error) {
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return false, err
}
Expand All @@ -321,7 +319,7 @@ func (t *Task) markSnapshotAsRemoved(replicaInController *types.ControllerReplic
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return err
}
Expand All @@ -336,7 +334,7 @@ func (t *Task) markSnapshotAsRemoved(replicaInController *types.ControllerReplic

func (t *Task) cancelSnapshotHashJob(replicaInController *types.ControllerReplicaInfo, snapshot string) error {
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(replicaInController.Address, t.client.VolumeName, "")
if err != nil {
return err
}
Expand Down Expand Up @@ -375,7 +373,7 @@ func (t *Task) AddRestoreReplica(volumeSize, volumeCurrentSize int64, address, i
}

func (t *Task) checkRestoreReplicaSize(address, instanceName string, volumeSize int64) error {
replicaCli, err := replicaClient.NewReplicaClient(address, t.volumeName, instanceName)
replicaCli, err := replicaClient.NewReplicaClient(address, t.client.VolumeName, instanceName)
if err != nil {
return err
}
Expand Down Expand Up @@ -462,7 +460,7 @@ func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instance
}

func (t *Task) checkAndResetFailedRebuild(address, instanceName string) error {
client, err := replicaClient.NewReplicaClient(address, t.volumeName, instanceName)
client, err := replicaClient.NewReplicaClient(address, t.client.VolumeName, instanceName)
if err != nil {
return err
}
Expand All @@ -489,7 +487,7 @@ func (t *Task) checkAndResetFailedRebuild(address, instanceName string) error {
}

func (t *Task) checkAndExpandReplica(address, instanceName string, size int64) error {
client, err := replicaClient.NewReplicaClient(address, t.volumeName, instanceName)
client, err := replicaClient.NewReplicaClient(address, t.client.VolumeName, instanceName)
if err != nil {
return err
}
Expand Down Expand Up @@ -594,7 +592,7 @@ func (t *Task) getFromReplicaClientForTransfer() (*replicaClient.ReplicaClient,
continue
}
// We don't know the replica's instanceName, so create a client without it.
fromClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, "")
fromClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, "")
if err != nil {
logrus.WithError(err).Warnf("Failed to get the client for replica %v when picking up a transfer-from replica", r.Address)
continue
Expand Down Expand Up @@ -627,7 +625,7 @@ func (t *Task) getToReplicaClientForTransfer(address, instanceName string) (*rep
if r.Mode != types.WO {
return nil, "", fmt.Errorf("replica %s is not in mode WO: %s", address, r.Mode)
}
toClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, instanceName)
toClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, instanceName)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -764,7 +762,7 @@ func (t *Task) RebuildStatus() (map[string]*ReplicaRebuildStatus, error) {
}

// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -940,7 +938,7 @@ func (t *Task) HashSnapshot(snapshotName string, rehash bool) error {
go func(r *types.ControllerReplicaInfo) {
defer wg.Done()
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, "")
if err != nil {
syncErrorMap.Store(r.Address, err)
return
Expand Down Expand Up @@ -1014,7 +1012,7 @@ func (t *Task) HashSnapshotStatus(snapshotName string) (map[string]*SnapshotHash
}
}()
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, "")
if err != nil {
err = errors.Wrapf(err, "failed to create replica client to %v", r.Address)
return
Expand Down Expand Up @@ -1068,7 +1066,7 @@ func (t *Task) HashSnapshotCancel(snapshotName string) error {
go func(r *types.ControllerReplicaInfo) {
defer wg.Done()
// We don't know the replica's instanceName, so create a client without it.
repClient, err := replicaClient.NewReplicaClient(r.Address, t.volumeName, "")
repClient, err := replicaClient.NewReplicaClient(r.Address, t.client.VolumeName, "")
if err != nil {
syncErrorMap.Store(r.Address, err)
return
Expand Down

0 comments on commit 4d54af9

Please sign in to comment.