Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): replace maps with gokv.Store abstraction #671

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 50 additions & 18 deletions pkg/backend/aio.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ func (s *Server) CreateAioVolume(_ context.Context, in *pb.CreateAioVolumeReques
}
in.AioVolume.Name = utils.ResourceIDToVolumeName(resourceID)
// idempotent API when called with same key, should return same object
volume, ok := s.Volumes.AioVolumes[in.AioVolume.Name]
if ok {
volume := new(pb.AioVolume)
found, err := s.store.Get(in.AioVolume.Name, volume)
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd explicitly set gRPC error code as Internal in all storage related operations by means of status.Errof(codes.Internal,...)

}
if found {
log.Printf("Already existing AioVolume with id %v", in.AioVolume.Name)
return volume, nil
}
Expand All @@ -57,7 +61,7 @@ func (s *Server) CreateAioVolume(_ context.Context, in *pb.CreateAioVolumeReques
Filename: in.AioVolume.Filename,
}
var result spdk.BdevAioCreateResult
err := s.rpc.Call("bdev_aio_create", &params, &result)
err = s.rpc.Call("bdev_aio_create", &params, &result)
if err != nil {
return nil, err
}
Expand All @@ -67,7 +71,10 @@ func (s *Server) CreateAioVolume(_ context.Context, in *pb.CreateAioVolumeReques
return nil, status.Errorf(codes.InvalidArgument, msg)
}
response := utils.ProtoClone(in.AioVolume)
s.Volumes.AioVolumes[in.AioVolume.Name] = response
err = s.store.Set(in.AioVolume.Name, response)
if err != nil {
return nil, err
}
return response, nil
}

