@@ -330,7 +330,7 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
330
330
if (useHashMapOptimization) {
331
331
const auto * child = _subtree->getRootOperation ()->getChildren ().at (0 );
332
332
// Skip sorting
333
- subresult = child->getResult ();
333
+ subresult = child->getResult (true );
334
334
// Update runtime information
335
335
auto runTimeInfoChildren =
336
336
child->getRootOperation ()->getRuntimeInfoPointer ();
@@ -366,13 +366,28 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
366
366
}
367
367
368
368
if (useHashMapOptimization) {
369
- auto localVocab = subresult->getCopyOfLocalVocab ();
370
- IdTable idTable = CALL_FIXED_SIZE (
371
- groupByCols.size (), &GroupBy::computeGroupByForHashMapOptimization,
372
- this , metadataForUnsequentialData->aggregateAliases_ ,
373
- subresult->idTable (), groupByCols, &localVocab);
369
+ // Helper lambda that calls `computeGroupByForHashMapOptimization` for the
370
+ // given `subresults`.
371
+ auto computeWithHashMap = [this , &metadataForUnsequentialData,
372
+ &groupByCols](auto && subresults) {
373
+ auto doCompute = [&]<int NumCols> {
374
+ return computeGroupByForHashMapOptimization<NumCols>(
375
+ metadataForUnsequentialData->aggregateAliases_ , AD_FWD (subresults),
376
+ groupByCols);
377
+ };
378
+ return ad_utility::callFixedSize (groupByCols.size (), doCompute);
379
+ };
374
380
375
- return {std::move (idTable), resultSortedOn (), std::move (localVocab)};
381
+ // Now call `computeWithHashMap` and return the result. It expects a range
382
+ // of results, so if the result is fully materialized, we create an array
383
+ // with a single element.
384
+ if (subresult->isFullyMaterialized ()) {
385
+ return computeWithHashMap (
386
+ std::array{std::pair{std::cref (subresult->idTable ()),
387
+ std::cref (subresult->localVocab ())}});
388
+ } else {
389
+ return computeWithHashMap (std::move (subresult->idTables ()));
390
+ }
376
391
}
377
392
378
393
size_t inWidth = _subtree->getResultWidth ();
@@ -846,7 +861,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
846
861
const auto & index = getExecutionContext ()->getIndex ();
847
862
848
863
// TODO<joka921, C++23> Simplify the following pattern by using
849
- // `ql::views::chunkd_by ` and implement a lazy version of this view for
864
+ // `ql::views::chunk_by ` and implement a lazy version of this view for
850
865
// input iterators.
851
866
852
867
// Take care of duplicate values in the input.
@@ -1487,78 +1502,95 @@ static constexpr auto makeProcessGroupsVisitor =
1487
1502
1488
1503
// _____________________________________________________________________________
1489
1504
template <size_t NUM_GROUP_COLUMNS>
1490
- IdTable GroupBy::computeGroupByForHashMapOptimization (
1491
- std::vector<HashMapAliasInformation>& aggregateAliases,
1492
- const IdTable& subresult, const std::vector<size_t >& columnIndices,
1493
- LocalVocab* localVocab) const {
1494
- AD_CONTRACT_CHECK (columnIndices. size () == NUM_GROUP_COLUMNS ||
1495
- NUM_GROUP_COLUMNS == 0 ) ;
1496
-
1497
- // Initialize aggregation data
1505
+ Result GroupBy::computeGroupByForHashMapOptimization (
1506
+ std::vector<HashMapAliasInformation>& aggregateAliases, auto subresults,
1507
+ const std::vector<size_t >& columnIndices) const {
1508
+ AD_CORRECTNESS_CHECK (columnIndices. size () == NUM_GROUP_COLUMNS ||
1509
+ NUM_GROUP_COLUMNS == 0 );
1510
+ LocalVocab localVocab ;
1511
+
1512
+ // Initialize the data for the aggregates of the GROUP BY operation.
1498
1513
HashMapAggregationData<NUM_GROUP_COLUMNS> aggregationData (
1499
1514
getExecutionContext ()->getAllocator (), aggregateAliases,
1500
1515
columnIndices.size ());
1501
1516
1502
- // Initialize evaluation context
1503
- sparqlExpression::EvaluationContext evaluationContext (
1504
- *getExecutionContext (), _subtree->getVariableColumns (), subresult,
1505
- getExecutionContext ()->getAllocator (), *localVocab, cancellationHandle_,
1506
- deadline_);
1507
-
1508
- evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
1509
- _groupByVariables.begin (), _groupByVariables.end ()};
1510
- evaluationContext._isPartOfGroupBy = true ;
1511
-
1517
+ // Process the input blocks (pairs of `IdTable` and `LocalVocab`) one after
1518
+ // the other.
1512
1519
ad_utility::Timer lookupTimer{ad_utility::Timer::Stopped};
1513
1520
ad_utility::Timer aggregationTimer{ad_utility::Timer::Stopped};
1514
- for (size_t i = 0 ; i < subresult.size (); i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
1515
- checkCancellation ();
1516
-
1517
- evaluationContext._beginIndex = i;
1518
- evaluationContext._endIndex =
1519
- std::min (i + GROUP_BY_HASH_MAP_BLOCK_SIZE, subresult.size ());
1520
-
1521
- auto currentBlockSize = evaluationContext.size ();
1522
-
1523
- // Perform HashMap lookup once for all groups in current block
1524
- using U = HashMapAggregationData<NUM_GROUP_COLUMNS>::template ArrayOrVector<
1525
- std::span<const Id>>;
1526
- U groupValues;
1527
- resizeIfVector (groupValues, columnIndices.size ());
1528
-
1529
- // TODO<C++23> use views::enumerate
1530
- size_t j = 0 ;
1531
- for (auto & idx : columnIndices) {
1532
- groupValues[j] = subresult.getColumn (idx).subspan (
1533
- evaluationContext._beginIndex , currentBlockSize);
1534
- ++j;
1535
- }
1536
- lookupTimer.cont ();
1537
- auto hashEntries = aggregationData.getHashEntries (groupValues);
1538
- lookupTimer.stop ();
1539
-
1540
- aggregationTimer.cont ();
1541
- for (auto & aggregateAlias : aggregateAliases) {
1542
- for (auto & aggregate : aggregateAlias.aggregateInfo_ ) {
1543
- sparqlExpression::ExpressionResult expressionResult =
1544
- GroupBy::evaluateChildExpressionOfAggregateFunction (
1545
- aggregate, evaluationContext);
1546
-
1547
- auto & aggregationDataVariant =
1548
- aggregationData.getAggregationDataVariant (
1549
- aggregate.aggregateDataIndex_ );
1550
-
1551
- std::visit (makeProcessGroupsVisitor (currentBlockSize,
1552
- &evaluationContext, hashEntries),
1553
- std::move (expressionResult), aggregationDataVariant);
1521
+ for (const auto & [inputTableRef, inputLocalVocabRef] : subresults) {
1522
+ const IdTable& inputTable = inputTableRef;
1523
+ const LocalVocab& inputLocalVocab = inputLocalVocabRef;
1524
+
1525
+ // Merge the local vocab of each input block.
1526
+ //
1527
+ // NOTE: If the input blocks have very similar or even identical non-empty
1528
+ // local vocabs, no deduplication is performed.
1529
+ localVocab.mergeWith (std::span{&inputLocalVocab, 1 });
1530
+
1531
+ // Setup the `EvaluationContext` for this input block.
1532
+ sparqlExpression::EvaluationContext evaluationContext (
1533
+ *getExecutionContext (), _subtree->getVariableColumns (), inputTable,
1534
+ getExecutionContext ()->getAllocator (), localVocab, cancellationHandle_,
1535
+ deadline_);
1536
+ evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
1537
+ _groupByVariables.begin (), _groupByVariables.end ()};
1538
+ evaluationContext._isPartOfGroupBy = true ;
1539
+
1540
+ // Iterate over the rows of this input block. Process (up to)
1541
+ // `GROUP_BY_HASH_MAP_BLOCK_SIZE` rows at a time.
1542
+ for (size_t i = 0 ; i < inputTable.size ();
1543
+ i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
1544
+ checkCancellation ();
1545
+
1546
+ evaluationContext._beginIndex = i;
1547
+ evaluationContext._endIndex =
1548
+ std::min (i + GROUP_BY_HASH_MAP_BLOCK_SIZE, inputTable.size ());
1549
+
1550
+ auto currentBlockSize = evaluationContext.size ();
1551
+
1552
+ // Perform HashMap lookup once for all groups in current block
1553
+ using U = HashMapAggregationData<
1554
+ NUM_GROUP_COLUMNS>::template ArrayOrVector<std::span<const Id>>;
1555
+ U groupValues;
1556
+ resizeIfVector (groupValues, columnIndices.size ());
1557
+
1558
+ // TODO<C++23> use views::enumerate
1559
+ size_t j = 0 ;
1560
+ for (auto & idx : columnIndices) {
1561
+ groupValues[j] = inputTable.getColumn (idx).subspan (
1562
+ evaluationContext._beginIndex , currentBlockSize);
1563
+ ++j;
1564
+ }
1565
+ lookupTimer.cont ();
1566
+ auto hashEntries = aggregationData.getHashEntries (groupValues);
1567
+ lookupTimer.stop ();
1568
+
1569
+ aggregationTimer.cont ();
1570
+ for (auto & aggregateAlias : aggregateAliases) {
1571
+ for (auto & aggregate : aggregateAlias.aggregateInfo_ ) {
1572
+ sparqlExpression::ExpressionResult expressionResult =
1573
+ GroupBy::evaluateChildExpressionOfAggregateFunction (
1574
+ aggregate, evaluationContext);
1575
+
1576
+ auto & aggregationDataVariant =
1577
+ aggregationData.getAggregationDataVariant (
1578
+ aggregate.aggregateDataIndex_ );
1579
+
1580
+ std::visit (makeProcessGroupsVisitor (currentBlockSize,
1581
+ &evaluationContext, hashEntries),
1582
+ std::move (expressionResult), aggregationDataVariant);
1583
+ }
1554
1584
}
1585
+ aggregationTimer.stop ();
1555
1586
}
1556
- aggregationTimer.stop ();
1557
1587
}
1588
+
1558
1589
runtimeInfo ().addDetail (" timeMapLookup" , lookupTimer.msecs ());
1559
1590
runtimeInfo ().addDetail (" timeAggregation" , aggregationTimer.msecs ());
1560
-
1561
- return createResultFromHashMap (aggregationData, aggregateAliases, localVocab);
1591
+ IdTable resultTable =
1592
+ createResultFromHashMap (aggregationData, aggregateAliases, &localVocab);
1593
+ return {std::move (resultTable), resultSortedOn (), std::move (localVocab)};
1562
1594
}
1563
1595
1564
1596
// _____________________________________________________________________________
0 commit comments