Skip to content

Commit

Permalink
Merge branch 'ad-freiburg:master' into words-and-docs-file-parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Flixtastic authored Jan 9, 2025
2 parents b699551 + acb6633 commit 1642175
Show file tree
Hide file tree
Showing 17 changed files with 145 additions and 96 deletions.
6 changes: 3 additions & 3 deletions src/global/Constants.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2023, University of Freiburg,
// Copyright 2023 - 2025, University of Freiburg,
// Chair of Algorithms and Data Structures.
//
// Authors: Björn Buchhold <buchhold@gmail.com>
// Authors: Björn Buchhold <buchhold@gmail.com> [2014 - 2017]
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de>
// Hannah Bast <bast@cs.uni-freiburg.de>

Expand All @@ -22,6 +21,7 @@ using namespace ad_utility::memory_literals;
constexpr inline ad_utility::MemorySize DEFAULT_MEMORY_LIMIT_INDEX_BUILDING =
5_GB;
constexpr inline ad_utility::MemorySize STXXL_DISK_SIZE_INDEX_BUILDER = 1_GB;
// Default size of the buffer used for parsing the input files. The buffer must
// be large enough to hold a single input triple.
constexpr inline ad_utility::MemorySize DEFAULT_PARSER_BUFFER_SIZE = 10_MB;

constexpr inline ad_utility::MemorySize DEFAULT_MEM_FOR_QUERIES = 4_GB;

Expand Down
4 changes: 0 additions & 4 deletions src/index/ConstantsIndexBuilding.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ constexpr inline size_t PARSER_BATCH_SIZE = 1'000'000;
// streams faster.
constexpr inline size_t PARSER_MIN_TRIPLES_AT_ONCE = 10'000;

// When reading from a file, Chunks of this size will
// be fed to the parser at once (10 MiB).
constinit inline std::atomic<size_t> FILE_BUFFER_SIZE = 10 * (1ul << 20);

constinit inline std::atomic<size_t> BUFFER_SIZE_JOIN_PATTERNS_WITH_OSP =
50'000;

Expand Down
18 changes: 14 additions & 4 deletions src/index/Index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,23 @@ ad_utility::MemorySize& Index::memoryLimitIndexBuilding() {
}

