Skip to content

Commit

Permalink
feat(auto-balance): implement local sync to added replica
Browse files Browse the repository at this point in the history
longhorn/longhorn-4105

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Jul 4, 2024
1 parent 2568053 commit bca16fe
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 11 deletions.
7 changes: 6 additions & 1 deletion app/cmd/add_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func AddReplicaCmd() cli.Command {
Required: false,
Usage: "Enable fast file synchronization using change time and checksum",
},
cli.BoolFlag{
Name: "sync-local",
Required: false,
Usage: "sync local replica",
},
cli.IntFlag{
Name: "file-sync-http-client-timeout",
Required: false,
Expand Down Expand Up @@ -103,7 +108,7 @@ func addReplica(c *cli.Context) error {
if c.Bool("restore") {
return task.AddRestoreReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName)
}
return task.AddReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds)
return task.AddReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName, fileSyncHTTPClientTimeout, fastSync, nil, grpcTimeoutSeconds)
}

func StartWithReplicasCmd() cli.Command {
Expand Down
15 changes: 12 additions & 3 deletions pkg/replica/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (c *ReplicaClient) LaunchReceiver(toFilePath string) (string, int32, error)
return c.host, reply.Port, nil
}

func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64) error {
func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64, localSync *types.FileLocalSync) error {
syncAgentServiceClient, err := c.getSyncServiceClient()
if err != nil {
return err
Expand All @@ -518,14 +518,23 @@ func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo,
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()

if _, err := syncAgentServiceClient.FilesSync(ctx, &enginerpc.FilesSyncRequest{
fileSyncRequest := &enginerpc.FilesSyncRequest{
FromAddress: fromAddress,
ToHost: c.host,
SyncFileInfoList: syncFileInfoListToSyncAgentGRPCFormat(list),
FastSync: fastSync,
FileSyncHttpClientTimeout: int32(fileSyncHTTPClientTimeout),
GrpcTimeoutSeconds: grpcTimeoutSeconds,
}); err != nil {
}

if localSync != nil {
fileSyncRequest.LocalSync = &enginerpc.FileLocalSync{
SourcePath: localSync.SourcePath,
TargetPath: localSync.TargetPath,
}
}

if _, err := syncAgentServiceClient.FilesSync(ctx, fileSyncRequest); err != nil {
return errors.Wrapf(err, "failed to sync files %+v from %v", list, fromAddress)
}

Expand Down
72 changes: 67 additions & 5 deletions pkg/sync/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/emptypb"

lhio "github.com/longhorn/go-common-libs/io"

"github.com/longhorn/longhorn-engine/pkg/backup"
"github.com/longhorn/longhorn-engine/pkg/interceptor"
"github.com/longhorn/longhorn-engine/pkg/replica"
Expand Down Expand Up @@ -445,16 +447,77 @@ func (s *SyncAgentServer) FilesSync(ctx context.Context, req *enginerpc.FilesSyn
}
}()

if req.LocalSync != nil {
err := s.fileSyncLocal(ctx, req)
if err == nil {
return &emptypb.Empty{}, nil
}

logrus.WithError(err).Warn("Falling back to remote sync")
}

return &emptypb.Empty{}, s.fileSyncRemote(ctx, req)
}

// fileSyncLocal copies every file listed in req.SyncFileInfoList from the
// local source directory to the local target directory named in req.LocalSync.
// Both directories are resolved under "/host". After each copy, the source and
// target file sizes are compared.
//
// If any copy or size check fails, every target file attempted so far is
// removed and the error is returned, so the caller can fall back to another
// sync method. req.LocalSync must be non-nil.
//
// The result is a named return value so the deferred cleanup observes the
// final error no matter which return statement produced it.
func (s *SyncAgentServer) fileSyncLocal(ctx context.Context, req *enginerpc.FilesSyncRequest) (err error) {
	log := logrus.WithFields(logrus.Fields{
		"sourcePath": req.LocalSync.SourcePath,
		"targetPath": req.LocalSync.TargetPath,
	})

	log.Info("Syncing files locally")

	// Every target path we attempt to write, including the one that fails,
	// so the cleanup can remove partially written files.
	targetPaths := make([]string, 0, len(req.SyncFileInfoList))

	// Report the outcome and revert the copied files when an error occurred.
	defer func() {
		if err == nil {
			log.Info("Done syncing files locally")
			return
		}

		log.WithError(err).Warn("Failed to sync files locally, reverting changes")

		for _, targetPath := range targetPaths {
			// os.Remove returns a *PathError, so the not-exist case has to be
			// detected with os.IsNotExist rather than a direct comparison.
			if removeErr := os.Remove(targetPath); removeErr != nil && !os.IsNotExist(removeErr) {
				log.WithError(removeErr).Warnf("Failed to remove file %v", targetPath)
			}
		}
	}()

	for _, info := range req.SyncFileInfoList {
		sourcePath := filepath.Join("/host", req.LocalSync.SourcePath, info.FromFileName)
		targetPath := filepath.Join("/host", req.LocalSync.TargetPath, info.ToFileName)
		targetPaths = append(targetPaths, targetPath)

		log.Tracef("Copying file %v to %v", sourcePath, targetPath)

		if err = lhio.CopyFile(sourcePath, targetPath, true); err != nil {
			return err
		}

		// Assign with "=" (not ":=") so the error is not shadowed and the
		// deferred cleanup runs when the size check fails.
		if err = lhio.CheckIsFileSizeSame(sourcePath, targetPath); err != nil {
			return errors.Wrapf(err, "failed to check file size for file %v", sourcePath)
		}
	}

	return nil
}

func (s *SyncAgentServer) fileSyncRemote(ctx context.Context, req *enginerpc.FilesSyncRequest) error {
// We generally don't know the from replica's instanceName since it is arbitrarily chosen from candidate addresses
// stored in the controller. Don't modify FilesSyncRequest to contain it, and create a client without it.
fromClient, err := replicaclient.NewReplicaClient(req.FromAddress, s.volumeName, "")
if err != nil {
return nil, err
return err
}
defer fromClient.Close()

var ops sparserest.SyncFileOperations
fileStub := &sparserest.SyncFileStub{}

for _, info := range req.SyncFileInfoList {
// Do not count size for disk meta file or empty disk file.
if info.ActualSize == 0 {
Expand All @@ -465,14 +528,13 @@ func (s *SyncAgentServer) FilesSync(ctx context.Context, req *enginerpc.FilesSyn

port, err := s.launchReceiver("FilesSync", info.ToFileName, ops)
if err != nil {
return nil, errors.Wrapf(err, "failed to launch receiver for file %v", info.ToFileName)
return errors.Wrapf(err, "failed to launch receiver for file %v", info.ToFileName)
}
if err := fromClient.SendFile(info.FromFileName, req.ToHost, int32(port), int(req.FileSyncHttpClientTimeout), req.FastSync, req.GrpcTimeoutSeconds); err != nil {
return nil, errors.Wrapf(err, "replica %v failed to send file %v to %v:%v", req.FromAddress, info.ToFileName, req.ToHost, port)
return errors.Wrapf(err, "replica %v failed to send file %v to %v:%v", req.FromAddress, info.ToFileName, req.ToHost, port)
}
}

return &emptypb.Empty{}, nil
return nil
}

func (s *SyncAgentServer) PrepareRebuild(list []*enginerpc.SyncFileInfo, fromReplicaAddress string) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (t *Task) VerifyRebuildReplica(address, instanceName string) error {
return nil
}

func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instanceName string, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64) error {
func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instanceName string, fileSyncHTTPClientTimeout int, fastSync bool, localSync *types.FileLocalSync, grpcTimeoutSeconds int64) error {
volume, err := t.client.VolumeGet()
if err != nil {
return err
Expand Down Expand Up @@ -459,7 +459,7 @@ func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instance
return fmt.Errorf("sync file list shouldn't contain volume head")
}

if err = toClient.SyncFiles(fromAddress, resp, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds); err != nil {
if err = toClient.SyncFiles(fromAddress, resp, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds, localSync); err != nil {
return err
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,8 @@ func GRPCReplicaModeToReplicaMode(replicaMode enginerpc.ReplicaMode) Mode {
}
return ERR
}

// FileLocalSync carries the pair of paths used when files are synced locally
// instead of over the network: SourcePath is where files are read from and
// TargetPath is where they are written to.
type FileLocalSync struct {
	SourcePath, TargetPath string
}

0 comments on commit bca16fe

Please sign in to comment.