Skip to content

Commit

Permalink
feat(backend): replace Nvme path map with gokv.Store abstraction
Browse files Browse the repository at this point in the history
Signed-off-by: Boris Glimcher <Boris.Glimcher@emc.com>
  • Loading branch information
glimchb committed Sep 28, 2023
1 parent 8dee0f9 commit 4ba64c7
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 41 deletions.
16 changes: 2 additions & 14 deletions pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,6 @@ import (
pb "github.com/opiproject/opi-api/storage/v1alpha1/gen/go"
)

// TODO: can we combine all of volume types into a single list?
// maybe create a volume abstraction like bdev in SPDK?

// VolumeParameters contains all BackEnd volume related structures
type VolumeParameters struct {
NvmePaths map[string]*pb.NvmePath
}

// Server contains backend related OPI services
type Server struct {
pb.UnimplementedNvmeRemoteControllerServiceServer
Expand All @@ -31,7 +23,6 @@ type Server struct {

rpc spdk.JSONRPC
store gokv.Store
Volumes VolumeParameters
Pagination map[string]int
psk psk
}
Expand All @@ -51,11 +42,8 @@ func NewServer(jsonRPC spdk.JSONRPC, store gokv.Store) *Server {
log.Panic("nil for Store is not allowed")
}
return &Server{
rpc: jsonRPC,
store: store,
Volumes: VolumeParameters{
NvmePaths: make(map[string]*pb.NvmePath),
},
rpc: jsonRPC,
store: store,
Pagination: make(map[string]int),
psk: psk{
createTempFile: os.CreateTemp,
Expand Down
78 changes: 56 additions & 22 deletions pkg/backend/nvme_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@ func (s *Server) CreateNvmePath(_ context.Context, in *pb.CreateNvmePathRequest)
}
in.NvmePath.Name = utils.ResourceIDToVolumeName(resourceID)

nvmePath, ok := s.Volumes.NvmePaths[in.NvmePath.Name]
if ok {
nvmePath := new(pb.NvmePath)
found, err := s.store.Get(in.NvmePath.Name, nvmePath)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
}
if found {
log.Printf("Already existing NvmePath with id %v", in.NvmePath.Name)
return nvmePath, nil
}

controller := new(pb.NvmeRemoteController)
found, err := s.store.Get(in.NvmePath.ControllerNameRef, controller)
found, err = s.store.Get(in.NvmePath.ControllerNameRef, controller)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
Expand Down Expand Up @@ -115,7 +120,10 @@ func (s *Server) CreateNvmePath(_ context.Context, in *pb.CreateNvmePathRequest)
log.Printf("Received from SPDK: %v", result)

response := utils.ProtoClone(in.NvmePath)
s.Volumes.NvmePaths[in.NvmePath.Name] = response
err = s.store.Set(in.NvmePath.Name, response)
if err != nil {
return nil, err
}
log.Printf("CreateNvmePath: Sending to client: %v", response)
return response, nil
}
Expand All @@ -128,8 +136,13 @@ func (s *Server) DeleteNvmePath(_ context.Context, in *pb.DeleteNvmePathRequest)
log.Printf("error: %v", err)
return nil, err
}
nvmePath, ok := s.Volumes.NvmePaths[in.Name]
if !ok {
nvmePath := new(pb.NvmePath)
found, err := s.store.Get(in.Name, nvmePath)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
}
if !found {
if in.AllowMissing {
return &emptypb.Empty{}, nil
}
Expand All @@ -138,7 +151,7 @@ func (s *Server) DeleteNvmePath(_ context.Context, in *pb.DeleteNvmePathRequest)
return nil, err
}
controller := new(pb.NvmeRemoteController)
found, err := s.store.Get(nvmePath.ControllerNameRef, controller)
found, err = s.store.Get(nvmePath.ControllerNameRef, controller)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
Expand Down Expand Up @@ -171,7 +184,10 @@ func (s *Server) DeleteNvmePath(_ context.Context, in *pb.DeleteNvmePathRequest)
return nil, status.Errorf(codes.InvalidArgument, msg)
}

delete(s.Volumes.NvmePaths, in.Name)
err = s.store.Delete(nvmePath.Name)
if err != nil {
return nil, err
}

return &emptypb.Empty{}, nil
}
Expand All @@ -185,8 +201,13 @@ func (s *Server) UpdateNvmePath(_ context.Context, in *pb.UpdateNvmePathRequest)
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.NvmePaths[in.NvmePath.Name]
if !ok {
volume := new(pb.NvmePath)
found, err := s.store.Get(in.NvmePath.Name, volume)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
}
if !found {
if in.AllowMissing {
log.Printf("TODO: in case of AllowMissing, create a new resource, don;t return error")
}
Expand All @@ -202,7 +223,10 @@ func (s *Server) UpdateNvmePath(_ context.Context, in *pb.UpdateNvmePathRequest)
}
log.Printf("TODO: use resourceID=%v", resourceID)
response := utils.ProtoClone(in.NvmePath)
// s.Volumes.NvmePaths[in.NvmePath.Name] = response
// err = s.store.Set(in.NvmePath.Name, response)
// if err != nil {
// return nil, err
// }
return response, nil
}

