Skip to content

Commit

Permalink
v2 volume: support live upgrade
Browse files Browse the repository at this point in the history
Longhorn 6001

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed Jun 18, 2024
1 parent a13a809 commit 492521d
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 28 deletions.
6 changes: 0 additions & 6 deletions app/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,6 @@ func start(c *cli.Context) (err error) {
return err
}

if spdkEnabled {
if err := cleanupStaledNvmeAndDmDevices(); err != nil {
return err
}
}

// setup tls config
var tlsConfig *tls.Config
tlsDir := c.GlobalString("tls-dir")
Expand Down
4 changes: 2 additions & 2 deletions package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ RUN wget -O - https://storage.googleapis.com/golang/go1.22.2.linux-${!GOLANG_ARC
go install golang.org/x/lint/golint@latest

ENV GO_SPDK_HELPER_DIR /usr/src/go-spdk-helper
ENV GO_SPDK_HELPER_COMMIT_ID d05637da59788eeeb8927c663f0eec7c958d3f97
RUN git clone https://github.com/longhorn/go-spdk-helper.git ${GO_SPDK_HELPER_DIR} && \
ENV GO_SPDK_HELPER_COMMIT_ID af8698af02b38e39a52159332bb1989a36290dde
RUN git clone https://github.com/derekbit/go-spdk-helper.git ${GO_SPDK_HELPER_DIR} && \
cd ${GO_SPDK_HELPER_DIR} && \
git checkout ${GO_SPDK_HELPER_COMMIT_ID} && \
go build && \
Expand Down
24 changes: 14 additions & 10 deletions pkg/api/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,24 @@ func RPCToInstanceList(obj *rpc.InstanceListResponse) map[string]*Instance {
}

type InstanceStatus struct {
State string `json:"state"`
ErrorMsg string `json:"errorMsg"`
Conditions map[string]bool `json:"conditions"`
PortStart int32 `json:"portStart"`
PortEnd int32 `json:"portEnd"`
State string `json:"state"`
ErrorMsg string `json:"errorMsg"`
Conditions map[string]bool `json:"conditions"`
PortStart int32 `json:"portStart"`
PortEnd int32 `json:"portEnd"`
TargetPortStart int32 `json:"targetPortStart"`
TargetPortEnd int32 `json:"targetPortEnd"`
}

func RPCToInstanceStatus(obj *rpc.InstanceStatus) InstanceStatus {
return InstanceStatus{
State: obj.State,
ErrorMsg: obj.ErrorMsg,
Conditions: obj.Conditions,
PortStart: obj.PortStart,
PortEnd: obj.PortEnd,
State: obj.State,
ErrorMsg: obj.ErrorMsg,
Conditions: obj.Conditions,
PortStart: obj.PortStart,
PortEnd: obj.PortEnd,
TargetPortStart: obj.TargetPortStart,
TargetPortEnd: obj.TargetPortEnd,
}
}

Expand Down
114 changes: 114 additions & 0 deletions pkg/client/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func NewInstanceServiceClientWithTLS(ctx context.Context, ctxCancel context.Canc
type EngineCreateRequest struct {
ReplicaAddressMap map[string]string
Frontend string
InitiatorAddress string
TargetAddress string
UpgradeRequired bool
}

type ReplicaCreateRequest struct {
Expand Down Expand Up @@ -115,6 +118,17 @@ type InstanceCreateRequest struct {
BackendStoreDriver string
}

type InstanceSuspendRequest struct {
DataEngine string
Name string
InstanceType string
VolumeName string

Engine EngineCreateRequest
Replica ReplicaCreateRequest
}

// InstanceCreate creates an instance.
func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api.Instance, error) {
if req.Name == "" || req.InstanceType == "" {
return nil, fmt.Errorf("failed to create instance: missing required parameter")
Expand Down Expand Up @@ -169,6 +183,10 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api

ProcessInstanceSpec: processInstanceSpec,
SpdkInstanceSpec: spdkInstanceSpec,

UpgradeRequired: req.Engine.UpgradeRequired,
InitiatorAddress: req.Engine.InitiatorAddress,
TargetAddress: req.Engine.TargetAddress,
},
})
if err != nil {
Expand All @@ -178,6 +196,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api
return api.RPCToInstance(p), nil
}

// InstanceDelete deletes the instance by name.
func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, diskUUID string, cleanupRequired bool) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to delete instance: missing required parameter name")
Expand Down Expand Up @@ -207,6 +226,7 @@ func (c *InstanceServiceClient) InstanceDelete(dataEngine, name, instanceType, d
return api.RPCToInstance(p), nil
}

