Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga committed Nov 13, 2024
1 parent 8fb382c commit fdd7168
Show file tree
Hide file tree
Showing 30 changed files with 243 additions and 313 deletions.
176 changes: 88 additions & 88 deletions ydb/core/cms/console/shared_cache_configurator.cpp
Original file line number Diff line number Diff line change
@@ -1,88 +1,88 @@
#include "shared_cache_configurator.h"
#include "configs_dispatcher.h"
#include "console.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/tablet_flat/shared_sausagecache.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>

#include <ydb/core/protos/bootstrap.pb.h>

namespace NKikimr::NConsole {

class TSharedCacheConfigurator : public TActorBootstrapped<TSharedCacheConfigurator> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::SHARED_CACHE_CONFIGURATOR;
}

TSharedCacheConfigurator() = default;

void Bootstrap(const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TSharedCacheConfigurator Bootstrap");

ui32 item1 = (ui32)NKikimrConsole::TConfigItem::BootstrapConfigItem;
ui32 item2 = (ui32)NKikimrConsole::TConfigItem::SharedCacheConfigItem;
ctx.Send(MakeConfigsDispatcherID(SelfId().NodeId()),
new TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({ item1, item2 }));
Become(&TThis::StateWork);
}

void Handle(TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;

LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS,
"TSharedCacheConfigurator: got new config: " << record.GetConfig().ShortDebugString());

auto* appData = AppData(ctx);

NKikimrSharedCache::TSharedCacheConfig cfg;
if (record.GetConfig().HasBootstrapConfig()) {
if (record.GetConfig().GetBootstrapConfig().HasSharedCacheConfig()) {
cfg.MergeFrom(record.GetConfig().GetBootstrapConfig().GetSharedCacheConfig());
}
} else if (appData->BootstrapConfig.HasSharedCacheConfig()) {
cfg.MergeFrom(appData->BootstrapConfig.GetSharedCacheConfig());
}
if (record.GetConfig().HasSharedCacheConfig()) {
cfg.MergeFrom(record.GetConfig().GetSharedCacheConfig());
} else {
cfg.MergeFrom(appData->SharedCacheConfig);
}

ApplyConfig(std::move(cfg), ctx);

auto response = MakeHolder<TEvConsole::TEvConfigNotificationResponse>(record);
ctx.Send(ev->Sender, response.Release(), 0, ev->Cookie);
}

void ApplyConfig(NKikimrSharedCache::TSharedCacheConfig&& cfg, const TActorContext& ctx) {
auto event = MakeHolder<TEvSharedPageCache::TEvConfigure>();
event->Record.Swap(&cfg);

LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS,
"Applying new shared cache config: " << event->Record.ShortDebugString());

ctx.Send(MakeSharedPageCacheId(0), event.Release());
}

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvConsole::TEvConfigNotificationRequest, Handle);
IgnoreFunc(TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse);

default:
Y_ABORT("unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->ToString().data());
break;
}
}
};

IActor *CreateSharedCacheConfigurator()
{
return new TSharedCacheConfigurator();
}

} // namespace NKikimr::NConsole
// #include "shared_cache_configurator.h"
// #include "configs_dispatcher.h"
// #include "console.h"

// #include <ydb/core/base/appdata.h>
// #include <ydb/core/tablet_flat/shared_sausagecache.h>

// #include <ydb/library/actors/core/actor_bootstrapped.h>

// #include <ydb/core/protos/bootstrap.pb.h>

// namespace NKikimr::NConsole {

// class TSharedCacheConfigurator : public TActorBootstrapped<TSharedCacheConfigurator> {
// public:
// static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
// return NKikimrServices::TActivity::SHARED_CACHE_CONFIGURATOR;
// }

// TSharedCacheConfigurator() = default;

// void Bootstrap(const TActorContext& ctx) {
// LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS, "TSharedCacheConfigurator Bootstrap");

// ui32 item1 = (ui32)NKikimrConsole::TConfigItem::BootstrapConfigItem;
// ui32 item2 = (ui32)NKikimrConsole::TConfigItem::SharedCacheConfigItem;
// ctx.Send(MakeConfigsDispatcherID(SelfId().NodeId()),
// new TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest({ item1, item2 }));
// Become(&TThis::StateWork);
// }

// void Handle(TEvConsole::TEvConfigNotificationRequest::TPtr& ev, const TActorContext& ctx) {
// const auto& record = ev->Get()->Record;

// LOG_INFO_S(ctx, NKikimrServices::CMS_CONFIGS,
// "TSharedCacheConfigurator: got new config: " << record.GetConfig().ShortDebugString());

// auto* appData = AppData(ctx);