Expand Down Expand Up @@ -252,15 +276,20 @@ func (s *Server) GetNvmePath(_ context.Context, in *pb.GetNvmePathRequest) (*pb.
return nil, err
}
// fetch object from the database
path, ok := s.Volumes.NvmePaths[in.Name]
if !ok {
path := new(pb.NvmePath)
found, err := s.store.Get(in.Name, path)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
}
if !found {
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
log.Printf("error: %v", err)
return nil, err
}

var result []spdk.BdevNvmeGetControllerResult
err := s.rpc.Call("bdev_nvme_get_controllers", nil, &result)
err = s.rpc.Call("bdev_nvme_get_controllers", nil, &result)
if err != nil {
log.Printf("error: %v", err)
return nil, err
Expand All @@ -287,16 +316,21 @@ func (s *Server) StatsNvmePath(_ context.Context, in *pb.StatsNvmePathRequest) (
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.NvmePaths[in.Name]
if !ok {
volume := new(pb.NvmePath)
found, err := s.store.Get(in.Name, volume)
if err != nil {
fmt.Printf("Failed to interact with store: %v", err)
return nil, err
}
if !found {
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
log.Printf("error: %v", err)
return nil, err
}
resourceID := path.Base(volume.Name)
log.Printf("TODO: send name to SPDK and get back stats: %v", resourceID)
var result spdk.NvmfGetSubsystemStatsResult
err := s.rpc.Call("nvmf_get_stats", nil, &result)
err = s.rpc.Call("nvmf_get_stats", nil, &result)
if err != nil {
log.Printf("error: %v", err)
return nil, err
Expand Down Expand Up @@ -325,11 +359,11 @@ func (s *Server) opiMultipathToSpdk(multipath pb.NvmeMultipath) string {

func (s *Server) numberOfPathsForController(controllerName string) int {
numberOfPaths := 0
for _, path := range s.Volumes.NvmePaths {
if path.ControllerNameRef == controllerName {
numberOfPaths++
}
}
// for _, path := range s.Volumes.NvmePaths {
// if path.ControllerNameRef == controllerName {
// numberOfPaths++
// }
// }
return numberOfPaths
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/backend/nvme_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func TestBackEnd_CreateNvmePath(t *testing.T) {
testEnv.opiSpdkServer.store.Set(testNvmeCtrlName, utils.ProtoClone(tt.controller))

if tt.exist {
testEnv.opiSpdkServer.Volumes.NvmePaths[testNvmePathName] = utils.ProtoClone(&testNvmePathWithName)
testEnv.opiSpdkServer.store.Set(testNvmePathName, &testNvmePathWithName)
}
if tt.out != nil {
tt.out = utils.ProtoClone(tt.out)
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestBackEnd_DeleteNvmePath(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.NvmePaths[testNvmePathName] = utils.ProtoClone(&testNvmePathWithName)
testEnv.opiSpdkServer.store.Set(testNvmePathName, &testNvmePathWithName)
testEnv.opiSpdkServer.store.Set(testNvmeCtrlName, &testNvmeCtrlWithName)

request := &pb.DeleteNvmePathRequest{Name: tt.in, AllowMissing: tt.missing}
Expand Down Expand Up @@ -637,7 +637,7 @@ func TestBackEnd_UpdateNvmePath(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.NvmePaths[testNvmePathName] = utils.ProtoClone(&testNvmePathWithName)
testEnv.opiSpdkServer.store.Set(testNvmePathName, &testNvmePathWithName)

request := &pb.UpdateNvmePathRequest{NvmePath: tt.in, UpdateMask: tt.mask, AllowMissing: tt.missing}
response, err := testEnv.client.UpdateNvmePath(testEnv.ctx, request)
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestBackEnd_GetNvmePath(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.NvmePaths[testNvmePathName] = utils.ProtoClone(&testNvmePathWithName)
testEnv.opiSpdkServer.store.Set(testNvmePathName, &testNvmePathWithName)

request := &pb.GetNvmePathRequest{Name: tt.in}
response, err := testEnv.client.GetNvmePath(testEnv.ctx, request)
Expand Down Expand Up @@ -1053,7 +1053,7 @@ func TestBackEnd_StatsNvmePath(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.NvmePaths[testNvmePathName] = utils.ProtoClone(&testNvmePathWithName)
testEnv.opiSpdkServer.store.Set(testNvmePathName, &testNvmePathWithName)

request := &pb.StatsNvmePathRequest{Name: tt.in}
response, err := testEnv.client.StatsNvmePath(testEnv.ctx, request)
Expand Down

0 comments on commit 4ba64c7

Please sign in to comment.