Skip to content

Commit

Permalink
Cs to stable 24 3 (ydb-platform#9268)
Browse files Browse the repository at this point in the history
Co-authored-by: Semyon <yentsovsemyon@ydb.tech>
Co-authored-by: ivanmorozov333 <ivanmorozov@ydb.tech>
Co-authored-by: Ivan <5627721+abyss7@users.noreply.github.com>
Co-authored-by: Artem Alekseev <fexolm@ydb.tech>
Co-authored-by: Alexander Avdonkin <aavdonkin@yandex.ru>
Co-authored-by: Nikita Vasilev <ns-vasilev@ydb.tech>
Co-authored-by: Vladislav Gogov <vlad-gogov@ydb.tech>
Conflicts:
	ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
	ydb/core/kqp/compute_actor/kqp_compute_actor.h
	ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
	ydb/core/kqp/compute_actor/kqp_compute_events.h
	ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
	ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
	ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
	ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
	ydb/core/kqp/executer_actor/kqp_data_executer.cpp
	ydb/core/kqp/executer_actor/kqp_planner.cpp
	ydb/core/kqp/executer_actor/kqp_planner.h
	ydb/core/kqp/node_service/kqp_node_service.cpp
	ydb/core/kqp/ut/common/kqp_ut_common.cpp
	ydb/core/kqp/ut/olap/clickbench_ut.cpp
	ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
	ydb/core/kqp/ut/tx/kqp_sink_common.h
	ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp
	ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp
	ydb/core/kqp/ut/tx/kqp_sink_tx_ut.cpp
	ydb/core/protos/feature_flags.proto
	ydb/core/protos/kqp.proto
	ydb/core/testlib/basics/feature_flags.h
	ydb/core/tx/columnshard/engines/scheme/index_info.cpp
	ydb/core/tx/columnshard/splitter/chunks.h
	ydb/library/yql/sql/v1/sql_ut.cpp
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-12
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-14
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-15
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-16
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-17
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-21
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-30
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-31
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-32
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-33
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-34
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-36
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-37
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-38
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-41
	ydb/tests/functional/clickbench/canondata/test.test_plans_column_/queries-original-plan-column-7
  • Loading branch information
zverevgeny committed Sep 16, 2024
1 parent ffe45cd commit 5509bcc
Show file tree
Hide file tree
Showing 588 changed files with 22,067 additions and 6,199 deletions.
8 changes: 2 additions & 6 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDat
ydb/core/kqp/ut/join KqpJoinOrder.Chain65Nodes
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlapAggregations.Aggregation_ResultCountAll_FilterL
ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssueInteractiveTx+withSink
ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssue+withSink
Expand All @@ -28,10 +26,6 @@ ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExte
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_StreamGenericQuery
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_UsesGenericQueryOnJoinWithDataShardTable
ydb/core/kqp/ut/scheme KqpOlapScheme.DropTable
ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication
ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/scheme [14/50]*
Expand Down Expand Up @@ -118,4 +112,6 @@ ydb/tests/functional/tenants test_storage_config.py.TestStorageConfig.*
ydb/tests/functional/tenants test_tenants.py.*
ydb/tests/functional/ydb_cli test_ydb_impex.py.TestImpex.test_big_dataset*
ydb/tests/tools/pq_read/test test_timeout.py.TestTimeout.test_timeout
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/tests/functional/rename [test_rename.py */10] chunk chunk
2 changes: 2 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ struct TKikimrEvents : TEvents {
ES_TX_BACKGROUND = 4256,
ES_SS_BG_TASKS = 4257,
ES_LIMITER = 4258,
//ES_MEMORY = 4259, NB. exists in main
ES_GROUPED_ALLOCATIONS_MANAGER = 4260,
};
};

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@
#include <ydb/core/tx/limiter/usage/config.h>
#include <ydb/core/tx/limiter/usage/service.h>

#include <ydb/core/tx/limiter/grouped_memory/usage/config.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>

#include <ydb/core/backup/controller/tablet.h>

#include <ydb/services/ext_index/common/config.h>
Expand Down Expand Up @@ -2179,6 +2182,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}

TGroupedMemoryLimiterInitializer::TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}

void TGroupedMemoryLimiterInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
NOlap::NGroupedMemoryManager::TConfig serviceConfig;
Y_ABORT_UNLESS(serviceConfig.DeserializeFromProto(Config.GetGroupedMemoryLimiterConfig()));

if (serviceConfig.IsEnabled()) {
TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
TIntrusivePtr<::NMonitoring::TDynamicCounters> countersGroup = tabletGroup->GetSubgroup("type", "TX_GROUPED_MEMORY_LIMITER");

auto service = NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::CreateService(serviceConfig, countersGroup);

setup->LocalServices.push_back(std::make_pair(
NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(NodeId),
TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
}
}

TCompDiskLimiterInitializer::TCompDiskLimiterInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig) {
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TGroupedMemoryLimiterInitializer: public IKikimrServicesInitializer {
public:
TGroupedMemoryLimiterInitializer(const TKikimrRunConfig& runConfig);
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TCompConveyorInitializer: public IKikimrServicesInitializer {
public:
TCompConveyorInitializer(const TKikimrRunConfig& runConfig);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TCompDiskLimiterInitializer(runConfig));
}

if (serviceMask.EnableGroupedMemoryLimiter) {
sil->AddServiceInitializer(new TGroupedMemoryLimiterInitializer(runConfig));
}

if (serviceMask.EnableScanConveyor) {
sil->AddServiceInitializer(new TScanConveyorInitializer(runConfig));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/service_mask.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ union TBasicKikimrServicesMask {
bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
bool EnableCompDiskLimiter:1;
bool EnableGroupedMemoryLimiter:1;
};

struct {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ PEERDIR(
ydb/core/tx/coordinator
ydb/core/tx/conveyor/service
ydb/core/tx/limiter/service
ydb/core/tx/limiter/grouped_memory/usage
ydb/core/tx/datashard
ydb/core/tx/long_tx_service
ydb/core/tx/long_tx_service/public
Expand Down
163 changes: 163 additions & 0 deletions ydb/core/formats/arrow/accessor/abstract/accessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include "accessor.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/permutations.h>
#include <ydb/core/formats/arrow/save_load/saver.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/formats/arrow/splitter/simple.h>
#include <ydb/core/formats/arrow/switch/compare.h>
#include <ydb/core/formats/arrow/switch/switch_type.h>

#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow::NAccessor {

void IChunkedArray::TReader::AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const {
auto address = GetReadChunk(position);
AFL_VERIFY(NArrow::Append(builder, *address.GetArray(), address.GetPosition(), recordSize));
}

std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 recordIndex) const {
auto address = GetReadChunk(recordIndex);
return NArrow::CopyRecords(address.GetArray(), { address.GetPosition() });
}

std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
ui32 currentOffset = offset;
ui32 countLeast = count;
std::vector<std::shared_ptr<arrow::Array>> chunks;
auto address = GetChunkSlow(offset);
while (countLeast) {
address = GetChunk(address.GetAddress(), currentOffset);
const ui64 internalPos = address.GetAddress().GetLocalIndex(currentOffset);
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
break;
} else {
const ui32 deltaCount = address.GetArray()->length() - internalPos;
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
AFL_VERIFY(countLeast >= deltaCount);
countLeast -= deltaCount;
currentOffset += deltaCount;
}
}
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
}

NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(
const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
AFL_VERIFY(position < GetRecordsCount());
std::optional<TCommonChunkAddress> address;

if (IsDataOwner()) {
if (chunkCurrent) {
AFL_VERIFY(chunkCurrent->GetSize() == 1)("size", chunkCurrent->GetSize());
}
auto localAddress = GetLocalData(address, position);
TAddressChain addressChain;
addressChain.Add(localAddress.GetAddress());
AFL_VERIFY(addressChain.Contains(position));
return TFullDataAddress(localAddress.GetArray(), std::move(addressChain));
} else {
auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr);
if (chunkCurrent) {
AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
"chunked", chunkedArrayAddress.GetAddress().GetSize());
}
auto localAddress = chunkedArrayAddress.GetArray()->GetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position));
auto fullAddress = std::move(chunkedArrayAddress.MutableAddress());
fullAddress.Add(localAddress.GetAddress());
AFL_VERIFY(fullAddress.Contains(position));
return TFullDataAddress(localAddress.GetArray(), std::move(fullAddress));
}
}

IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
AFL_VERIFY(position < GetRecordsCount());
if (IsDataOwner()) {
AFL_VERIFY(selfPtr);
TAddressChain chain;
chain.Add(TCommonChunkAddress(0, GetRecordsCount(), 0));
return IChunkedArray::TFullChunkedArrayAddress(selfPtr, std::move(chain));
}
TAddressChain addressChain;

auto* currentLevel = this;
ui32 currentPosition = position;
ui32 idx = 0;
std::vector<std::shared_ptr<IChunkedArray>> chainForTemporarySave;
while (!currentLevel->IsDataOwner()) {
std::optional<TCommonChunkAddress> currentAddress;
if (chunkCurrent) {
currentAddress = chunkCurrent->GetAddress(idx);
}
auto nextChunkedArray = currentLevel->GetLocalChunkedArray(currentAddress, currentPosition);
chainForTemporarySave.emplace_back(nextChunkedArray.GetArray());
currentLevel = chainForTemporarySave.back().get();
addressChain.Add(nextChunkedArray.GetAddress());
AFL_VERIFY(nextChunkedArray.GetAddress().GetStartPosition() <= currentPosition);
currentPosition -= nextChunkedArray.GetAddress().GetStartPosition();
++idx;
}
AFL_VERIFY(!chunkCurrent || chunkCurrent->GetSize() - idx <= 1)("idx", idx)("size", chunkCurrent->GetSize());
return TFullChunkedArrayAddress(chainForTemporarySave.back(), std::move(addressChain));
}

TString IChunkedArray::TReader::DebugString(const ui32 position) const {
auto address = GetReadChunk(position);
return NArrow::DebugString(address.GetArray(), address.GetPosition());
}

std::partial_ordering IChunkedArray::TReader::CompareColumns(
const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition) {
AFL_VERIFY(l.size() == r.size());
for (ui32 i = 0; i < l.size(); ++i) {
const TAddress lAddress = l[i].GetReadChunk(lPosition);
const TAddress rAddress = r[i].GetReadChunk(rPosition);
auto cmp = lAddress.Compare(rAddress);
if (std::is_neq(cmp)) {
return cmp;
}
}
return std::partial_ordering::equivalent;
}

IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const {
AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
if (CurrentChunkAddress && CurrentChunkAddress->GetAddress().Contains(position)) {
} else {
CurrentChunkAddress = ChunkedArray->GetChunk(CurrentChunkAddress, position);
}
return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), CurrentChunkAddress->GetAddress().GetLocalIndex(position));
}

const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const {
return TComparator::TypedCompare<true>(*Array, Position, *item.Array, item.Position);
}

TChunkedArraySerialized::TChunkedArraySerialized(const std::shared_ptr<IChunkedArray>& array, const TString& serializedData)
: Array(array)
, SerializedData(serializedData) {
AFL_VERIFY(serializedData);
AFL_VERIFY(Array);
AFL_VERIFY(Array->GetRecordsCount());
}

std::partial_ordering IChunkedArray::TFullDataAddress::Compare(
const ui64 position, const TFullDataAddress& item, const ui64 itemPosition) const {
AFL_VERIFY(Address.Contains(position))("pos", position)("start", Address.DebugString());
AFL_VERIFY(item.Address.Contains(itemPosition))("pos", itemPosition)("start", item.Address.DebugString());
return TComparator::TypedCompare<true>(*Array, Address.GetLocalIndex(position), *item.Array, item.Address.GetLocalIndex(itemPosition));
}

std::shared_ptr<arrow::Array> IChunkedArray::TFullDataAddress::CopyRecord(const ui64 recordIndex) const {
return NArrow::CopyRecords(Array, { Address.GetLocalIndex(recordIndex) });
}

TString IChunkedArray::TFullDataAddress::DebugString(const ui64 position) const {
return NArrow::DebugString(Array, Address.GetLocalIndex(position));
}

} // namespace NKikimr::NArrow::NAccessor
Loading

0 comments on commit 5509bcc

Please sign in to comment.