// NKikimrSharedCache::TSharedCacheConfig cfg;
// if (record.GetConfig().HasBootstrapConfig()) {
// if (record.GetConfig().GetBootstrapConfig().HasSharedCacheConfig()) {
// cfg.MergeFrom(record.GetConfig().GetBootstrapConfig().GetSharedCacheConfig());
// }
// } else if (appData->BootstrapConfig.HasSharedCacheConfig()) {
// cfg.MergeFrom(appData->BootstrapConfig.GetSharedCacheConfig());
// }
// if (record.GetConfig().HasSharedCacheConfig()) {
// cfg.MergeFrom(record.GetConfig().GetSharedCacheConfig());
// } else {
// cfg.MergeFrom(appData->SharedCacheConfig);
// }

// ApplyConfig(std::move(cfg), ctx);

// auto response = MakeHolder<TEvConsole::TEvConfigNotificationResponse>(record);
// ctx.Send(ev->Sender, response.Release(), 0, ev->Cookie);
// }

// void ApplyConfig(NKikimrSharedCache::TSharedCacheConfig&& cfg, const TActorContext& ctx) {
// auto event = MakeHolder<TEvSharedPageCache::TEvConfigure>();
// event->Record.Swap(&cfg);

// LOG_DEBUG_S(ctx, NKikimrServices::CMS_CONFIGS,
// "Applying new shared cache config: " << event->Record.ShortDebugString());

// ctx.Send(MakeSharedPageCacheId(0), event.Release());
// }

// STFUNC(StateWork) {
// switch (ev->GetTypeRewrite()) {
// HFunc(TEvConsole::TEvConfigNotificationRequest, Handle);
// IgnoreFunc(TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse);

// default:
// Y_ABORT("unexpected event type: %" PRIx32 " event: %s",
// ev->GetTypeRewrite(), ev->ToString().data());
// break;
// }
// }
// };

// IActor *CreateSharedCacheConfigurator()
// {
// return new TSharedCacheConfigurator();
// }

// } // namespace NKikimr::NConsole
33 changes: 6 additions & 27 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1092,38 +1092,17 @@ TSharedCacheInitializer::TSharedCacheInitializer(const TKikimrRunConfig& runConf
void TSharedCacheInitializer::InitializeServices(
NActors::TActorSystemSetup* setup,
const NKikimr::TAppData* appData) {
auto config = MakeHolder<TSharedPageCacheConfig>();

NKikimrSharedCache::TSharedCacheConfig cfg;
NKikimrSharedCache::TSharedCacheConfig config;
if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasSharedCacheConfig()) {
cfg.MergeFrom(Config.GetBootstrapConfig().GetSharedCacheConfig());
config.MergeFrom(Config.GetBootstrapConfig().GetSharedCacheConfig());
}
if (Config.HasSharedCacheConfig()) {
cfg.MergeFrom(Config.GetSharedCacheConfig());
config.MergeFrom(Config.GetSharedCacheConfig());
}

if (cfg.HasMemoryLimit() && cfg.GetMemoryLimit() != 0) {
// config limit is optional
// if preserved apply both memory controller limit and config limit
config->LimitBytes = cfg.GetMemoryLimit();
} else {
config->LimitBytes = {};
}
config->TotalAsyncQueueInFlyLimit = cfg.GetAsyncQueueInFlyLimit();
config->TotalScanQueueInFlyLimit = cfg.GetScanQueueInFlyLimit();
config->ReplacementPolicy = cfg.GetReplacementPolicy();
config->ActivePagesReservationPercent = cfg.GetActivePagesReservationPercent();

TIntrusivePtr<::NMonitoring::TDynamicCounters> tabletGroup = GetServiceCounters(appData->Counters, "tablets");
TIntrusivePtr<::NMonitoring::TDynamicCounters> sausageGroup = tabletGroup->GetSubgroup("type", "S_CACHE");
config->Counters = new TSharedPageCacheCounters(sausageGroup);

setup->LocalServices.push_back(std::pair<TActorId, TActorSetupCmd>(MakeSharedPageCacheId(0),
TActorSetupCmd(CreateSharedPageCache(std::move(config)), TMailboxType::ReadAsFilled, appData->UserPoolId)));

auto *configurator = NConsole::CreateSharedCacheConfigurator();
setup->LocalServices.emplace_back(TActorId(),
TActorSetupCmd(configurator, TMailboxType::HTSwap, appData->UserPoolId));
auto* actor = NSharedCache::CreateSharedPageCache(config, appData->Counters);
setup->LocalServices.emplace_back(NSharedCache::MakeSharedPageCacheId(0),
TActorSetupCmd(actor, TMailboxType::ReadAsFilled, appData->UserPoolId));
}

