Skip to content

Commit

Permalink
Fix issue ydb-platform#9461 with altering CDC streams
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar committed Oct 29, 2024
1 parent a24b8ea commit 2386618
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
64 changes: 62 additions & 2 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3127,7 +3127,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1;
Expand Down Expand Up @@ -4737,6 +4737,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CheckDirEntry(kikimr, entriesToCheck);
}
}

Y_UNIT_TEST(CreateOrDropTopicOverTable) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down Expand Up @@ -4808,6 +4809,65 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(AlterCdcTopic) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});
TKikimrRunner kikimr{serverSettings};
auto tableClient = kikimr.GetTableClient();

{
auto tcSession = tableClient.CreateSession().GetValueSync().GetSession();
UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
CREATE TABLE `/Root/TmpTable` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)").GetValueSync().IsSuccess());

UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"(
ALTER TABLE `/Root/TmpTable` ADD CHANGEFEED `feed` WITH (
MODE = 'KEYS_ONLY', FORMAT = 'JSON'
);
)").GetValueSync().IsSuccess());
tcSession.Close();
}

auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(),
NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin"));

auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin"));
auto session = client.GetSession().GetValueSync().GetSession();
{

const auto query = Q_(R"(
--!syntax_v1
ALTER TOPIC `/Root/TmpTable/feed` ADD CONSUMER consumer21;
)");

RunQuery(query, session);
auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync();
const auto& consumers = desc.GetTopicDescription().GetConsumers();
UNIT_ASSERT_VALUES_EQUAL(consumers.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(consumers[0].GetConsumerName(), "consumer21");

}
{
const auto query = Q_(R"(
--!syntax_v1
ALTER TOPIC `/Root/TmpTable/feed` SET (min_active_partitions = 10);
)");
RunQuery(query, session, false);
auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1);
}

}

Y_UNIT_TEST(TableSink_OlapRWQueries) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
Expand Down Expand Up @@ -4914,7 +4974,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto it = client.StreamExecuteQuery(R"sql(
SELECT r.Col3
FROM `/Root/DataShard` AS r
JOIN `/Root/ColumnShard` AS c
JOIN `/Root/ColumnShard` AS c
ON r.Col1 = c.Col1;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
Expand Down
10 changes: 10 additions & 0 deletions ydb/services/lib/actors/pq_schema_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ namespace NKikimr::NGRpcProxy::V1 {
return path;
}

const TMaybe<TString>& GetCdcStreamName() const {
return CdcStreamName;
}

void SendDescribeProposeRequest(bool showPrivate = false) {
return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate);
}
Expand Down Expand Up @@ -603,6 +607,10 @@ namespace NKikimr::NGRpcProxy::V1 {
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1);
PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;

if (response.Self) {
CdcStreamName = response.Self->Info.GetName();
}
SendDescribeProposeRequest(true);
return true;
}
Expand All @@ -620,6 +628,8 @@ namespace NKikimr::NGRpcProxy::V1 {
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self;
TMaybe<TString> PrivateTopicName;
TMaybe<TString> CdcStreamName;

};

}
15 changes: 13 additions & 2 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1587,7 +1587,18 @@ void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache
}
TUpdateSchemeBase::HandleCacheNavigateResponse(ev);
auto& schemeTx = Response->Response.ModifyScheme;
FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name);
std::pair <TString, TString> pathPair;
try {
pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath());
} catch (const std::exception &ex) {
Response->Response.Issues.AddIssue(NYql::ExceptionToIssue(ex));
RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
return;
}

const auto& workingDir = pathPair.first;
const auto& name = pathPair.second;
FillModifyScheme(schemeTx, ActorContext(), workingDir, name);
}

void TAlterTopicActorInternal::ModifyPersqueueConfig(
Expand All @@ -1601,7 +1612,7 @@ void TAlterTopicActorInternal::ModifyPersqueueConfig(
TString error;
Y_UNUSED(selfInfo);

auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false);
auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, GetCdcStreamName().Defined());
if (!error.empty()) {
Response->Response.Issues.AddIssue(error);
}
Expand Down

0 comments on commit 2386618

Please sign in to comment.