Skip to content

Commit c61692e

Browse files
committed
feat(backend): add malloc volume
Signed-off-by: Moshe Shahar <mooshons@gmail.com>
1 parent 6182ef4 commit c61692e

File tree

6 files changed

+1200
-2
lines changed

6 files changed

+1200
-2
lines changed

cmd/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ func runGrpcServer(grpcPort int, useKvm bool, store gokv.Store, spdkAddress, qmp
172172

173173
pb.RegisterNvmeRemoteControllerServiceServer(s, backendServer)
174174
pb.RegisterNullVolumeServiceServer(s, backendServer)
175+
pb.RegisterMallocVolumeServiceServer(s, backendServer)
175176
pb.RegisterAioVolumeServiceServer(s, backendServer)
176177
pb.RegisterMiddleendEncryptionServiceServer(s, middleendServer)
177178
pb.RegisterMiddleendQosVolumeServiceServer(s, middleendServer)
@@ -199,6 +200,7 @@ func runGatewayServer(grpcPort int, httpPort int) {
199200

200201
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterAioVolumeServiceHandlerFromEndpoint, "backend aio")
201202
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterNullVolumeServiceHandlerFromEndpoint, "backend null")
203+
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterMallocVolumeServiceHandlerFromEndpoint, "backend malloc")
202204
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterNvmeRemoteControllerServiceHandlerFromEndpoint, "backend nvme")
203205

204206
registerGatewayHandler(ctx, mux, endpoint, opts, pb.RegisterMiddleendEncryptionServiceHandlerFromEndpoint, "middleend encryption")

pkg/backend/backend.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020

