Skip to content

Commit

Permalink
feat(): dangling pointer/ref issue has been solved for both RGData an…
Browse files Browse the repository at this point in the history
…d BS
  • Loading branch information
drrtuy committed Dec 17, 2024
1 parent fac7dfa commit 646572f
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 68 deletions.
16 changes: 5 additions & 11 deletions primitives/primproc/batchprimitiveprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,6 @@ void BatchPrimitiveProcessor::initBPP(ByteStream& bs)
uint8_t tmp8;
uint16_t tmp16;
Command::CommandType type;
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "initBPP availableMemory: " << cnt << std::endl;

bs.advance(sizeof(ISMPacketHeader)); // skip the header
bs >> tmp8;
Expand Down Expand Up @@ -848,8 +846,6 @@ int BatchPrimitiveProcessor::endOfJoiner()
{
endOfJoinerRan = true;
pthread_mutex_unlock(&objLock);
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "endOfJoiner availableMemory: " << cnt << std::endl;
return 0;
}

Expand Down Expand Up @@ -892,8 +888,6 @@ int BatchPrimitiveProcessor::endOfJoiner()
endOfJoinerRan = true;

pthread_mutex_unlock(&objLock);
auto cnt = exemgr::globServiceExeMgr->getRm().availableMemory();
std::cout << "endOfJoiner availableMemory: " << cnt << std::endl;
return 0;
}

Expand Down Expand Up @@ -2217,7 +2211,7 @@ int BatchPrimitiveProcessor::operator()()
cpDataFromDictScan = false;

auto allocator = exemgr::globServiceExeMgr->getRm().getAllocator<messageqcpp::BSBufType>();
messageqcpp::SBS bs(new ByteStream(&allocator));
messageqcpp::SBS bs(new ByteStream(allocator));

#ifdef PRIMPROC_STOPWATCH
stopwatch->start("BPP() execute");
Expand Down Expand Up @@ -2283,29 +2277,29 @@ void BatchPrimitiveProcessor::allocLargeBuffers()
if (ot == ROW_GROUP && !outRowGroupData)
{
// outputRG.setUseStringTable(true);
outRowGroupData.reset(new RGData(outputRG, &allocator));
outRowGroupData.reset(new RGData(outputRG, allocator));
outputRG.setData(outRowGroupData.get());
}

if (fe1 && !fe1Data)
{
// fe1Input.setUseStringTable(true);
fe1Data.reset(new RGData(fe1Input, &allocator));
fe1Data.reset(new RGData(fe1Input, allocator));
// fe1Data.reset(new uint8_t[fe1Input.getMaxDataSize()]);
fe1Input.setData(fe1Data.get());
}

if (fe2 && !fe2Data)
{
// fe2Output.setUseStringTable(true);
fe2Data.reset(new RGData(fe2Output, &allocator));
fe2Data.reset(new RGData(fe2Output, allocator));
fe2Output.setData(fe2Data.get());
}

if (getTupleJoinRowGroupData && !joinedRGMem)
{
// joinedRG.setUseStringTable(true);
joinedRGMem.reset(new RGData(joinedRG, &allocator));
joinedRGMem.reset(new RGData(joinedRG, allocator));
joinedRG.setData(joinedRGMem.get());
}
}
Expand Down
20 changes: 19 additions & 1 deletion tests/counting_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
#include <gtest/gtest.h>
#include <memory>
#include <atomic>
#include <cstddef>
#include <memory>
#include <thread>

#include "countingallocator.h"
#include "rowgroup.h"

Expand Down Expand Up @@ -116,6 +118,14 @@ TEST_F(CountingAllocatorTest, AllocateSharedUsesAllocator)

buf.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);

CountingAllocator<rowgroup::RGDataBufType> allocator1(&allocatedMemory, MemoryAllowance / 100);
std::optional<CountingAllocator<rowgroup::RGDataBufType>> allocator2(allocator1);
auto buf1 = boost::allocate_shared<rowgroup::RGDataBufType>(*allocator2, allocSize);
EXPECT_LE(allocatedMemory.load(), MemoryAllowance - allocSize);

buf1.reset();
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
}

// Test 5: Thread Safety - Concurrent Allocations and Deallocations
Expand Down Expand Up @@ -158,4 +168,12 @@ TEST_F(CountingAllocatorTest, AllocateZeroObjects)
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
allocator.deallocate(ptr, 0);
EXPECT_EQ(allocatedMemory.load(), MemoryAllowance);
}

TEST_F(CountingAllocatorTest, CopyAssignable)
{
CountingAllocator<TestClass> allocator1(&allocatedMemory);
CountingAllocator<TestClass> allocator2(&allocatedMemory);
allocator1 = allocator2;
EXPECT_EQ(allocator1, allocator2);
}
21 changes: 1 addition & 20 deletions tests/rowgroup-tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,25 +364,6 @@ class RGDataTest : public ::testing::Test
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL,
execplan::CalpontSystemCatalog::DECIMAL, execplan::CalpontSystemCatalog::DECIMAL},
{65536, 16, 8, 4, 2, 1}, {8, 8, 8, 8, 8, 8});

