14
14
15
15
package p2pstore
16
16
17
+ /*
18
+ * All memory pointed to by the "char *" parameters will not be used
19
+ * after the C function returns.
20
+ * This means that the caller can free the memory pointed to by "char *"
21
+ * parameters, after the call is completed.
22
+ * All the C functions used here follow this convention.
23
+ */
24
+
17
25
//#cgo LDFLAGS: -L../../../build/mooncake-transfer-engine/src -L../../../thirdparties/lib -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc
18
26
//#include "../../../mooncake-transfer-engine/include/transfer_engine_c.h"
19
27
import "C"
20
28
21
29
import (
22
- "unsafe"
23
30
"net"
24
31
"strconv"
32
+ "unsafe"
25
33
)
26
34
27
35
type BatchID int64
28
36
29
37
type TransferEngine struct {
30
38
engine C.transfer_engine_t
31
- xport C.transport_t
39
+ xport C.transport_t
32
40
}
33
41
34
42
func parseServerName (serverName string ) (host string , port int ) {
@@ -45,35 +53,45 @@ func parseServerName(serverName string) (host string, port int) {
45
53
return host , port
46
54
}
47
55
56
+ const (
57
+ rdmaCStr = C .CString ("rdma" )
58
+ )
59
+
48
60
func NewTransferEngine (metadata_uri string , local_server_name string , nic_priority_matrix string ) (* TransferEngine , error ) {
49
61
// For simplifiy, local_server_name must be a valid IP address or hostname
50
62
connectable_name , rpc_port := parseServerName (local_server_name )
51
63
52
- native_engine := C .createTransferEngine (C .CString (metadata_uri ),
53
- C .CString (local_server_name ),
54
- C .CString (connectable_name ),
55
- C .uint64_t (rpc_port ))
64
+ metadataUri := C .CString (metadata_uri )
65
+ localServerName := C .CString (local_server_name )
66
+ connectableName := C .CString (connectable_name )
67
+ nicPriorityMatrix := C .CString (nic_priority_matrix )
68
+ defer C .free (unsafe .Pointer (metadataUri ))
69
+ defer C .free (unsafe .Pointer (localServerName ))
70
+ defer C .free (unsafe .Pointer (connectableName ))
71
+ defer C .free (unsafe .Pointer (nicPriorityMatrix ))
72
+
73
+ native_engine := C .createTransferEngine (metadataUri , localServerName , connectableName , C .uint64_t (rpc_port ))
56
74
if native_engine == nil {
57
75
return nil , ErrTransferEngine
58
76
}
59
77
60
78
var args [2 ]unsafe.Pointer
61
- args [0 ] = unsafe .Pointer (C . CString ( nic_priority_matrix ) )
79
+ args [0 ] = unsafe .Pointer (nicPriorityMatrix )
62
80
args [1 ] = nil
63
- xport := C .installTransport (native_engine , C . CString ( "rdma" ) , & args [0 ])
81
+ xport := C .installTransport (native_engine , rdmaCStr , & args [0 ])
64
82
if xport == nil {
65
83
C .destroyTransferEngine (native_engine )
66
84
return nil , ErrTransferEngine
67
85
}
68
86
69
87
return & TransferEngine {
70
88
engine : native_engine ,
71
- xport :xport ,
89
+ xport : xport ,
72
90
}, nil
73
91
}
74
92
75
93
func (engine * TransferEngine ) Close () error {
76
- ret := C .uninstallTransport (engine .engine , C . CString ( "rdma" ) )
94
+ ret := C .uninstallTransport (engine .engine , rdmaCStr )
77
95
if ret < 0 {
78
96
return ErrTransferEngine
79
97
}
@@ -83,7 +101,9 @@ func (engine *TransferEngine) Close() error {
83
101
}
84
102
85
103
func (engine * TransferEngine ) registerLocalMemory (addr uintptr , length uint64 , location string ) error {
86
- ret := C .registerLocalMemory (engine .engine , unsafe .Pointer (addr ), C .size_t (length ), C .CString (location ), 1 )
104
+ locationCStr := C .CString (location )
105
+ defer C .free (unsafe .Pointer (locationCStr ))
106
+ ret := C .registerLocalMemory (engine .engine , unsafe .Pointer (addr ), C .size_t (length ), locationCStr , 1 )
87
107
if ret < 0 {
88
108
return ErrTransferEngine
89
109
}
@@ -163,7 +183,10 @@ func (engine *TransferEngine) freeBatchID(batchID BatchID) error {
163
183
}
164
184
165
185
func (engine * TransferEngine ) openSegment (name string ) (int64 , error ) {
166
- ret := C .openSegment (engine .engine , C .CString (name ))
186
+ nameCStr := C .CString (name )
187
+ defer C .free (unsafe .Pointer (nameCStr ))
188
+
189
+ ret := C .openSegment (engine .engine , nameCStr )
167
190
if ret < 0 {
168
191
return - 1 , ErrTransferEngine
169
192
}
@@ -184,4 +207,4 @@ func (engine *TransferEngine) syncSegmentCache() error {
184
207
return ErrTransferEngine
185
208
}
186
209
return nil
187
- }
210
+ }
0 commit comments