2121
// VolumeParameters contains all BackEnd volume related structures
2222
type VolumeParameters struct {
23-
AioVolumes map[string]*pb.AioVolume
24-
NullVolumes map[string]*pb.NullVolume
23+
AioVolumes map[string]*pb.AioVolume
24+
NullVolumes map[string]*pb.NullVolume
25+
MallocVolumes map[string]*pb.MallocVolume
2526

2627
NvmeControllers map[string]*pb.NvmeRemoteController
2728
NvmePaths map[string]*pb.NvmePath
@@ -31,6 +32,7 @@ type VolumeParameters struct {
3132
type Server struct {
3233
pb.UnimplementedNvmeRemoteControllerServiceServer
3334
pb.UnimplementedNullVolumeServiceServer
35+
pb.UnimplementedMallocVolumeServiceServer
3436
pb.UnimplementedAioVolumeServiceServer
3537

3638
rpc spdk.JSONRPC
@@ -55,6 +57,7 @@ func NewServer(jsonRPC spdk.JSONRPC, store gokv.Store) *Server {
5557
Volumes: VolumeParameters{
5658
AioVolumes: make(map[string]*pb.AioVolume),
5759
NullVolumes: make(map[string]*pb.NullVolume),
60+
MallocVolumes: make(map[string]*pb.MallocVolume),
5861
NvmeControllers: make(map[string]*pb.NvmeRemoteController),
5962
NvmePaths: make(map[string]*pb.NvmePath),
6063
},

pkg/backend/backend_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ var checkGlobalTestProtoObjectsNotChanged = utils.CheckTestProtoObjectsNotChange
2727
&testAioVolumeWithName,
2828
&testNullVolume,
2929
&testNullVolumeWithName,
30+
&testMallocVolume,
31+
&testMallocVolumeWithName,
3032
&testNvmeCtrl,
3133
&testNvmeCtrlWithName,
3234
&testNvmePath,
@@ -38,6 +40,7 @@ var checkGlobalTestProtoObjectsNotChanged = utils.CheckTestProtoObjectsNotChange
3840
type backendClient struct {
3941
pb.NvmeRemoteControllerServiceClient
4042
pb.NullVolumeServiceClient
43+
pb.MallocVolumeServiceClient
4144
pb.AioVolumeServiceClient
4245
}
4346

@@ -83,6 +86,7 @@ func createTestEnvironment(spdkResponses []string) *testEnv {
8386
env.client = &backendClient{
8487
pb.NewNvmeRemoteControllerServiceClient(env.conn),
8588
pb.NewNullVolumeServiceClient(env.conn),
89+
pb.NewMallocVolumeServiceClient(env.conn),
8690
pb.NewAioVolumeServiceClient(env.conn),
8791
}
8892

@@ -94,6 +98,7 @@ func dialer(opiSpdkServer *Server) func(context.Context, string) (net.Conn, erro
9498
server := grpc.NewServer()
9599
pb.RegisterNvmeRemoteControllerServiceServer(server, opiSpdkServer)
96100
pb.RegisterNullVolumeServiceServer(server, opiSpdkServer)
101+
pb.RegisterMallocVolumeServiceServer(server, opiSpdkServer)
97102
pb.RegisterAioVolumeServiceServer(server, opiSpdkServer)
98103

99104
go func() {

pkg/backend/malloc.go

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// Copyright (c) 2022-2024 Dell Inc, or its subsidiaries.
3+
// Copyright (C) 2023 Intel Corporation
4+
5+
// Package backend implememnts the BackEnd APIs (network facing) of the storage Server
6+
package backend
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"log"
12+
"path"
13+
"sort"
14+
15+
"github.com/opiproject/gospdk/spdk"
16+
pb "github.com/opiproject/opi-api/storage/v1alpha1/gen/go"
17+
"github.com/opiproject/opi-spdk-bridge/pkg/utils"
18+
19+
"github.com/google/uuid"
20+
"go.einride.tech/aip/fieldbehavior"
21+
"go.einride.tech/aip/fieldmask"
22+
"go.einride.tech/aip/resourceid"
23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
25+
"google.golang.org/protobuf/types/known/emptypb"
26+
)
27+
28+
func sortMallocVolumes(volumes []*pb.MallocVolume) {
29+
sort.Slice(volumes, func(i int, j int) bool {
30+
return volumes[i].Name < volumes[j].Name
31+
})
32+
}
33+
34+
// CreateMallocVolume creates a Malloc volume instance
35+
func (s *Server) CreateMallocVolume(ctx context.Context, in *pb.CreateMallocVolumeRequest) (*pb.MallocVolume, error) {
36+
// check input correctness
37+
if err := s.validateCreateMallocVolumeRequest(in); err != nil {
38+
return nil, err
39+
}
40+
// see https://google.aip.dev/133#user-specified-ids
41+
resourceID := resourceid.NewSystemGenerated()
42+
if in.MallocVolumeId != "" {
43+
log.Printf("client provided the ID of a resource %v, ignoring the name field %v", in.MallocVolumeId, in.MallocVolume.Name)
44+
resourceID = in.MallocVolumeId
45+
}
46+
in.MallocVolume.Name = utils.ResourceIDToVolumeName(resourceID)
47+
// idempotent API when called with same key, should return same object
48+
volume, ok := s.Volumes.MallocVolumes[in.MallocVolume.Name]
49+
if ok {
50+
log.Printf("Already existing MallocVolume with id %v", in.MallocVolume.Name)
51+
return volume, nil
52+
}
53+
// not found, so create a new one
54+
params := spdk.BdevMallocCreateParams{
55+
Name: resourceID,
56+
BlockSize: int(in.GetMallocVolume().GetBlockSize()),
57+
NumBlocks: int(in.GetMallocVolume().GetBlocksCount()),
58+
MdSize: int(in.GetMallocVolume().GetMetadataSize()),
59+
MdInterleave: true,
60+
}
61+
var result spdk.BdevMallocCreateResult
62+
err := s.rpc.Call(ctx, "bdev_malloc_create", &params, &result)
63+
if err != nil {
64+
return nil, err
65+
}
66+
log.Printf("Received from SPDK: %v", result)
67+
if result == "" {
68+
msg := fmt.Sprintf("Could not create Malloc Dev: %s", params.Name)
69+
return nil, status.Errorf(codes.InvalidArgument, msg)
70+
}
71+
response := utils.ProtoClone(in.MallocVolume)
72+
s.Volumes.MallocVolumes[in.MallocVolume.Name] = response
73+
return response, nil
74+
}
75+
76+
// DeleteMallocVolume deletes a Malloc volume instance
77+
func (s *Server) DeleteMallocVolume(ctx context.Context, in *pb.DeleteMallocVolumeRequest) (*emptypb.Empty, error) {
78+
// check input correctness
79+
if err := s.validateDeleteMallocVolumeRequest(in); err != nil {
80+
return nil, err
81+
}
82+
// fetch object from the database
83+
volume, ok := s.Volumes.MallocVolumes[in.Name]
84+
if !ok {
85+
if in.AllowMissing {
86+
return &emptypb.Empty{}, nil
87+
}
88+
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
89+
return nil, err
90+
}
91+
resourceID := path.Base(volume.Name)
92+
params := spdk.BdevMallocDeleteParams{
93+
Name: resourceID,
94+
}
95+
var result spdk.BdevMallocDeleteResult
96+
err := s.rpc.Call(ctx, "bdev_malloc_delete", &params, &result)
97+
if err != nil {
98+
return nil, err
99+
}
100+
log.Printf("Received from SPDK: %v", result)
101+
if !result {
102+
msg := fmt.Sprintf("Could not delete Malloc Dev: %s", params.Name)
103+
return nil, status.Errorf(codes.InvalidArgument, msg)
104+
}
105+
delete(s.Volumes.MallocVolumes, volume.Name)
106+
return &emptypb.Empty{}, nil
107+
}
108+
109+
// UpdateMallocVolume updates a Malloc volume instance
110+
func (s *Server) UpdateMallocVolume(ctx context.Context, in *pb.UpdateMallocVolumeRequest) (*pb.MallocVolume, error) {
111+
// check input correctness
112+
if err := s.validateUpdateMallocVolumeRequest(in); err != nil {
113+
return nil, err
114+
}
115+
// fetch object from the database
116+
volume, ok := s.Volumes.MallocVolumes[in.MallocVolume.Name]
117+
if !ok {
118+
if in.AllowMissing {
119+
log.Printf("Got AllowMissing, create a new resource, don't return error when resource not found")
120+
params := spdk.BdevMallocCreateParams{
121+
Name: path.Base(in.MallocVolume.Name),
122+
BlockSize: int(in.GetMallocVolume().GetBlockSize()),
123+
NumBlocks: int(in.GetMallocVolume().GetBlocksCount()),
124+
}
125+
var result spdk.BdevMallocCreateResult
126+
err := s.rpc.Call(ctx, "bdev_malloc_create", &params, &result)
127+
if err != nil {
128+
return nil, err
129+
}
130+
log.Printf("Received from SPDK: %v", result)
131+
if result == "" {
132+
msg := fmt.Sprintf("Could not create Malloc Dev: %s", params.Name)
133+
return nil, status.Errorf(codes.InvalidArgument, msg)
134+
}
135+
response := utils.ProtoClone(in.MallocVolume)
136+
s.Volumes.MallocVolumes[in.MallocVolume.Name] = response
137+
return response, nil
138+
}
139+
err := status.Errorf(codes.NotFound, "unable to find key %s", in.MallocVolume.Name)
140+
return nil, err
141+
}
142+
resourceID := path.Base(volume.Name)
143+
// update_mask = 2
144+
if err := fieldmask.Validate(in.UpdateMask, in.MallocVolume); err != nil {
145+
return nil, err
146+
}
147+
params1 := spdk.BdevMallocDeleteParams{
148+
Name: resourceID,
149+
}
150+
var result1 spdk.BdevMallocDeleteResult
151+
err1 := s.rpc.Call(ctx, "bdev_malloc_delete", &params1, &result1)
152+
if err1 != nil {
153+
return nil, err1
154+
}
155+
log.Printf("Received from SPDK: %v", result1)
156+
if !result1 {
157+
msg := fmt.Sprintf("Could not delete Malloc Dev: %s", params1.Name)
158+
return nil, status.Errorf(codes.InvalidArgument, msg)
159+
}
160+
params2 := spdk.BdevMallocCreateParams{
161+
Name: resourceID,
162+
BlockSize: 512,
163+
NumBlocks: 64,
164+
}
165+
var result2 spdk.BdevMallocCreateResult
166+
err2 := s.rpc.Call(ctx, "bdev_malloc_create", &params2, &result2)
167+
if err2 != nil {
168+
return nil, err2
169+
}
170+
log.Printf("Received from SPDK: %v", result2)
171+
if result2 == "" {
172+
msg := fmt.Sprintf("Could not create Malloc Dev: %s", params2.Name)
173+
return nil, status.Errorf(codes.InvalidArgument, msg)
174+
}
175+
response := utils.ProtoClone(in.MallocVolume)
176+
s.Volumes.MallocVolumes[in.MallocVolume.Name] = response
177+
return response, nil
178+
}
179+
180+
// ListMallocVolumes lists Malloc volume instances
181+
func (s *Server) ListMallocVolumes(ctx context.Context, in *pb.ListMallocVolumesRequest) (*pb.ListMallocVolumesResponse, error) {
182+
// check required fields
183+
if err := fieldbehavior.ValidateRequiredFields(in); err != nil {
184+
return nil, err
185+
}
186+
// fetch object from the database
187+
size, offset, perr := utils.ExtractPagination(in.PageSize, in.PageToken, s.Pagination)
188+
if perr != nil {
189+
return nil, perr
190+
}
191+
var result []spdk.BdevGetBdevsResult
192+
err := s.rpc.Call(ctx, "bdev_get_bdevs", nil, &result)
193+
if err != nil {
194+
return nil, err
195+
}
196+
log.Printf("Received from SPDK: %v", result)
197+
token := ""
198+
log.Printf("Limiting result len(%d) to [%d:%d]", len(result), offset, size)
199+
result, hasMoreElements := utils.LimitPagination(result, offset, size)
200+
if hasMoreElements {
201+
token = uuid.New().String()
202+
s.Pagination[token] = offset + size
203+
}
204+
Blobarray := make([]*pb.MallocVolume, len(result))
205+
for i := range result {
206+
r := &result[i]
207+
Blobarray[i] = &pb.MallocVolume{Name: r.Name, Uuid: r.UUID, BlockSize: r.BlockSize, BlocksCount: r.NumBlocks}
208+
}
209+
sortMallocVolumes(Blobarray)
210+
return &pb.ListMallocVolumesResponse{MallocVolumes: Blobarray, NextPageToken: token}, nil
211+
}
212+
213+
// GetMallocVolume gets a a Malloc volume instance
214+
func (s *Server) GetMallocVolume(ctx context.Context, in *pb.GetMallocVolumeRequest) (*pb.MallocVolume, error) {
215+
// check input correctness
216+
if err := s.validateGetMallocVolumeRequest(in); err != nil {
217+
return nil, err
218+
}
219+
// fetch object from the database
220+
volume, ok := s.Volumes.MallocVolumes[in.Name]
221+
if !ok {
222+
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
223+
return nil, err
224+
}
225+
resourceID := path.Base(volume.Name)
226+
params := spdk.BdevGetBdevsParams{
227+
Name: resourceID,
228+
}
229+
var result []spdk.BdevGetBdevsResult
230+
err := s.rpc.Call(ctx, "bdev_get_bdevs", &params, &result)
231+
if err != nil {
232+
return nil, err
233+
}
234+
log.Printf("Received from SPDK: %v", result)
235+
if len(result) != 1 {
236+
msg := fmt.Sprintf("expecting exactly 1 result, got %d", len(result))
237+
return nil, status.Errorf(codes.InvalidArgument, msg)
238+
}
239+
return &pb.MallocVolume{Name: result[0].Name, Uuid: result[0].UUID, BlockSize: result[0].BlockSize, BlocksCount: result[0].NumBlocks}, nil
240+
}
241+
242+
// StatsMallocVolume gets a Malloc volume instance stats
243+
func (s *Server) StatsMallocVolume(ctx context.Context, in *pb.StatsMallocVolumeRequest) (*pb.StatsMallocVolumeResponse, error) {
244+
// check input correctness
245+
if err := s.validateStatsMallocVolumeRequest(in); err != nil {
246+
return nil, err
247+
}
248+
// fetch object from the database
249+
volume, ok := s.Volumes.MallocVolumes[in.Name]
250+
if !ok {
251+
err := status.Errorf(codes.NotFound, "unable to find key %s", in.Name)
252+
return nil, err
253+
}
254+
resourceID := path.Base(volume.Name)
255+
params := spdk.BdevGetIostatParams{
256+
Name: resourceID,
257+
}
258+
// See https://mholt.github.io/json-to-go/
259+
var result spdk.BdevGetIostatResult
260+
err := s.rpc.Call(ctx, "bdev_get_iostat", &params, &result)
261+
if err != nil {
262+
return nil, err
263+
}
264+
log.Printf("Received from SPDK: %v", result)
265+
if len(result.Bdevs) != 1 {
266+
msg := fmt.Sprintf("expecting exactly 1 result, got %d", len(result.Bdevs))
267+
return nil, status.Errorf(codes.InvalidArgument, msg)
268+
}
269+
return &pb.StatsMallocVolumeResponse{Stats: &pb.VolumeStats{
270+
ReadBytesCount: int32(result.Bdevs[0].BytesRead),
271+
ReadOpsCount: int32(result.Bdevs[0].NumReadOps),
272+
WriteBytesCount: int32(result.Bdevs[0].BytesWritten),
273+
WriteOpsCount: int32(result.Bdevs[0].NumWriteOps),
274+
UnmapBytesCount: int32(result.Bdevs[0].BytesUnmapped),
275+
UnmapOpsCount: int32(result.Bdevs[0].NumUnmapOps),
276+
ReadLatencyTicks: int32(result.Bdevs[0].ReadLatencyTicks),
277+
WriteLatencyTicks: int32(result.Bdevs[0].WriteLatencyTicks),
278+
UnmapLatencyTicks: int32(result.Bdevs[0].UnmapLatencyTicks),
279+
}}, nil
280+
}

0 commit comments

Comments
 (0)