// InstanceGet returns the instance by name.
func (c *InstanceServiceClient) InstanceGet(dataEngine, name, instanceType string) (*api.Instance, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand Down Expand Up @@ -246,6 +266,7 @@ func (c *InstanceServiceClient) InstanceList() (map[string]*api.Instance, error)
return api.RPCToInstanceList(instances), nil
}

// InstanceLog returns the log stream of an instance.
func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, name, instanceType string) (*api.LogStream, error) {
if name == "" {
return nil, fmt.Errorf("failed to get instance: missing required parameter name")
Expand All @@ -270,6 +291,7 @@ func (c *InstanceServiceClient) InstanceLog(ctx context.Context, dataEngine, nam
return api.NewLogStream(stream), nil
}

// InstanceWatch watches for instance updates.
func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.InstanceStream, error) {
client := c.getControllerServiceClient()
stream, err := client.InstanceWatch(ctx, &emptypb.Empty{})
Expand All @@ -280,6 +302,7 @@ func (c *InstanceServiceClient) InstanceWatch(ctx context.Context) (*api.Instanc
return api.NewInstanceStream(stream), nil
}

// InstanceReplace replaces an instance with a new one.
func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType, binary string, portCount int, args, portArgs []string, terminateSignal string) (*api.Instance, error) {
if name == "" || binary == "" {
return nil, fmt.Errorf("failed to replace instance: missing required parameter")
Expand Down Expand Up @@ -320,6 +343,93 @@ func (c *InstanceServiceClient) InstanceReplace(dataEngine, name, instanceType,
return api.RPCToInstance(p), nil
}

// InstanceSuspend suspends an instance.
func (c *InstanceServiceClient) InstanceSuspend(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to suspend instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to suspend instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSuspend(ctx, &rpc.InstanceSuspendRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to suspend instance %v", name)
}

return nil
}

// InstanceResume suspends an instance.
func (c *InstanceServiceClient) InstanceResume(dataEngine, name, instanceType string) error {
if name == "" {
return fmt.Errorf("failed to resume instance: missing required parameter name")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to resume instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceResume(ctx, &rpc.InstanceResumeRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
})
if err != nil {
return errors.Wrapf(err, "failed to resume instance %v", name)
}

return nil
}

// InstanceSwitchOver switches over the target for an instance.
func (c *InstanceServiceClient) InstanceSwitchOver(dataEngine, name, instanceType, targetAddress string) error {
if name == "" {
return fmt.Errorf("failed to switch over target for instance: missing required parameter name")
}

if targetAddress == "" {
return fmt.Errorf("failed to switch over target for instance: missing required parameter target address")
}

driver, ok := rpc.DataEngine_value[getDataEngine(dataEngine)]
if !ok {
return fmt.Errorf("failed to switch over target instance: invalid data engine %v", dataEngine)
}

client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
defer cancel()

_, err := client.InstanceSwitchOver(ctx, &rpc.InstanceSwitchOverRequest{
Name: name,
Type: instanceType,
DataEngine: rpc.DataEngine(driver),
TargetAddress: targetAddress,
})
if err != nil {
return errors.Wrapf(err, "failed to switch over target instance %v", name)
}

return nil
}

// InstanceResume resumes an instance.
func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -343,6 +453,7 @@ func (c *InstanceServiceClient) VersionGet() (*meta.VersionOutput, error) {
}, nil
}

// LogSetLevel sets the log level of the service.
func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -360,6 +471,7 @@ func (c *InstanceServiceClient) LogSetLevel(dataEngine, service, level string) e
return err
}

// LogSetFlags sets the log flags of the service.x
func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) error {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -377,6 +489,7 @@ func (c *InstanceServiceClient) LogSetFlags(dataEngine, service, flags string) e
return err
}

// LogGetLevel returns the log level of the service.
func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand All @@ -396,6 +509,7 @@ func (c *InstanceServiceClient) LogGetLevel(dataEngine, service string) (string,
return resp.Level, nil
}

// LogGetFlags returns the log flags of the service.
func (c *InstanceServiceClient) LogGetFlags(dataEngine, service string) (string, error) {
client := c.getControllerServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), types.GRPCServiceTimeout)
Expand Down
Loading

0 comments on commit 492521d

Please sign in to comment.