Skip to content

Commit 0fa7609

Browse files
author
Damir Zainullin
committed
++
1 parent fb9289e commit 0fa7609

File tree

5 files changed

+64
-396
lines changed

5 files changed

+64
-396
lines changed

include/ipfixprobe/outputPlugin/outputStorage/bOutputStorage.hpp

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -46,44 +46,35 @@ class BOutputStorage : public OutputStorage<ElementType> {
4646
public:
4747
// constexpr static std::size_t BUCKET_INDEX_BIT_SIZE = std::countr_zero(BUCKET_COUNT);
4848

49-
explicit BOutputStorage(const uint8_t writersCount) noexcept
50-
: OutputStorage<ElementType>(writersCount)
51-
, m_randomGenerator(1, writersCount)
49+
explicit BOutputStorage() noexcept
50+
: OutputStorage<ElementType>()
51+
, m_randomGenerator(1, OutputStorage<ElementType>::MAX_WRITERS_COUNT)
5252
{
5353
std::ranges::for_each(
5454
std::views::repeat(std::ignore, BUCKET_COUNT),
5555
[&, bucketIndex = 0](const auto) mutable { m_buckets.emplace_back(bucketIndex++); });
5656
}
5757

58-
OutputStorage<ElementType>::WriteHandler registerWriter() noexcept override
58+
void registerWriter(const uint8_t writerIndex) noexcept override
5959
{
6060
std::unique_lock<std::mutex> lock(m_registrationMutex);
6161
m_writersData.emplace_back(m_randomGenerator);
6262
m_buckets[m_writersData.size() - 1].bucketIndex
6363
= m_writersData.back()->bucketAllocation.reset(
6464
m_buckets[m_writersData.size() - 1].bucketIndex);
6565
lock.unlock();
66-
return OutputStorage<ElementType>::registerWriter();
66+
OutputStorage<ElementType>::registerWriter();
6767
}
6868

69-
OutputStorage<ElementType>::ReaderGroupHandler&
70-
registerReaderGroup(const uint8_t groupSize) noexcept override
71-
{
72-
return OutputStorage<ElementType>::registerReaderGroup(groupSize);
73-
}
74-
75-
void registerReader(
76-
[[maybe_unused]] const uint8_t readerGroupIndex,
77-
const uint8_t localReaderIndex,
78-
[[maybe_unused]] const uint8_t globalReaderIndex) noexcept override
69+
void registerReader(const uint8_t readerIndex) noexcept override
7970
{
8071
std::unique_lock<std::mutex> lock(m_registrationMutex);
81-
m_readersData.resize(std::max<std::size_t>(m_readersData.size(), globalReaderIndex + 1));
82-
m_readersData[globalReaderIndex]->readPosition = localReaderIndex;
83-
m_readersData[globalReaderIndex]->generationIncreasePosition = localReaderIndex;
72+
m_readersData.resize(std::max<std::size_t>(m_readersData.size(), readerIndex + 1));
73+
m_readersData[globalReaderIndex]->readPosition = readerIndex;
74+
//m_readersData[globalReaderIndex]->generationIncreasePosition = readerIndex;
8475
lock.unlock();
8576

86-
return OutputStorage<ElementType>::registerReader(
77+
OutputStorage<ElementType>::registerReader(
8778
readerGroupIndex,
8879
localReaderIndex,
8980
globalReaderIndex);
@@ -118,15 +109,12 @@ class BOutputStorage : public OutputStorage<ElementType> {
118109
return true;
119110
}
120111

121-
// uint8_t loopCounter = 0;
122-
// const uint16_t initialPosition = writerData.writePosition;
123112
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
124113
do {
125114
const bool overflowed = writerData.randomShift();
126115
if (overflowed) {
127116
writerData.cachedLowestReaderGeneration = m_lowestReaderGeneration.load();
128117
if (containersLeft == 0) {
129-
// container.deallocate(*m_allocationBuffer);
130118
this->m_allocationBuffer->deallocate(element, writerIndex);
131119
}
132120
backoffScheme.backoff();
@@ -160,7 +148,6 @@ class BOutputStorage : public OutputStorage<ElementType> {
160148
m_buckets[writerData.writePosition].lock.unlock();
161149

162150
if (containersLeft == 0) {
163-
// getNextContainer(writerData.bucketAllocation).assign(element, *m_allocationBuffer);
164151
this->m_allocationBuffer->replace(
165152
getNextElement(writerData.bucketAllocation),
166153
element,
@@ -169,12 +156,9 @@ class BOutputStorage : public OutputStorage<ElementType> {
169156
return true;
170157
}
171158

172-
ElementType* read(
173-
const std::size_t readerGroupIndex,
174-
const uint8_t localReaderIndex,
175-
const uint8_t globalReaderIndex) noexcept override
159+
ElementType* read(const uint8_t readerIndex) noexcept override
176160
{
177-
ReaderData& readerData = m_readersData[globalReaderIndex].get();
161+
ReaderData& readerData = m_readersData[readerIndex].get();
178162
// const uint64_t readPosition = readerData.readPosition;
179163
if (readerData.bucketAllocation.containersLeft()) {
180164
/*if (!BucketAllocation::isValidBucketIndex(readerData.bucketAllocation.bucketIndex)) {
@@ -183,7 +167,6 @@ class BOutputStorage : public OutputStorage<ElementType> {
183167
return getNextElement(readerData.bucketAllocation);
184168
}
185169

186-
// uint8_t loopCounter = 0;
187170
uint64_t cachedGeneration;
188171
uint16_t cachedBucketIndex;
189172
BackoffScheme backoffScheme(0, std::numeric_limits<std::size_t>::max());
@@ -200,12 +183,12 @@ class BOutputStorage : public OutputStorage<ElementType> {
200183
if (!readerData.seenValidBucket) {
201184
updateLowestReaderGeneration();
202185
backoffScheme.backoff();
203-
readerData.skipLoop = true;
186+
//readerData.skipLoop = true;
204187
return nullptr;
205188
}
206189
readerData.generation++;
207190
readerData.seenValidBucket = false;
208-
readerData.skipLoop = false;
191+
//readerData.skipLoop = false;
209192
updateLowestReaderGeneration();
210193
}
211194
cachedGeneration = m_buckets[readerData.readPosition].generation;
@@ -295,10 +278,10 @@ class BOutputStorage : public OutputStorage<ElementType> {
295278
struct ReaderData {
296279
BucketAllocation bucketAllocation {};
297280
uint16_t readPosition;
298-
uint16_t generationIncreasePosition;
281+
//uint16_t generationIncreasePosition;
299282
uint64_t generation {1};
300283
bool seenValidBucket {false};
301-
bool skipLoop {false};
284+
//bool skipLoop {false};
302285

303286
void shift(const uint8_t adjustment, const uint16_t initialPosition) noexcept
304287
{
@@ -328,15 +311,15 @@ class BOutputStorage : public OutputStorage<ElementType> {
328311
};
329312

330313
boost::container::static_vector<Bucket, BUCKET_COUNT> m_buckets;
331-
std::span<Bucket> debugBuckets {m_buckets.data(), BUCKET_COUNT};
314+
std::span<Bucket> d_buckets {m_buckets.data(), BUCKET_COUNT};
332315
std::vector<uint16_t> m_bucketIndices;
333-
struct D {
316+
/*struct D {
334317
uint64_t bucketIndex;
335318
uint64_t generation;
336319
uint64_t readPos;
337320
uint64_t generationIncreasePos;
338321
};
339-
std::vector<std::vector<D>> debugIndices;
322+
std::vector<std::vector<D>> debugIndices;*/
340323

341324
FastRandomGenerator<uint8_t> m_randomGenerator;
342325
/*BucketAllocator m_bucketAllocator {

include/ipfixprobe/outputPlugin/outputStorage/outputContainer.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#pragma once
22

3-
#include "../../processPlugin/flowRecord.hpp"
3+
//#include "../../processPlugin/flowRecord.hpp"
44

55
#include <array>
66
#include <atomic>
@@ -14,10 +14,10 @@ namespace ipxp::output {
1414

1515
template<typename ElementType>
1616
struct OutputContainer {
17-
constexpr static std::size_t SIZE = 64;
18-
std::chrono::steady_clock::time_point creationTime;
17+
//constexpr static std::size_t SIZE = 64;
18+
constexpr static std::size_t SIZE = 1;
1919
boost::container::static_vector<ElementType, SIZE> data;
20-
std::atomic<uint8_t> readTimes {0};
20+
std::atomic<bool> read {0};
2121
};
2222

2323
} // namespace ipxp::output

0 commit comments

Comments
 (0)