// rgD = rowgroup::RGData(rg, &alloc);
// rg.setData(&rgD);
// rg.initRow(&r);
// rg.getRow(0, &r);

// for (size_t i = 0; i < sValueVector.size(); i++)
// {
// // setStringField
// r.setBinaryField_offset(&sValueVector[i], sizeof(sValueVector[0]), offsets[0]);
// r.setBinaryField_offset(&anotherValueVector[i], sizeof(anotherValueVector[0]), offsets[1]);
// r.setIntField(s64ValueVector[i], 2);
// r.setIntField(s32ValueVector[i], 3);
// r.setIntField(s16ValueVector[i], 4);
// r.setIntField(s8ValueVector[i], 5);
// r.nextRow(rowSize);
// }

// rowCount = sValueVector.size();
}

// void TearDown() override {}
Expand All @@ -397,7 +378,7 @@ class RGDataTest : public ::testing::Test
TEST_F(RGDataTest, AllocData)
{
std::cout << " test allocatedMemery " << allocatedMemory.load() << " rowsize " << rg.getRowSize() << " " << rg.getMaxDataSize() << std::endl;
rgD = rowgroup::RGData(rg, &alloc);
rgD = rowgroup::RGData(rg, alloc);
rg.setData(&rgD);
rg.initRow(&r);
rg.getRow(0, &r);
Expand Down
3 changes: 1 addition & 2 deletions utils/common/countingallocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class CountingAllocator {
// Copy constructor (template to allow conversion between different types)
template <typename U>
CountingAllocator(const CountingAllocator<U>& other) noexcept
: memoryLimit_(other.memoryLimit_) {}

: memoryLimit_(other.memoryLimit_), memoryLimitLowerBound(other.memoryLimitLowerBound) {}

// Allocate memory for n objects of type T
template <typename U = T>
Expand Down
2 changes: 1 addition & 1 deletion utils/messageqcpp/bytestream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ ByteStream::ByteStream(uint32_t initSize) : fBuf(0), fCurInPtr(0), fCurOutPtr(0)
}

// WIP remove this one, replacing the allocator arg with a default nullptr.
ByteStream::ByteStream(allocators::CountingAllocator<uint8_t>* allocator, uint32_t initSize)
ByteStream::ByteStream(allocators::CountingAllocator<uint8_t>& allocator, uint32_t initSize)
: fBuf(0), fCurInPtr(0), fCurOutPtr(0), fMaxLen(0), allocator(allocator)
{
if (initSize > 0)
Expand Down
5 changes: 3 additions & 2 deletions utils/messageqcpp/bytestream.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#pragma once
#include <optional>
#include <string>
#include <iostream>
#include <sys/types.h>
Expand Down Expand Up @@ -78,7 +79,7 @@ class ByteStream : public Serializeable
* default ctor
*/
EXPORT explicit ByteStream(uint32_t initSize = 8192); // multiples of pagesize are best
explicit ByteStream(allocators::CountingAllocator<BSBufType>* alloc, uint32_t initSize = 8192);
explicit ByteStream(allocators::CountingAllocator<BSBufType>& alloc, uint32_t initSize = 8192);
/**
* ctor with a uint8_t array and len initializer
*/
Expand Down Expand Up @@ -476,7 +477,7 @@ class ByteStream : public Serializeable
uint32_t fMaxLen; // how big fBuf is currently
// Stores `long strings`.
std::vector<rowgroup::StringStoreBufSPType> longStrings;
allocators::CountingAllocator<BSBufType>* allocator = nullptr;
std::optional<allocators::CountingAllocator<BSBufType>> allocator = {};
};

template <int W, typename T = void>
Expand Down
42 changes: 19 additions & 23 deletions utils/rowgroup/rowgroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,13 @@ namespace rowgroup
{
using cscType = execplan::CalpontSystemCatalog::ColDataType;

StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc) : StringStore()
StringStore::StringStore(allocators::CountingAllocator<StringStoreBufType> alloc) : StringStore()
{
this->alloc = alloc;
}

StringStore::~StringStore()
{
#if 0
// for mem usage debugging
uint32_t i;
uint64_t inUse = 0, allocated = 0;

for (i = 0; i < mem.size(); i++)
{
MemChunk* tmp = (MemChunk*) mem.back().get();
inUse += tmp->currentSize;
allocated += tmp->capacity;
}

if (allocated > 0)
cout << "~SS: " << inUse << "/" << allocated << " = " << (float) inUse / (float) allocated << endl;

#endif
}

uint64_t StringStore::storeString(const uint8_t* data, uint32_t len)
Expand Down Expand Up @@ -376,15 +360,16 @@ RGData::RGData(const RowGroup& rg)
#endif
}

RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc) : alloc(alloc)
RGData::RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& _alloc) : alloc(_alloc)
{
// rowData = shared_ptr<uint8_t[]>(buf, [alloc, allocSize](uint8_t* p) { alloc->deallocate(p, allocSize);
// });
rowData = boost::allocate_shared<RGDataBufType>(*alloc, rg.getMaxDataSize());
rowData = boost::allocate_shared<RGDataBufType>(alloc.value(), rg.getMaxDataSize());
// rowData = std::make_shared(uint8_t[rg.getMaxDataSize()]);

if (rg.usesStringTable())
strings.reset(new StringStore(alloc));
{
allocators::CountingAllocator<StringStoreBufType> ssAlloc = _alloc;
strings.reset(new StringStore(ssAlloc));
}

userDataStore.reset();

Expand Down Expand Up @@ -413,7 +398,18 @@ void RGData::reinit(const RowGroup& rg, uint32_t rowCount)
userDataStore.reset();

if (rg.usesStringTable())
strings.reset(new StringStore(alloc));
{
if (alloc)
{
allocators::CountingAllocator<StringStoreBufType> ssAlloc = alloc.value();
strings.reset(new StringStore(ssAlloc));
}
else
{
strings.reset(new StringStore());
}

}
else
strings.reset();

Expand Down
16 changes: 8 additions & 8 deletions utils/rowgroup/rowgroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class StringStore
{
public:
StringStore() = default;
StringStore(allocators::CountingAllocator<StringStoreBufType>* alloc);
StringStore(allocators::CountingAllocator<StringStoreBufType> alloc);
StringStore(const StringStore&) = delete;
StringStore(StringStore&&) = delete;
StringStore& operator=(const StringStore&) = delete;
Expand Down Expand Up @@ -187,11 +187,11 @@ class StringStore
std::vector<boost::shared_ptr<uint8_t[]>> mem;

// To store strings > 64KB (BLOB/TEXT)
std::vector<StringStoreBufSPType> longStrings;
std::vector<boost::shared_ptr<uint8_t[]>> longStrings;
bool empty = true;
bool fUseStoreStringMutex = false; //@bug6065, make StringStore::storeString() thread safe
boost::mutex fMutex;
allocators::CountingAllocator<StringStoreBufType>* alloc = nullptr;
std::optional<allocators::CountingAllocator<StringStoreBufType>> alloc {};
};

// Where we store user data for UDA(n)F
Expand Down Expand Up @@ -264,8 +264,8 @@ class RGData
RGData() = default; // useless unless followed by an = or a deserialize operation
RGData(const RowGroup& rg, uint32_t rowCount); // allocates memory for rowData
explicit RGData(const RowGroup& rg);
explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>* alloc);
RGData& operator=(const RGData&) = default;
explicit RGData(const RowGroup& rg, allocators::CountingAllocator<RGDataBufType>& alloc);
RGData& operator=(const RGData& rhs) = default;
RGData& operator=(RGData&&) = default;
RGData(const RGData&) = default;
RGData(RGData&&) = default;
Expand Down Expand Up @@ -327,7 +327,7 @@ class RGData
boost::shared_ptr<RGDataBufType> rowData;
boost::shared_ptr<StringStore> strings;
std::shared_ptr<UserDataStore> userDataStore;
allocators::CountingAllocator<RGDataBufType>* alloc = nullptr;
std::optional<allocators::CountingAllocator<RGDataBufType>> alloc = {};

// Need sig to support backward compat. RGData can deserialize both forms.
static const uint32_t RGDATA_SIG = 0xffffffff; // won't happen for 'old' Rowgroup data
Expand Down Expand Up @@ -998,7 +998,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex

if (inStringTable(colIndex))
{
std::cout << "setStringField storeString len " << length << std::endl;
// std::cout << "setStringField storeString len " << length << std::endl;
offset = strings->storeString((const uint8_t*)str.str(), length);
*((uint64_t*)&data[offsets[colIndex]]) = offset;
// cout << " -- stored offset " << *((uint32_t *) &data[offsets[colIndex]])
Expand All @@ -1007,7 +1007,7 @@ inline void Row::setStringField(const utils::ConstString& str, uint32_t colIndex
}
else
{
std::cout << "setStringField memcpy " << std::endl;
// std::cout << "setStringField memcpy " << std::endl;
memcpy(&data[offsets[colIndex]], str.str(), length);
memset(&data[offsets[colIndex] + length], 0, offsets[colIndex + 1] - (offsets[colIndex] + length));
}
Expand Down

0 comments on commit 646572f

Please sign in to comment.