Skip to content

Commit

Permalink
apacheGH-38281: [Go] Ensure CData imported arrays are freed on release (
Browse files Browse the repository at this point in the history
apache#38314)

### Rationale for this change
The usage of `SetFinalizer` means that it's not *guaranteed* that calling `Release()` on an imported Record or Array will actually free the memory during the lifetime of the process. Instead we can leverage a shared buffer count, atomic ref counting and a custom allocator to ensure proper and more timely memory releasing when importing from C Data interface.

### What changes are included in this PR?
* Some simplifications of code to use `unsafe.Slice` instead of the deprecated handling of `reflect.SliceHeader` to improve readability
* Updating tests using `mallocator.Mallocator` in order to easily allow testing to ensure that memory is being cleaned up and freed
* Fixing a series of memory leaks subsequently found by the previous change of using the `mallocator.Mallocator` to track the allocations used for testing arrays.

### Are these changes tested?
Yes, unit tests are updated and included.

* Closes: apache#38281

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
zeroshade authored Oct 18, 2023
1 parent d68f8e2 commit 0428c5e
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 102 deletions.
112 changes: 79 additions & 33 deletions go/arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"errors"
"fmt"
"io"
"reflect"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -152,13 +151,8 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {
var childFields []arrow.Field
if schema.n_children > 0 {
// call ourselves recursively if there are children.
var schemaChildren []*CArrowSchema
// set up a slice to reference safely
s := (*reflect.SliceHeader)(unsafe.Pointer(&schemaChildren))
s.Data = uintptr(unsafe.Pointer(schema.children))
s.Len = int(schema.n_children)
s.Cap = int(schema.n_children)

schemaChildren := unsafe.Slice(schema.children, schema.n_children)
childFields = make([]arrow.Field, schema.n_children)
for i, c := range schemaChildren {
childFields[i], err = importSchema((*CArrowSchema)(c))
Expand Down Expand Up @@ -341,21 +335,18 @@ type cimporter struct {
parent *cimporter
children []cimporter
cbuffers []*C.void

alloc *importAllocator
}

func (imp *cimporter) importChild(parent *cimporter, src *CArrowArray) error {
imp.parent = parent
return imp.doImport(src)
imp.parent, imp.arr, imp.alloc = parent, src, parent.alloc
return imp.doImport()
}

// import any child arrays for lists, structs, and so on.
func (imp *cimporter) doImportChildren() error {
var children []*CArrowArray
// create a proper slice for our children
s := (*reflect.SliceHeader)(unsafe.Pointer(&children))
s.Data = uintptr(unsafe.Pointer(imp.arr.children))
s.Len = int(imp.arr.n_children)
s.Cap = int(imp.arr.n_children)
children := unsafe.Slice(imp.arr.children, imp.arr.n_children)

if len(children) > 0 {
imp.children = make([]cimporter, len(children))
Expand Down Expand Up @@ -418,26 +409,44 @@ func (imp *cimporter) doImportChildren() error {

func (imp *cimporter) initarr() {
imp.arr = C.get_arr()
if imp.alloc == nil {
imp.alloc = &importAllocator{arr: imp.arr}
}
}

func (imp *cimporter) doImportArr(src *CArrowArray) error {
imp.arr = C.get_arr()
C.ArrowArrayMove(src, imp.arr)
if imp.alloc == nil {
imp.alloc = &importAllocator{arr: imp.arr}
}

// we tie the releasing of the array to when the buffers are
// cleaned up, so if there are no buffers that we've imported
// such as for a null array or a nested array with no bitmap
// and only null columns, then we can release the CArrowArray
// struct immediately after import, since we have no imported
// memory that we have to track the lifetime of.
defer func() {
if imp.alloc.bufCount == 0 {
C.ArrowArrayRelease(imp.arr)
}
}()

return imp.doImport()
}

// import is called recursively as needed for importing an array and its children
// in order to generate array.Data objects
func (imp *cimporter) doImport(src *CArrowArray) error {
imp.initarr()
func (imp *cimporter) doImport() error {
// move the array from the src object passed in to the one referenced by
// this importer. That way we can set up a finalizer on the created
// arrow.ArrayData object so we clean up our Array's memory when garbage collected.
C.ArrowArrayMove(src, imp.arr)
defer func(arr *CArrowArray) {
if imp.data != nil {
runtime.SetFinalizer(imp.data, func(arrow.ArrayData) {
defer C.free(unsafe.Pointer(arr))
C.ArrowArrayRelease(arr)
if C.ArrowArrayIsReleased(arr) != 1 {
panic("did not release C mem")
}
})
} else {
// this should only occur in the case of an error happening
// during import, at which point we need to clean up the
// ArrowArray struct we allocated.
if imp.data == nil {
C.free(unsafe.Pointer(arr))
}
}(imp.arr)
Expand All @@ -447,6 +456,12 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
return err
}

for _, c := range imp.children {
if c.data != nil {
defer c.data.Release()
}
}

if imp.arr.n_buffers > 0 {
// get a view of the buffers, zero-copy. we're just looking at the pointers
imp.cbuffers = unsafe.Slice((**C.void)(unsafe.Pointer(imp.arr.buffers)), imp.arr.n_buffers)
Expand All @@ -458,6 +473,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if err := imp.checkNoChildren(); err != nil {
return err
}

imp.data = array.NewData(dt, int(imp.arr.length), nil, nil, int(imp.arr.null_count), int(imp.arr.offset))
case arrow.FixedWidthDataType:
return imp.importFixedSizePrimitive()
Expand Down Expand Up @@ -488,6 +504,9 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if err != nil {
return err
}
if nulls != nil {
defer nulls.Release()
}

imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nulls}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset))
case *arrow.StructType:
Expand All @@ -499,6 +518,9 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if err != nil {
return err
}
if nulls != nil {
defer nulls.Release()
}

children := make([]arrow.ArrayData, len(imp.children))
for i := range imp.children {
Expand Down Expand Up @@ -529,9 +551,11 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if bufs[1], err = imp.importFixedSizeBuffer(1, 1); err != nil {
return err
}
defer bufs[1].Release()
if bufs[2], err = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes)); err != nil {
return err
}
defer bufs[2].Release()
} else {
if err := imp.checkNumBuffers(2); err != nil {
return err
Expand All @@ -540,9 +564,11 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if bufs[1], err = imp.importFixedSizeBuffer(0, 1); err != nil {
return err
}
defer bufs[1].Release()
if bufs[2], err = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes)); err != nil {
return err
}
defer bufs[2].Release()
}

children := make([]arrow.ArrayData, len(imp.children))
Expand All @@ -562,6 +588,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if buf, err = imp.importFixedSizeBuffer(1, 1); err != nil {
return err
}
defer buf.Release()
} else {
if err := imp.checkNumBuffers(1); err != nil {
return err
Expand All @@ -570,6 +597,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error {
if buf, err = imp.importFixedSizeBuffer(0, 1); err != nil {
return err
}
defer buf.Release()
}

children := make([]arrow.ArrayData, len(imp.children))
Expand All @@ -596,14 +624,17 @@ func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) {
var (
nulls, offsets, values *memory.Buffer
)

if nulls, err = imp.importNullBitmap(0); err != nil {
return
}
if nulls != nil {
defer nulls.Release()
}

if offsets, err = imp.importOffsetsBuffer(1, offsetByteWidth); err != nil {
return
}
defer offsets.Release()

var nvals int64
switch offsetByteWidth {
Expand All @@ -617,6 +648,8 @@ func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) {
if values, err = imp.importVariableValuesBuffer(2, 1, nvals); err != nil {
return
}
defer values.Release()

imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset))
return
}
Expand All @@ -634,11 +667,17 @@ func (imp *cimporter) importListLike() (err error) {
if nulls, err = imp.importNullBitmap(0); err != nil {
return
}
if nulls != nil {
defer nulls.Release()
}

offsetSize := imp.dt.Layout().Buffers[1].ByteWidth
if offsets, err = imp.importOffsetsBuffer(1, int64(offsetSize)); err != nil {
return
}
if offsets != nil {
defer offsets.Release()
}

imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset))
return
Expand Down Expand Up @@ -677,14 +716,21 @@ func (imp *cimporter) importFixedSizePrimitive() error {
var dict *array.Data
if dt, ok := imp.dt.(*arrow.DictionaryType); ok {
dictImp := &cimporter{dt: dt.ValueType}
if err := dictImp.doImport(imp.arr.dictionary); err != nil {
if err := dictImp.importChild(imp, imp.arr.dictionary); err != nil {
return err
}
defer dictImp.data.Release()

dict = dictImp.data.(*array.Data)
}

if nulls != nil {
defer nulls.Release()
}
if values != nil {
defer values.Release()
}

imp.data = array.NewDataWithDictionary(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, int(imp.arr.null_count), int(imp.arr.offset), dict)
return nil
}
Expand Down Expand Up @@ -721,9 +767,9 @@ func (imp *cimporter) importBuffer(bufferID int, sz int64) (*memory.Buffer, erro
}
return memory.NewBufferBytes([]byte{}), nil
}
const maxLen = 0x7fffffff
data := (*[maxLen]byte)(unsafe.Pointer(imp.cbuffers[bufferID]))[:sz:sz]
return memory.NewBufferBytes(data), nil
data := unsafe.Slice((*byte)(unsafe.Pointer(imp.cbuffers[bufferID])), sz)
imp.alloc.addBuffer()
return memory.NewBufferWithAllocator(data, imp.alloc), nil
}

func (imp *cimporter) importBitsBuffer(bufferID int) (*memory.Buffer, error) {
Expand Down Expand Up @@ -760,7 +806,7 @@ func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals

func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, err error) {
imp = &cimporter{dt: dt}
err = imp.doImport(arr)
err = imp.doImportArr(arr)
return
}

Expand Down
12 changes: 12 additions & 0 deletions go/arrow/cdata/cdata_fulltest.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,20 @@
#include "arrow/c/helpers.h"
#include "utils.h"

int is_little_endian()
{
unsigned int x = 1;
char *c = (char*) &x;
return (int)*c;
}

static const int64_t kDefaultFlags = ARROW_FLAG_NULLABLE;

extern void releaseTestArr(struct ArrowArray* array);
void goReleaseTestArray(struct ArrowArray* array) {
releaseTestArr(array);
}

static void release_int32_type(struct ArrowSchema* schema) {
// mark released
schema->release = NULL;
Expand Down
29 changes: 21 additions & 8 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/apache/arrow/go/v14/arrow/decimal128"
"github.com/apache/arrow/go/v14/arrow/internal/arrdata"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/arrow/memory/mallocator"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -488,8 +489,11 @@ func TestPrimitiveArrs(t *testing.T) {
arr := tt.fn()
defer arr.Release()

carr := createCArr(arr)
defer freeTestArr(carr)
mem := mallocator.NewMallocator()
defer mem.AssertSize(t, 0)

carr := createCArr(arr, mem)
defer freeTestMallocatorArr(carr, mem)

imported, err := ImportCArrayWithType(carr, arr.DataType())
assert.NoError(t, err)
Expand All @@ -508,8 +512,11 @@ func TestPrimitiveSliced(t *testing.T) {
sl := array.NewSlice(arr, 1, 2)
defer sl.Release()

carr := createCArr(sl)
defer freeTestArr(carr)
mem := mallocator.NewMallocator()
defer mem.AssertSize(t, 0)

carr := createCArr(sl, mem)
defer freeTestMallocatorArr(carr, mem)

imported, err := ImportCArrayWithType(carr, arr.DataType())
assert.NoError(t, err)
Expand Down Expand Up @@ -687,8 +694,11 @@ func TestNestedArrays(t *testing.T) {
arr := tt.fn()
defer arr.Release()

carr := createCArr(arr)
defer freeTestArr(carr)
mem := mallocator.NewMallocator()
defer mem.AssertSize(t, 0)

carr := createCArr(arr, mem)
defer freeTestMallocatorArr(carr, mem)

imported, err := ImportCArrayWithType(carr, arr.DataType())
assert.NoError(t, err)
Expand All @@ -701,11 +711,14 @@ func TestNestedArrays(t *testing.T) {
}

func TestRecordBatch(t *testing.T) {
mem := mallocator.NewMallocator()
defer mem.AssertSize(t, 0)

arr := createTestStructArr()
defer arr.Release()

carr := createCArr(arr)
defer freeTestArr(carr)
carr := createCArr(arr, mem)
defer freeTestMallocatorArr(carr, mem)

sc := testStruct([]string{"+s", "c", "u"}, []string{"", "a", "b"}, []int64{0, flagIsNullable, flagIsNullable})
defer freeMallocedSchemas(sc)
Expand Down
Loading

0 comments on commit 0428c5e

Please sign in to comment.