From 646572f5ae88fe62a04b174c8b3f1fea2f3f6960 Mon Sep 17 00:00:00 2001 From: drrtuy Date: Fri, 13 Dec 2024 15:03:56 +0000 Subject: [PATCH] feat(): dangling pointer/ref issue has been solved for both RGData and BS --- .../primproc/batchprimitiveprocessor.cpp | 16 +++---- tests/counting_allocator.cpp | 20 ++++++++- tests/rowgroup-tests.cpp | 21 +--------- utils/common/countingallocator.h | 3 +- utils/messageqcpp/bytestream.cpp | 2 +- utils/messageqcpp/bytestream.h | 5 ++- utils/rowgroup/rowgroup.cpp | 42 +++++++++---------- utils/rowgroup/rowgroup.h | 16 +++---- 8 files changed, 57 insertions(+), 68 deletions(-) diff --git a/primitives/primproc/batchprimitiveprocessor.cpp b/primitives/primproc/batchprimitiveprocessor.cpp index c6e30f992..8ef6160d0 100644 --- a/primitives/primproc/batchprimitiveprocessor.cpp +++ b/primitives/primproc/batchprimitiveprocessor.cpp @@ -241,8 +241,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs) uint8_t tmp8; uint16_t tmp16; Command::CommandType type; - auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory(); - std::cout << "initBPP availableMemory: " << cnt << std::endl; bs.advance(sizeof(ISMPacketHeader)); // skip the header bs >> tmp8; @@ -848,8 +846,6 @@ int BatchPrimitiveProcessor::endOfJoiner() { endOfJoinerRan = true; pthread_mutex_unlock(&objLock); - auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory(); - std::cout << "endOfJoiner availableMemory: " << cnt << std::endl; return 0; } @@ -892,8 +888,6 @@ int BatchPrimitiveProcessor::endOfJoiner() endOfJoinerRan = true; pthread_mutex_unlock(&objLock); - auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory(); - std::cout << "endOfJoiner availableMemory: " << cnt << std::endl; return 0; } @@ -2217,7 +2211,7 @@ int BatchPrimitiveProcessor::operator()() cpDataFromDictScan = false; auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator(); - messageqcpp::SBS bs(new ByteStream(&allocator)); + messageqcpp::SBS bs(new ByteStream(allocator)); #ifdef PRIMPROC_STOPWATCH stopwatch->start("BPP() execute"); @@ -2283,14 +2277,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers() if (ot == ROW_GROUP && !outRowGroupData) { // outputRG.setUseStringTable(true); - outRowGroupData.reset(new RGData(outputRG, &allocator)); + outRowGroupData.reset(new RGData(outputRG, allocator)); outputRG.setData(outRowGroupData.get()); } if (fe1 && !fe1Data) { // fe1Input.setUseStringTable(true); - fe1Data.reset(new RGData(fe1Input, &allocator)); + fe1Data.reset(new RGData(fe1Input, allocator)); // fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]); fe1Input.setData(fe1Data.get()); } @@ -2298,14 +2292,14 @@ void BatchPrimitiveProcessor::allocLargeBuffers() if (fe2 && !fe2Data) { // fe2Output.setUseStringTable(true); - fe2Data.reset(new RGData(fe2Output, &allocator)); + fe2Data.reset(new RGData(fe2Output, allocator)); fe2Output.setData(fe2Data.get()); } if (getTupleJoinRowGroupData && !joinedRGMem) { // joinedRG.setUseStringTable(true); - joinedRGMem.reset(new RGData(joinedRG, &allocator)); + joinedRGMem.reset(new RGData(joinedRG, allocator)); joinedRG.setData(joinedRGMem.get()); } } diff --git a/tests/counting_allocator.cpp b/tests/counting_allocator.cpp index 6fe34155f..94d69690d 100644 --- a/tests/counting_allocator.cpp +++ b/tests/counting_allocator.cpp @@ -15,9 +15,11 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include -#include #include #include +#include +#include + #include "countingallocator.h" #include "rowgroup.h" @@ -116,6 +118,14 @@ TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator) buf.reset(); EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); + + CountingAllocator allocator1(&allocatedMemory, MemoryAllowance / 100); + std::optional> allocator2(allocator1); + auto buf1 = boost::allocate_shared(*allocator2, allocSize); + EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize); + + buf1.reset(); + EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); } // Test 5: Thread Safety - Concurrent Allocations and Deallocations @@ -158,4 +168,12 @@ TEST_F(CountingAllocatorTest, AllocateZeroObjects) EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); allocator.deallocate(ptr, 0); EXPECT_EQ(allocatedMemory.load(), MemoryAllowance); +} + +TEST_F(CountingAllocatorTest, CopyAssignable) +{ + CountingAllocator allocator1(&allocatedMemory); + CountingAllocator allocator2(&allocatedMemory); + allocator1 = allocator2; + EXPECT_EQ(allocator1, allocator2); } \ No newline at end of file diff --git a/tests/rowgroup-tests.cpp b/tests/rowgroup-tests.cpp index 33c01e7e4..b7ef53ce0 100644 --- a/tests/rowgroup-tests.cpp +++ b/tests/rowgroup-tests.cpp @@ -364,25 +364,6 @@ class RGDataTest : public ::testing::Test execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL}, {65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8}); - - // rgD = rowgroup::RGData(rg, &alloc); - // rg.setData(&rgD); - // rg.initRow(&r); - // rg.getRow(0, &r); - - // for (size_t i = 0; i < sValueVector.size(); i++) - // { - // // setStringField - // r.setBinaryField_offset(&sValueVector[i], sizeof(sValueVector[0]), offsets[0]); - // r.setBinaryField_offset(&anotherValueVector[i], sizeof(anotherValueVector[0]), offsets[1]); - // r.setIntField(s64ValueVector[i], 2); - // r.setIntField(s32ValueVector[i], 3); - // r.setIntField(s16ValueVector[i], 4); - // r.setIntField(s8ValueVector[i], 5); - // r.nextRow(rowSize); - // } - - // rowCount = sValueVector.size(); } // void TearDown() override {} @@ -397,7 +378,7 @@ class RGDataTest : public ::testing::Test TEST_F(RGDataTest, AllocData) { std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl; - rgD = rowgroup::RGData(rg, &alloc); + rgD = rowgroup::RGData(rg, alloc); rg.setData(&rgD); rg.initRow(&r); rg.getRow(0, &r); diff --git a/utils/common/countingallocator.h b/utils/common/countingallocator.h index 8f1d83724..4d9a36f6f 100644 --- a/utils/common/countingallocator.h +++ b/utils/common/countingallocator.h @@ -44,8 +44,7 @@ class CountingAllocator { // Copy constructor (template to allow conversion between different types) template CountingAllocator(const CountingAllocator& other) noexcept - : memoryLimit_(other.memoryLimit_) {} - + : memoryLimit_(other.memoryLimit_), memoryLimitLowerBound(other.memoryLimitLowerBound) {} // Allocate memory for n objects of type T template diff --git a/utils/messageqcpp/bytestream.cpp b/utils/messageqcpp/bytestream.cpp index b94b2c31a..183f3d5eb 100644 --- a/utils/messageqcpp/bytestream.cpp +++ b/utils/messageqcpp/bytestream.cpp @@ -101,7 +101,7 @@ ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0) } // WIP remove this one, replacing the allocator arg with a default nullptr. -ByteStream::ByteStream(allocators::CountingAllocator* allocator, uint32_t initSize) +ByteStream::ByteStream(allocators::CountingAllocator& allocator, uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0), allocator(allocator) { if (initSize > 0) diff --git a/utils/messageqcpp/bytestream.h b/utils/messageqcpp/bytestream.h index e9fc8920a..e48ea6bfc 100644 --- a/utils/messageqcpp/bytestream.h +++ b/utils/messageqcpp/bytestream.h @@ -19,6 +19,7 @@ */ #pragma once +#include #include #include #include @@ -78,7 +79,7 @@ class ByteStream : public Serializeable * default ctor */ EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best - explicit ByteStream(allocators::CountingAllocator* alloc, uint32_t initSize = 8192); + explicit ByteStream(allocators::CountingAllocator& alloc, uint32_t initSize = 8192); /** * ctor with a uint8_t array and len initializer */ @@ -476,7 +477,7 @@ class ByteStream : public Serializeable uint32_t fMaxLen; // how big fBuf is currently // Stores `long strings`. std::vector longStrings; - allocators::CountingAllocator* allocator = nullptr; + std::optional> allocator = {}; }; template diff --git a/utils/rowgroup/rowgroup.cpp b/utils/rowgroup/rowgroup.cpp index c03540ef9..2df63b3bf 100644 --- a/utils/rowgroup/rowgroup.cpp +++ b/utils/rowgroup/rowgroup.cpp @@ -48,29 +48,13 @@ namespace rowgroup { using cscType = execplan::CalpontSystemCatalog::ColDataType; -StringStore::StringStore(allocators::CountingAllocator* alloc) : StringStore() +StringStore::StringStore(allocators::CountingAllocator alloc) : StringStore() { this->alloc = alloc; } StringStore::~StringStore() { -#if 0 - // for mem usage debugging - uint32_t i; - uint64_t inUse = 0, allocated = 0; - - for (i = 0; i < mem.size(); i++) - { - MemChunk* tmp = (MemChunk*) mem.back().get(); - inUse += tmp->currentSize; - allocated += tmp->capacity; - } - - if (allocated > 0) - cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse / (float) allocated << endl; - -#endif } uint64_t StringStore::storeString(const uint8_t* data, uint32_t len) @@ -376,15 +360,16 @@ RGData::RGData(const RowGroup& rg) #endif } -RGData::RGData(const RowGroup& rg, allocators::CountingAllocator* alloc) : alloc(alloc) +RGData::RGData(const RowGroup& rg, allocators::CountingAllocator& _alloc) : alloc(_alloc) { - // rowData = shared_ptr(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize); - // }); - rowData = boost::allocate_shared(*alloc, rg.getMaxDataSize()); + rowData = boost::allocate_shared(alloc.value(), rg.getMaxDataSize()); // rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]); if (rg.usesStringTable()) - strings.reset(new StringStore(alloc)); + { + allocators::CountingAllocator ssAlloc = _alloc; + strings.reset(new StringStore(ssAlloc)); + } userDataStore.reset(); @@ -413,7 +398,18 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount) userDataStore.reset(); if (rg.usesStringTable()) - strings.reset(new StringStore(alloc)); + { + if (alloc) + { + allocators::CountingAllocator ssAlloc = alloc.value(); + strings.reset(new StringStore(ssAlloc)); + } + else + { + strings.reset(new StringStore()); + } + + } else strings.reset(); diff --git a/utils/rowgroup/rowgroup.h b/utils/rowgroup/rowgroup.h index ea966d676..723454ab2 100644 --- a/utils/rowgroup/rowgroup.h +++ b/utils/rowgroup/rowgroup.h @@ -136,7 +136,7 @@ class StringStore { public: StringStore() = default; - StringStore(allocators::CountingAllocator* alloc); + StringStore(allocators::CountingAllocator alloc); StringStore(const StringStore&) = delete; StringStore(StringStore&&) = delete; StringStore& operator=(const StringStore&) = delete; @@ -187,11 +187,11 @@ class StringStore std::vector> mem; // To store strings > 64KB (BLOB/TEXT) - std::vector longStrings; + std::vector> longStrings; bool empty = true; bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe boost::mutex fMutex; - allocators::CountingAllocator* alloc = nullptr; + std::optional> alloc {}; }; // Where we store user data for UDA(n)F @@ -264,8 +264,8 @@ class RGData RGData() = default; // useless unless followed by an = or a deserialize operation RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData explicit RGData(const RowGroup& rg); - explicit RGData(const RowGroup& rg, allocators::CountingAllocator* alloc); - RGData& operator=(const RGData&) = default; + explicit RGData(const RowGroup& rg, allocators::CountingAllocator& alloc); + RGData& operator=(const RGData& rhs) = default; RGData& operator=(RGData&&) = default; RGData(const RGData&) = default; RGData(RGData&&) = default; @@ -327,7 +327,7 @@ class RGData boost::shared_ptr rowData; boost::shared_ptr strings; std::shared_ptr userDataStore; - allocators::CountingAllocator* alloc = nullptr; + std::optional> alloc = {}; // Need sig to support backward compat. RGData can deserialize both forms. static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data @@ -998,7 +998,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex if (inStringTable(colIndex)) { - std::cout << "setStringField storeString len " << length << std::endl; + // std::cout << "setStringField storeString len " << length << std::endl; offset = strings->storeString((const uint8_t*)str.str(), length); *((uint64_t*)&data[offsets[colIndex]]) = offset; // cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]]) @@ -1007,7 +1007,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex } else { - std::cout << "setStringField memcpy " << std::endl; + // std::cout << "setStringField memcpy " << std::endl; memcpy(&data[offsets[colIndex]], str.str(), length); memset(&data[offsets[colIndex] + length], 0, offsets[colIndex + 1] - (offsets[colIndex] + length)); }