Expand All @@ -78,8 +85,12 @@ func (s *Server) DeleteAioVolume(_ context.Context, in *pb.DeleteAioVolumeReques
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.AioVolumes[in.Name]
if !ok {
volume := new(pb.AioVolume)
found, err := s.store.Get(in.Name, volume)
if err != nil {
return nil, err
}
if !found {
if in.AllowMissing {
return &emptypb.Empty{}, nil
}
Expand All @@ -91,7 +102,7 @@ func (s *Server) DeleteAioVolume(_ context.Context, in *pb.DeleteAioVolumeReques
Name: resourceID,
}
var result spdk.BdevAioDeleteResult
err := s.rpc.Call("bdev_aio_delete", &params, &result)
err = s.rpc.Call("bdev_aio_delete", &params, &result)
if err != nil {
return nil, err
}
Expand All @@ -100,7 +111,10 @@ func (s *Server) DeleteAioVolume(_ context.Context, in *pb.DeleteAioVolumeReques
msg := fmt.Sprintf("Could not delete Aio Dev: %s", params.Name)
return nil, status.Errorf(codes.InvalidArgument, msg)
}
delete(s.Volumes.AioVolumes, volume.Name)
err = s.store.Delete(volume.Name)
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}

Expand All @@ -111,8 +125,12 @@ func (s *Server) UpdateAioVolume(_ context.Context, in *pb.UpdateAioVolumeReques
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.AioVolumes[in.AioVolume.Name]
if !ok {
volume := new(pb.AioVolume)
found, err := s.store.Get(in.AioVolume.Name, volume)
Fixed Show fixed Hide fixed
if err != nil {
return nil, err
}
if !found {
if in.AllowMissing {
log.Printf("Got AllowMissing, create a new resource, don't return error when resource not found")
params := spdk.BdevAioCreateParams{
Expand All @@ -131,7 +149,10 @@ func (s *Server) UpdateAioVolume(_ context.Context, in *pb.UpdateAioVolumeReques
return nil, status.Errorf(codes.InvalidArgument, msg)
}
response := utils.ProtoClone(in.AioVolume)
s.Volumes.AioVolumes[in.AioVolume.Name] = response
err = s.store.Set(in.AioVolume.Name, response)
if err != nil {
return nil, err
}
return response, nil
}
err := status.Errorf(codes.NotFound, "unable to find key %s", in.AioVolume.Name)
Expand Down Expand Up @@ -171,7 +192,10 @@ func (s *Server) UpdateAioVolume(_ context.Context, in *pb.UpdateAioVolumeReques
return nil, status.Errorf(codes.InvalidArgument, msg)
}
response := utils.ProtoClone(in.AioVolume)
s.Volumes.AioVolumes[in.AioVolume.Name] = response
err = s.store.Set(in.AioVolume.Name, response)
if err != nil {
return nil, err
}
return response, nil
}

Expand Down Expand Up @@ -215,8 +239,12 @@ func (s *Server) GetAioVolume(_ context.Context, in *pb.GetAioVolumeRequest) (*p
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.AioVolumes[in.Name]
if !ok {
volume := new(pb.AioVolume)
found, err := s.store.Get(in.Name, volume)
Fixed Show fixed Hide fixed
if err != nil {
return nil, err
}
if !found {
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
return nil, err
}
Expand All @@ -225,7 +253,7 @@ func (s *Server) GetAioVolume(_ context.Context, in *pb.GetAioVolumeRequest) (*p
Name: resourceID,
}
var result []spdk.BdevGetBdevsResult
err := s.rpc.Call("bdev_get_bdevs", &params, &result)
err = s.rpc.Call("bdev_get_bdevs", &params, &result)
if err != nil {
return nil, err
}
Expand All @@ -244,8 +272,12 @@ func (s *Server) StatsAioVolume(_ context.Context, in *pb.StatsAioVolumeRequest)
return nil, err
}
// fetch object from the database
volume, ok := s.Volumes.AioVolumes[in.Name]
if !ok {
volume := new(pb.AioVolume)
found, err := s.store.Get(in.Name, volume)
Fixed Show fixed Hide fixed
if err != nil {
return nil, err
}
if !found {
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
return nil, err
}
Expand All @@ -255,7 +287,7 @@ func (s *Server) StatsAioVolume(_ context.Context, in *pb.StatsAioVolumeRequest)
}
// See https://mholt.github.io/json-to-go/
var result spdk.BdevGetIostatResult
err := s.rpc.Call("bdev_get_iostat", &params, &result)
err = s.rpc.Call("bdev_get_iostat", &params, &result)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/backend/aio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestBackEnd_CreateAioVolume(t *testing.T) {
defer testEnv.Close()

if tt.exist {
testEnv.opiSpdkServer.Volumes.AioVolumes[testAioVolumeName] = utils.ProtoClone(&testAioVolumeWithName)
testEnv.opiSpdkServer.store.Set(testAioVolumeName, &testAioVolumeWithName)
}
if tt.out != nil {
tt.out = utils.ProtoClone(tt.out)
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestBackEnd_UpdateAioVolume(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.AioVolumes[testAioVolumeName] = utils.ProtoClone(&testAioVolumeWithName)
testEnv.opiSpdkServer.store.Set(testAioVolumeName, &testAioVolumeWithName)

request := &pb.UpdateAioVolumeRequest{AioVolume: tt.in, UpdateMask: tt.mask, AllowMissing: tt.missing}
response, err := testEnv.client.UpdateAioVolume(testEnv.ctx, request)
Expand Down Expand Up @@ -604,7 +604,7 @@ func TestBackEnd_GetAioVolume(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.AioVolumes[testAioVolumeName] = utils.ProtoClone(&testAioVolumeWithName)
testEnv.opiSpdkServer.store.Set(testAioVolumeName, &testAioVolumeWithName)

request := &pb.GetAioVolumeRequest{Name: tt.in}
response, err := testEnv.client.GetAioVolume(testEnv.ctx, request)
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestBackEnd_StatsAioVolume(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.AioVolumes[testAioVolumeName] = utils.ProtoClone(&testAioVolumeWithName)
testEnv.opiSpdkServer.store.Set(testAioVolumeName, &testAioVolumeWithName)

request := &pb.StatsAioVolumeRequest{Name: tt.in}
response, err := testEnv.client.StatsAioVolume(testEnv.ctx, request)
Expand Down Expand Up @@ -820,7 +820,7 @@ func TestBackEnd_DeleteAioVolume(t *testing.T) {
testEnv := createTestEnvironment(tt.spdk)
defer testEnv.Close()

testEnv.opiSpdkServer.Volumes.AioVolumes[testAioVolumeName] = utils.ProtoClone(&testAioVolumeWithName)
testEnv.opiSpdkServer.store.Set(testAioVolumeName, &testAioVolumeWithName)

request := &pb.DeleteAioVolumeRequest{Name: tt.in, AllowMissing: tt.missing}
response, err := testEnv.client.DeleteAioVolume(testEnv.ctx, request)
Expand Down
23 changes: 2 additions & 21 deletions pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,6 @@ import (
pb "github.com/opiproject/opi-api/storage/v1alpha1/gen/go"
)

// TODO: can we combine all of volume types into a single list?
// maybe create a volume abstraction like bdev in SPDK?

// VolumeParameters contains all BackEnd volume related structures
type VolumeParameters struct {
AioVolumes map[string]*pb.AioVolume
NullVolumes map[string]*pb.NullVolume

NvmeControllers map[string]*pb.NvmeRemoteController
NvmePaths map[string]*pb.NvmePath
}

// Server contains backend related OPI services
type Server struct {
pb.UnimplementedNvmeRemoteControllerServiceServer
Expand All @@ -35,7 +23,6 @@ type Server struct {

rpc spdk.JSONRPC
store gokv.Store
Volumes VolumeParameters
Pagination map[string]int
psk psk
}
Expand All @@ -55,14 +42,8 @@ func NewServer(jsonRPC spdk.JSONRPC, store gokv.Store) *Server {
log.Panic("nil for Store is not allowed")
}
return &Server{
rpc: jsonRPC,
store: store,
Volumes: VolumeParameters{
AioVolumes: make(map[string]*pb.AioVolume),
NullVolumes: make(map[string]*pb.NullVolume),
NvmeControllers: make(map[string]*pb.NvmeRemoteController),
NvmePaths: make(map[string]*pb.NvmePath),
},
rpc: jsonRPC,
store: store,
Pagination: make(map[string]int),
psk: psk{
createTempFile: os.CreateTemp,
Expand Down
Loading