// ____________________________________________________________________________
ad_utility::MemorySize& Index::blocksizePermutationsPerColumn() {
return pimpl_->blocksizePermutationPerColumn();
const ad_utility::MemorySize& Index::memoryLimitIndexBuilding() const {
return std::as_const(*pimpl_).memoryLimitIndexBuilding();
}

// ____________________________________________________________________________
const ad_utility::MemorySize& Index::memoryLimitIndexBuilding() const {
return std::as_const(*pimpl_).memoryLimitIndexBuilding();
// Return a mutable reference to the parser buffer size. The call is forwarded
// to the implementation object behind `pimpl_`, so assigning to the returned
// reference changes the setting stored there.
ad_utility::MemorySize& Index::parserBufferSize() {
return pimpl_->parserBufferSize();
}

// ____________________________________________________________________________
// Read-only overload: return a const reference to the parser buffer size.
// `std::as_const` forces the call to resolve to the `const` overload of the
// implementation object's `parserBufferSize()`.
const ad_utility::MemorySize& Index::parserBufferSize() const {
return std::as_const(*pimpl_).parserBufferSize();
}

// ____________________________________________________________________________
// Return a mutable reference to the per-column block size of the permutations,
// forwarded to the implementation object behind `pimpl_`. Note that the
// implementation's accessor is spelled `blocksizePermutationPerColumn`
// (singular "Permutation"), while this public wrapper uses the plural form.
ad_utility::MemorySize& Index::blocksizePermutationsPerColumn() {
return pimpl_->blocksizePermutationPerColumn();
}

// ____________________________________________________________________________
Expand Down
3 changes: 3 additions & 0 deletions src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ class Index {
ad_utility::MemorySize& memoryLimitIndexBuilding();
const ad_utility::MemorySize& memoryLimitIndexBuilding() const;

ad_utility::MemorySize& parserBufferSize();
const ad_utility::MemorySize& parserBufferSize() const;

ad_utility::MemorySize& blocksizePermutationsPerColumn();

void setOnDiskBase(const std::string& onDiskBase);
Expand Down
15 changes: 11 additions & 4 deletions src/index/IndexBuilderMain.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2014, University of Freiburg,
// Copyright 2014 - 2025 University of Freiburg
// Chair of Algorithms and Data Structures.
// Author:
// 2014-2017 Björn Buchhold (buchhold@informatik.uni-freiburg.de)
// 2018- Johannes Kalmbach (kalmbach@informatik.uni-freiburg.de)
// Authors: Björn Buchhold <buchhold@cs.uni-freiburg.de> [2014 - 2017]
// Johannes Kalmbach <kalmbach@cs.uni-freiburg.de>
// Hannah Bast <bast@cs.uni-freiburg.de>

#include <boost/program_options.hpp>
#include <cstdlib>
Expand Down Expand Up @@ -165,6 +165,7 @@ int main(int argc, char** argv) {
bool onlyPsoAndPos = false;
bool addWordsFromLiterals = false;
std::optional<ad_utility::MemorySize> stxxlMemory;
std::optional<ad_utility::MemorySize> parserBufferSize;
optind = 1;

Index index{ad_utility::makeUnlimitedAllocator<Id>()};
Expand Down Expand Up @@ -228,6 +229,9 @@ int main(int argc, char** argv) {
add("stxxl-memory,m", po::value(&stxxlMemory),
"The amount of memory in to use for sorting during the index build. "
"Decrease if the index builder runs out of memory.");
add("parser-buffer-size,b", po::value(&parserBufferSize),
"The size of the buffer used for parsing the input files. This must be "
"large enough to hold a single input triple. Default: 10 MB.");
add("keep-temporary-files,k", po::bool_switch(&keepTemporaryFiles),
"Do not delete temporary files from index creation for debugging.");

Expand All @@ -249,6 +253,9 @@ int main(int argc, char** argv) {
if (stxxlMemory.has_value()) {
index.memoryLimitIndexBuilding() = stxxlMemory.value();
}
if (parserBufferSize.has_value()) {
index.parserBufferSize() = parserBufferSize.value();
}

// If no text index name was specified, take the part of the wordsfile after
// the last slash.
Expand Down
5 changes: 3 additions & 2 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ IndexBuilderDataAsFirstPermutationSorter IndexImpl::createIdTriplesAndVocab(
std::unique_ptr<RdfParserBase> IndexImpl::makeRdfParser(
const std::vector<Index::InputFileSpecification>& files) const {
auto makeRdfParserImpl =
[&files]<int useCtre>() -> std::unique_ptr<RdfParserBase> {
[this, &files]<int useCtre>() -> std::unique_ptr<RdfParserBase> {
using TokenizerT =
std::conditional_t<useCtre == 1, TokenizerCtre, Tokenizer>;
return std::make_unique<RdfMultifileParser<TokenizerT>>(files);
return std::make_unique<RdfMultifileParser<TokenizerT>>(
files, this->parserBufferSize());
};

// `callFixedSize` lifts runtime integers to compile time integers. We use it
Expand Down
6 changes: 6 additions & 0 deletions src/index/IndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class IndexImpl {
bool keepTempFiles_ = false;
ad_utility::MemorySize memoryLimitIndexBuilding_ =
DEFAULT_MEMORY_LIMIT_INDEX_BUILDING;
ad_utility::MemorySize parserBufferSize_ = DEFAULT_PARSER_BUFFER_SIZE;
ad_utility::MemorySize blocksizePermutationPerColumn_ =
UNCOMPRESSED_BLOCKSIZE_COMPRESSED_METADATA_PER_COLUMN;
json configurationJson_;
Expand Down Expand Up @@ -406,6 +407,11 @@ class IndexImpl {
return memoryLimitIndexBuilding_;
}

// Mutable access to `parserBufferSize_`; assign to the returned reference to
// change the buffer size.
ad_utility::MemorySize& parserBufferSize() { return parserBufferSize_; }
// Read-only access to `parserBufferSize_`.
const ad_utility::MemorySize& parserBufferSize() const {
return parserBufferSize_;
}

// Mutable access to `blocksizePermutationPerColumn_`; assign to the returned
// reference to change the setting.
ad_utility::MemorySize& blocksizePermutationPerColumn() {
return blocksizePermutationPerColumn_;
}
Expand Down
16 changes: 8 additions & 8 deletions src/index/Vocabulary.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ class Vocabulary {
static constexpr bool isCompressed_ =
std::is_same_v<StringType, CompressedString>;

// If a word uses one of these language tags it will be internalized.
vector<std::string> internalizedLangs_{"en"};

// If a word starts with one of those prefixes, it will be externalized When
// a word matched both `externalizedPrefixes_` and `internalizedLangs_`, it
// will be externalized. Qlever-internal prefixes are currently not
// externalized.
vector<std::string> externalizedPrefixes_;
// If a literal uses one of the language tags in `internalizedLangs_`, it will
// be internalized. If it starts with one of the prefixes in
// `externalizedPrefixes_`, it will be externalized. By default, everything is
// externalized.
// Both of these settings can be overridden using the `settings.json` file.
//
// NOTE: Qlever-internal prefixes are currently always internalized, no matter
// how `internalizedLangs_` and `externalizedPrefixes_` are set.
vector<std::string> internalizedLangs_;
vector<std::string> externalizedPrefixes_{""};

using UnderlyingVocabulary =
std::conditional_t<isCompressed_,
Expand Down
13 changes: 8 additions & 5 deletions src/parser/ParallelBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,14 @@ ParallelBufferWithEndRegex::getNextBlock() {
if (!endPosition) {
if (rawBuffer_.getNextBlock()) {
throw std::runtime_error(absl::StrCat(
"The regex \"", endRegexAsString_,
"\" which marks the end of a statement was not found at "
"all within a single batch that was not the last one. Please "
"increase the FILE_BUFFER_SIZE "
"or set \"parallel-parsing: false\" in the settings file."));
"The regex ", endRegexAsString_,
" which marks the end of a statement was not found in the current "
"input batch (that was not the last one) of size ",
ad_utility::insertThousandSeparator(std::to_string(rawInput->size()),
','),
"; possible fixes are: "
"use `--parser-buffer-size` to increase the buffer size or "
"use `--parse-parallel false` to disable parallel parsing"));
}
endPosition = rawInput->size();
exhausted_ = true;
Expand Down
3 changes: 3 additions & 0 deletions src/parser/ParallelBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class ParallelBuffer {
*/
virtual std::optional<BufferType> getNextBlock() = 0;

// Return the block size of this buffer, i.e. the current value of the
// protected member `blocksize_`.
size_t getBlocksize() const { return blocksize_; }

protected:
// Default block size of the buffer.
// NOTE(review): `2 << 20` equals 2 MiB (2'097'152), so this default evaluates
// to 200 MiB. If 100 MiB was intended, the factor should be `1 << 20` --
// confirm before changing.
size_t blocksize_ = 100 * (2 << 20);
};
Expand Down
41 changes: 25 additions & 16 deletions src/parser/RdfParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ template <class T>
bool RdfStreamParser<T>::resetStateAndRead(
RdfStreamParser::TurtleParserBackupState* bPtr) {
auto& b = *bPtr;
AD_CORRECTNESS_CHECK(fileBuffer_);
auto nextBytesOpt = fileBuffer_->getNextBlock();
if (!nextBytesOpt || nextBytesOpt.value().empty()) {
// there are no more decompressed bytes, just continue with what we've got
Expand Down Expand Up @@ -821,7 +822,8 @@ bool RdfStreamParser<T>::resetStateAndRead(
}

template <class T>
void RdfStreamParser<T>::initialize(const string& filename) {
void RdfStreamParser<T>::initialize(const string& filename,
ad_utility::MemorySize bufferSize) {
this->clear();
// Make sure that a block of data ends with a newline. This is important for
// two reasons:
Expand All @@ -834,10 +836,10 @@ void RdfStreamParser<T>::initialize(const string& filename) {
// The reason is that with a `.` at the end, we cannot decide whether we are
// in the middle of a `PN_LOCAL` (that continues in the next buffer) or at the
// end of a statement.
fileBuffer_ =
std::make_unique<ParallelBufferWithEndRegex>(bufferSize_, "([\\r\\n]+)");
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "([\\r\\n]+)");
fileBuffer_->open(filename);
byteVec_.resize(bufferSize_);
byteVec_.resize(bufferSize.getBytes());
// decompress the first block and initialize Tokenizer
if (auto res = fileBuffer_->getNextBlock(); res) {
byteVec_ = std::move(res.value());
Expand Down Expand Up @@ -998,7 +1000,7 @@ void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(
inputBatch = std::move(remainingBatchFromInitialization);
first = false;
} else {
auto nextOptional = fileBuffer_.getNextBlock();
auto nextOptional = fileBuffer_->getNextBlock();
if (!nextOptional) {
return;
}
Expand Down Expand Up @@ -1026,10 +1028,13 @@ void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(

// _______________________________________________________________________
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::initialize(const string& filename) {
void RdfParallelParser<Tokenizer_T>::initialize(
const string& filename, ad_utility::MemorySize bufferSize) {
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "\\.[\\t ]*([\\r\\n]+)");
ParallelBuffer::BufferType remainingBatchFromInitialization;
fileBuffer_.open(filename);
if (auto batch = fileBuffer_.getNextBlock(); !batch) {
fileBuffer_->open(filename);
if (auto batch = fileBuffer_->getNextBlock(); !batch) {
LOG(WARN) << "Empty input to the TURTLE parser, is this what you intended?"
<< std::endl;
} else {
Expand Down Expand Up @@ -1109,7 +1114,8 @@ RdfParallelParser<T>::~RdfParallelParser() {
// file is to be parsed in parallel.
template <typename TokenizerT>
static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
const Index::InputFileSpecification& file) {
const Index::InputFileSpecification& file,
ad_utility::MemorySize bufferSize) {
auto graph = [file]() -> TripleComponent {
if (file.defaultGraph_.has_value()) {
return TripleComponent::Iri::fromIrirefWithoutBrackets(
Expand All @@ -1118,7 +1124,7 @@ static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
return qlever::specialIds().at(DEFAULT_GRAPH_IRI);
}
};
auto makeRdfParserImpl = [&filename = file.filename_,
auto makeRdfParserImpl = [&filename = file.filename_, &bufferSize,
&graph]<int useParallel, int isTurtleInput>()
-> std::unique_ptr<RdfParserBase> {
using InnerParser =
Expand All @@ -1127,7 +1133,7 @@ static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
using Parser =
std::conditional_t<useParallel == 1, RdfParallelParser<InnerParser>,
RdfStreamParser<InnerParser>>;
return std::make_unique<Parser>(filename, graph());
return std::make_unique<Parser>(filename, bufferSize, graph());
};

// The call to `callFixedSize` lifts runtime integers to compile time
Expand All @@ -1142,13 +1148,15 @@ static std::unique_ptr<RdfParserBase> makeSingleRdfParser(
// ______________________________________________________________
template <typename T>
RdfMultifileParser<T>::RdfMultifileParser(
const std::vector<qlever::InputFileSpecification>& files) {
const std::vector<qlever::InputFileSpecification>& files,
ad_utility::MemorySize bufferSize) {
using namespace qlever;
// This lambda parses a single file and pushes the results and all occurring
// exceptions to the `finishedBatchQueue_`.
auto parseFile = [this](const InputFileSpecification& file) {
auto parseFile = [this](const InputFileSpecification& file,
ad_utility::MemorySize bufferSize) {
try {
auto parser = makeSingleRdfParser<Tokenizer>(file);
auto parser = makeSingleRdfParser<Tokenizer>(file, bufferSize);
while (auto batch = parser->getBatch()) {
bool active = finishedBatchQueue_.push(std::move(batch.value()));
if (!active) {
Expand All @@ -1169,10 +1177,11 @@ RdfMultifileParser<T>::RdfMultifileParser(
};

// Feed all the input files to the `parsingQueue_`.
auto makeParsers = [files, this, parseFile]() {
auto makeParsers = [files, bufferSize, this, parseFile]() {
for (const auto& file : files) {
numActiveParsers_++;
bool active = parsingQueue_.push(std::bind_front(parseFile, file));
bool active =
parsingQueue_.push(std::bind_front(parseFile, file, bufferSize));
if (!active) {
// The queue was finished prematurely, stop this thread. This is
// important to avoid deadlocks.
Expand Down
Loading

0 comments on commit 1642175

Please sign in to comment.