Skip to content

Commit

Permalink
[ML] Remove some unused (and buggy) functionality in anomaly detectio…
Browse files Browse the repository at this point in the history
…n data gathering (#2216)

This PR removes some functionality for updating time which was not being used and in fact wasn't properly maintaining
class invariants. It also adds some extra useful information in various log messages, in case we still have problems in
this area, after fixing #2213.

Finally, it addresses assorted warnings in this code generated by clang-tidy. Principally, we were using bind, because
the code predated lambdas, however calling via lambdas often produces better inlining.
  • Loading branch information
tveasey authored Feb 21, 2022
1 parent 2d02e1d commit 91be7ef
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 109 deletions.
3 changes: 0 additions & 3 deletions include/model/CBucketGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,6 @@ class MODEL_EXPORT CBucketGatherer {
//! Get the start of the current bucketing time interval.
core_t::TTime currentBucketStartTime() const;

//! Set the start of the current bucketing time interval.
void currentBucketStartTime(core_t::TTime time);

//! The earliest time for which data can still arrive.
core_t::TTime earliestBucketStartTime() const;

Expand Down
15 changes: 3 additions & 12 deletions include/model/CBucketQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,6 @@ class CBucketQueue {
//! Note, the queue should never be empty.
void clear(const T& initial = T()) { this->fill(initial); }

//! Resets the queue to \p startTime.
//! This will clear the queue and will fill it with default items.
void reset(core_t::TTime startTime, const T& initial = T()) {
m_LatestBucketEnd = startTime + m_BucketLength - 1;
this->fill(initial);
}

//! Returns an iterator pointing to the latest bucket and directed
//! towards the earlier buckets.
iterator begin() { return m_Queue.begin(); }
Expand Down Expand Up @@ -217,11 +210,9 @@ class CBucketQueue {
if (!(core::CPersistUtils::restore(BUCKET_TAG, dummy, traverser))) {
LOG_ERROR(<< "Invalid bucket");
}
} else {
if (!(core::CPersistUtils::restore(BUCKET_TAG, m_Queue[i], traverser))) {
LOG_ERROR(<< "Invalid bucket");
return false;
}
} else if (!(core::CPersistUtils::restore(BUCKET_TAG, m_Queue[i], traverser))) {
LOG_ERROR(<< "Invalid bucket");
return false;
}
}
} while (traverser.next());
Expand Down
3 changes: 0 additions & 3 deletions include/model/CDataGatherer.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,6 @@ class MODEL_EXPORT CDataGatherer {
//! Get the start of the current bucketing time interval.
core_t::TTime currentBucketStartTime() const;

//! Reset the current bucketing interval start time.
void currentBucketStartTime(core_t::TTime bucketStart);

//! Get the length of the bucketing time interval.
core_t::TTime bucketLength() const;

Expand Down
8 changes: 0 additions & 8 deletions lib/model/CBucketGatherer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,6 @@ void CBucketGatherer::timeNow(core_t::TTime time) {
void CBucketGatherer::hiddenTimeNow(core_t::TTime time, bool skipUpdates) {
m_EarliestTime = std::min(m_EarliestTime, time);
core_t::TTime n = (time - m_BucketStart) / this->bucketLength();
if (n <= 0) {
return;
}

core_t::TTime newBucketStart = m_BucketStart;
for (core_t::TTime i = 0; i < n; ++i) {
newBucketStart += this->bucketLength();
Expand Down Expand Up @@ -426,10 +422,6 @@ core_t::TTime CBucketGatherer::currentBucketStartTime() const {
return m_BucketStart;
}

void CBucketGatherer::currentBucketStartTime(core_t::TTime time) {
m_BucketStart = time;
}

core_t::TTime CBucketGatherer::earliestBucketStartTime() const {
return this->currentBucketStartTime() -
(m_DataGatherer.params().s_LatencyBuckets * this->bucketLength());
Expand Down
4 changes: 0 additions & 4 deletions lib/model/CDataGatherer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,6 @@ core_t::TTime CDataGatherer::currentBucketStartTime() const {
return m_BucketGatherer->currentBucketStartTime();
}

void CDataGatherer::currentBucketStartTime(core_t::TTime bucketStart) {
m_BucketGatherer->currentBucketStartTime(bucketStart);
}

core_t::TTime CDataGatherer::bucketLength() const {
return m_BucketGatherer->bucketLength();
}
Expand Down
73 changes: 40 additions & 33 deletions lib/model/CEventRateBucketGatherer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,9 @@ struct SAddValue {
return;
}
if (time > personAttributeUniqueCounts.latestBucketEnd()) {
LOG_ERROR(<< "No queue item for time " << time);
LOG_ERROR(<< "No queue item for time " << time << ", end of latest bucket "
<< personAttributeUniqueCounts.latestBucketEnd() << ", bucket length "
<< personAttributeUniqueCounts.bucketLength());
personAttributeUniqueCounts.push(TSizeSizePrStrDataUMap(1), time);
}
TSizeSizePrStrDataUMap& counts = personAttributeUniqueCounts.get(time);
Expand All @@ -586,7 +588,9 @@ struct SAddValue {
const CEventData::TOptionalStr& /*uniqueStrings*/,
const TStoredStringPtrVec& /*influences*/) const {
if (time > arrivalTimes.latestBucketEnd()) {
LOG_ERROR(<< "No queue item for time " << time);
LOG_ERROR(<< "No queue item for time " << time << ", end of latest bucket "
<< arrivalTimes.latestBucketEnd() << ", bucket length "
<< arrivalTimes.bucketLength());
arrivalTimes.push(TSizeSizePrMeanAccumulatorUMap(1), time);
}
TSizeSizePrMeanAccumulatorUMap& times = arrivalTimes.get(time);
Expand Down Expand Up @@ -860,8 +864,9 @@ bool CEventRateBucketGatherer::processFields(const TStrCPtrVec& fieldValues,
return false;
}

for (std::size_t i = m_DataGatherer.isPopulation() + 1; i < m_BeginValueField; ++i) {
result.addInfluence(fieldValues[i] ? TOptionalStr(*fieldValues[i]) : TOptionalStr());
for (std::size_t i = m_DataGatherer.isPopulation() ? 2 : 1; i < m_BeginValueField; ++i) {
result.addInfluence(fieldValues[i] != nullptr ? TOptionalStr(*fieldValues[i])
: TOptionalStr());
}

if (m_BeginValueField != m_BeginSummaryFields) {
Expand Down Expand Up @@ -965,42 +970,42 @@ void CEventRateBucketGatherer::recyclePeople(const TSizeVec& peopleToRemove) {
if (peopleToRemove.empty()) {
return;
}

applyFunc(m_FeatureData, std::bind<void>(SRemovePeople(), std::placeholders::_1,
std::cref(peopleToRemove)));

applyFunc(m_FeatureData, [&, remove = SRemovePeople{} ](auto& data) {
remove(data, peopleToRemove);
});
this->CBucketGatherer::recyclePeople(peopleToRemove);
}

void CEventRateBucketGatherer::removePeople(std::size_t lowestPersonToRemove) {
applyFunc(m_FeatureData, std::bind<void>(SRemovePeople(), std::placeholders::_1, lowestPersonToRemove,
m_DataGatherer.numberPeople()));
applyFunc(m_FeatureData, [&, remove = SRemovePeople{} ](auto& data) {
remove(data, lowestPersonToRemove, m_DataGatherer.numberPeople());
});
this->CBucketGatherer::removePeople(lowestPersonToRemove);
}

void CEventRateBucketGatherer::recycleAttributes(const TSizeVec& attributesToRemove) {
if (attributesToRemove.empty()) {
return;
}

applyFunc(m_FeatureData, std::bind<void>(SRemoveAttributes(), std::placeholders::_1,
std::cref(attributesToRemove)));

applyFunc(m_FeatureData, [&, remove = SRemoveAttributes{} ](auto& data) {
remove(data, attributesToRemove);
});
this->CBucketGatherer::recycleAttributes(attributesToRemove);
}

void CEventRateBucketGatherer::removeAttributes(std::size_t lowestAttributeToRemove) {
applyFunc(m_FeatureData, std::bind<void>(SRemoveAttributes(), std::placeholders::_1,
lowestAttributeToRemove));
applyFunc(m_FeatureData, [&, remove = SRemoveAttributes{} ](auto& data) {
remove(data, lowestAttributeToRemove);
});
this->CBucketGatherer::removeAttributes(lowestAttributeToRemove);
}

uint64_t CEventRateBucketGatherer::checksum() const {
uint64_t seed = this->CBucketGatherer::checksum();

TStrUInt64Map hashes;
applyFunc(m_FeatureData, std::bind<void>(SChecksum(), std::placeholders::_1,
std::cref(m_DataGatherer), std::ref(hashes)));
applyFunc(m_FeatureData, [&, checksum = SChecksum{} ](const auto& data) {
checksum(data, m_DataGatherer, hashes);
});
LOG_TRACE(<< "seed = " << seed);
LOG_TRACE(<< "hashes = " << core::CContainerPrinter::print(hashes));
core::CHashing::CSafeMurmurHash2String64 hasher;
Expand Down Expand Up @@ -1053,11 +1058,12 @@ void CEventRateBucketGatherer::featureData(core_t::TTime time,
if (!this->dataAvailable(time) ||
time >= this->currentBucketStartTime() + this->bucketLength()) {
LOG_DEBUG(<< "No data available at " << time
<< ", current bucket = " << this->printCurrentBucket());
<< ", current bucket = " << this->printCurrentBucket()
<< ", bucket length = " << this->bucketLength());
return;
}

for (std::size_t i = 0u, n = m_DataGatherer.numberFeatures(); i < n; ++i) {
for (std::size_t i = 0, n = m_DataGatherer.numberFeatures(); i < n; ++i) {
const model_t::EFeature feature = m_DataGatherer.feature(i);

switch (feature) {
Expand Down Expand Up @@ -1179,7 +1185,7 @@ void CEventRateBucketGatherer::personCounts(model_t::EFeature feature,
*boost::unsafe_any_cast<TSizeFeatureDataPrVec>(&result_.back().second);
result.reserve(m_DataGatherer.numberActivePeople());

for (std::size_t pid = 0u, n = m_DataGatherer.numberPeople(); pid < n; ++pid) {
for (std::size_t pid = 0, n = m_DataGatherer.numberPeople(); pid < n; ++pid) {
if (!m_DataGatherer.isPersonActive(pid) ||
this->hasExplicitNullsOnly(time, pid, model_t::INDIVIDUAL_ANALYSIS_ATTRIBUTE_ID)) {
continue;
Expand Down Expand Up @@ -1271,7 +1277,7 @@ void CEventRateBucketGatherer::peoplePerAttribute(model_t::EFeature feature,
}

try {
const TSizeUSetVec& attributePeople = boost::any_cast<const TSizeUSetVec&>(i->second);
const auto& attributePeople = boost::any_cast<const TSizeUSetVec&>(i->second);
result.reserve(attributePeople.size());
for (std::size_t cid = 0; cid < attributePeople.size(); ++cid) {
if (m_DataGatherer.isAttributeActive(cid)) {
Expand Down Expand Up @@ -1516,7 +1522,8 @@ void CEventRateBucketGatherer::bucketMeanTimesPerPersonAttribute(model_t::EFeatu
}

void CEventRateBucketGatherer::resize(std::size_t pid, std::size_t cid) {
applyFunc(m_FeatureData, std::bind<void>(SResize(), std::placeholders::_1, pid, cid));
applyFunc(m_FeatureData,
[&, resize = SResize{} ](auto& data) { resize(data, pid, cid); });
}

void CEventRateBucketGatherer::addValue(std::size_t pid,
Expand All @@ -1528,14 +1535,15 @@ void CEventRateBucketGatherer::addValue(std::size_t pid,
const TStoredStringPtrVec& influences) {
// Check that we are correctly sized - a person/attribute might have been added
this->resize(pid, cid);
applyFunc(m_FeatureData,
std::bind<void>(SAddValue(), std::placeholders::_1, pid, cid,
time, count, std::cref(values),
std::cref(stringValue), std::cref(influences)));
applyFunc(m_FeatureData, [&, addValue = SAddValue{} ](auto& data) {
addValue(data, pid, cid, time, count, values, stringValue, influences);
});
}

void CEventRateBucketGatherer::startNewBucket(core_t::TTime time, bool /*skipUpdates*/) {
applyFunc(m_FeatureData, std::bind<void>(SNewBucket(), std::placeholders::_1, time));
applyFunc(m_FeatureData, [&, newBucket = SNewBucket{} ](auto& data) {
newBucket(data, time);
});
}

void CEventRateBucketGatherer::initializeFieldNames(const std::string& personFieldName,
Expand Down Expand Up @@ -1564,14 +1572,13 @@ void CEventRateBucketGatherer::initializeFieldNames(const std::string& personFie
case model_t::E_Manual:
m_FieldNames.push_back(summaryCountFieldName);
break;
};
}

// swap trick to reduce unused capacity
TStrVec(m_FieldNames).swap(m_FieldNames);
m_FieldNames.shrink_to_fit();
}

void CEventRateBucketGatherer::initializeFeatureData() {
for (std::size_t i = 0u, n = m_DataGatherer.numberFeatures(); i < n; ++i) {
for (std::size_t i = 0, n = m_DataGatherer.numberFeatures(); i < n; ++i) {
switch (m_DataGatherer.feature(i)) {
case model_t::E_IndividualCountByBucketAndPerson:
case model_t::E_IndividualNonZeroCountByBucketAndPerson:
Expand Down
Loading

0 comments on commit 91be7ef

Please sign in to comment.