// TBlobCacheInitializer
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/memory_controller/memory_controller_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace NKikimr::NMemory {

using namespace Tests;
using namespace NSharedCache;

namespace {

Expand Down Expand Up @@ -332,7 +333,7 @@ Y_UNIT_TEST(SharedCache_ConfigLimit) {

auto memoryControllerConfig = serverSettings.AppConfig->MutableMemoryControllerConfig();
memoryControllerConfig->SetHardLimitBytes(300_MB);
serverSettings.CacheParams.Shared = 100_MB;
serverSettings.AppConfig->MutableSharedCacheConfig()->SetMemoryLimit(100_MB);

auto server = MakeIntrusive<TWithMemoryControllerServer>(serverSettings);
auto& runtime = *server->GetRuntime();
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/mind/node_broker_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ void SetupServices(TTestActorRuntime &runtime,
SetupNodeWhiteboard(runtime, nodeIndex);
SetupTabletResolver(runtime, nodeIndex);
SetupResourceBroker(runtime, nodeIndex, {});
SetupSharedPageCache(runtime, nodeIndex, NFake::TCaches{
.Shared = 1,
});
NSharedCache::TSharedCacheConfig sharedCacheConfig;
sharedCacheConfig.SetMemoryLimit(0);
SetupSharedPageCache(runtime, nodeIndex, sharedCacheConfig);
SetupSchemeCache(runtime, nodeIndex, DOMAIN_NAME);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_executor_bootlogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ NBoot::TSpawned TExecutorBootLogic::LoadPages(NBoot::IStep *step, TAutoPtr<NPage
Y_ABORT_UNLESS(success, "IPageCollection queued twice for loading");

Ops->Send(
MakeSharedPageCacheId(),
NSharedCache::MakeSharedPageCacheId(),
new NSharedCache::TEvRequest(
NBlockIO::EPriority::Fast,
req,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_part_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ namespace NTable {
{
if (cookie == 0 && NeedPages.erase(loaded.PageId)) {
auto type = Cache->GetPageType(loaded.PageId);
SavedPages[loaded.PageId] = TPinnedPageRef(loaded.Page).GetData();
SavedPages[loaded.PageId] = NSharedCache::TPinnedPageRef(loaded.Page).GetData();
if (type != EPage::FlatIndex) {
// hack: saving flat index to private cache will break sticky logic
// keep it in shared cache only for now
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tablet_flat/flat_sausagecache.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
namespace NKikimr {
namespace NTabletFlatExecutor {

using namespace NSharedCache;

struct TPrivatePageCachePinPad : public TAtomicRefCount<TPrivatePageCachePinPad> {
// no internal state
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_cache_clock_pro.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <library/cpp/monlib/counters/counters.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>

namespace NKikimr::NCache {
namespace NKikimr::NSharedCache {

// TODO: remove template args and make some page base class

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_cache_clock_pro_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <util/random/mersenne.h>
#include "shared_cache_clock_pro.h"

namespace NKikimr::NCache {
namespace NKikimr::NSharedCache {

namespace {

Expand Down
5 changes: 1 addition & 4 deletions ydb/core/tablet_flat/shared_cache_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>

namespace NKikimr {
namespace NSharedCache {

namespace NKikimr::NSharedCache {
using EPriority = NTabletFlatExecutor::NBlockIO::EPriority;

enum EEv {
Expand Down Expand Up @@ -129,7 +127,6 @@ namespace NSharedCache {
THashMap<TLogoBlobID, TActions> Actions;
};
}
}

template<> inline
void Out<NKikimr::NTabletFlatExecutor::NBlockIO::EPriority>(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_cache_s3fifo.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <library/cpp/monlib/counters/counters.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>

namespace NKikimr::NCache {
namespace NKikimr::NSharedCache {

// TODO: remove template args and make some page base class

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_cache_s3fifo_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <library/cpp/resource/resource.h>
#include "shared_cache_s3fifo.h"

namespace NKikimr::NCache {
namespace NKikimr::NSharedCache {

namespace {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_cache_switchable.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <library/cpp/monlib/counters/counters.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>

namespace NKikimr::NCache {
namespace NKikimr::NSharedCache {

template <typename TPage, typename TPageTraits>
class TSwitchableCache : public ICacheCache<TPage> {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_cache_switchable_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <ydb/core/tablet_flat/shared_cache_switchable.h>
#include <ydb/core/util/cache_cache_iface.h>

namespace NKikimr::NCache {
namespace NKikimr::NSharedCache {

namespace {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <atomic>

namespace NKikimr {
namespace NKikimr::NSharedCache {

class TSharedPageGCList;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/shared_handle_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr {
namespace NKikimr::NSharedCache {

Y_UNIT_TEST_SUITE(TPageHandleTest) {
class TTestHandle : public TSharedPageHandle {
Expand Down
Loading

0 comments on commit fdd7168

Please sign in to comment.