Skip to content

Commit a11a66f

Browse files
authored
Fixed the deletion of blobs by retention in the supportive partition of the topic (#19916)
2 parents 10071e0 + 67a29ca commit a11a66f

File tree

4 files changed

+65
-5
lines changed

4 files changed

+65
-5
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,10 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
403403
}
404404

405405
bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
406+
if (IsSupportive()) {
407+
return false;
408+
}
409+
406410
bool haveChanges = CleanUpBlobs(request, ctx);
407411

408412
PQ_LOG_T("Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff");

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ class TFixture : public NUnitTest::TBaseFixture {
8484
void CreateTopic(const TString& path = TString{TEST_TOPIC},
8585
const TString& consumer = TEST_CONSUMER,
8686
size_t partitionCount = 1,
87-
std::optional<size_t> maxPartitionCount = std::nullopt);
87+
std::optional<size_t> maxPartitionCount = std::nullopt,
88+
const TDuration retention = TDuration::Hours(1),
89+
bool important = false);
8890
TTopicDescription DescribeTopic(const TString& path);
8991

9092
void AddConsumer(const TString& topicPath, const TVector<TString>& consumers);
@@ -474,10 +476,12 @@ void TFixture::WriteMessages(const TVector<TString>& messages,
474476
void TFixture::CreateTopic(const TString& path,
475477
const TString& consumer,
476478
size_t partitionCount,
477-
std::optional<size_t> maxPartitionCount)
479+
std::optional<size_t> maxPartitionCount,
480+
const TDuration retention,
481+
bool important)
478482

479483
{
480-
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
484+
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount, retention, important);
481485
}
482486

483487
void TFixture::AddConsumer(const TString& topicPath,
@@ -3286,6 +3290,56 @@ Y_UNIT_TEST_F(The_Transaction_Starts_On_One_Version_And_Ends_On_The_Other, TFixt
32863290
RestartPQTablet("topic_A", 1);
32873291
}
32883292

3293+
Y_UNIT_TEST_F(TestRetentionOnLongTxAndBigMessages, TFixture)
3294+
{
3295+
3296+
auto bigMessage = []() {
3297+
TStringBuilder sb;
3298+
sb.reserve(10_MB);
3299+
for (size_t i = 0; i < sb.capacity(); ++i) {
3300+
sb << RandomNumber<char>();
3301+
}
3302+
3303+
return sb;
3304+
};
3305+
3306+
TString msg = bigMessage();
3307+
3308+
CreateTopic("topic_A", TEST_CONSUMER, 1, 1, TDuration::Seconds(1), true);
3309+
3310+
auto session = CreateTableSession();
3311+
auto tx0 = BeginTx(session);
3312+
auto tx1 = BeginTx(session);
3313+
3314+
WriteToTopic("topic_A", "grp-0", msg, &tx0);
3315+
WriteToTopic("topic_A", "grp-1", msg, &tx1);
3316+
3317+
Sleep(TDuration::Seconds(3));
3318+
3319+
WriteToTopic("topic_A", "grp-0", "short-message", &tx0);
3320+
WriteToTopic("topic_A", "grp-1", "short-message", &tx1);
3321+
3322+
WriteToTopic("topic_A", "grp-0", msg, &tx0);
3323+
WriteToTopic("topic_A", "grp-1", msg, &tx1);
3324+
3325+
Sleep(TDuration::Seconds(3));
3326+
3327+
WriteToTopic("topic_A", "grp-0", msg, &tx0);
3328+
WriteToTopic("topic_A", "grp-1", msg, &tx1);
3329+
3330+
Sleep(TDuration::Seconds(3));
3331+
3332+
CommitTx(tx0);
3333+
CommitTx(tx1);
3334+
3335+
//RestartPQTablet("topic_A", 0);
3336+
3337+
auto read = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
3338+
UNIT_ASSERT(read.size() > 0);
3339+
UNIT_ASSERT_EQUAL(msg, read[0]);
3340+
}
3341+
3342+
32893343
}
32903344

32913345
}

ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ void TTopicSdkTestSetup::CreateTopicWithAutoscale(const std::string& path, const
2626
CreateTopic(path, consumer, partitionCount, maxPartitionCount);
2727
}
2828

29-
void TTopicSdkTestSetup::CreateTopic(const std::string& path, const std::string& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount)
29+
void TTopicSdkTestSetup::CreateTopic(const std::string& path, const std::string& consumer, size_t partitionCount, std::optional<size_t> maxPartitionCount, const TDuration retention, bool important)
3030
{
3131
TTopicClient client(MakeDriver());
3232

3333
TCreateTopicSettings topics;
3434
topics
35+
.RetentionPeriod(retention)
3536
.BeginConfigurePartitioningSettings()
3637
.MinActivePartitions(partitionCount)
3738
.MaxActivePartitions(maxPartitionCount.value_or(partitionCount));
@@ -44,6 +45,7 @@ void TTopicSdkTestSetup::CreateTopic(const std::string& path, const std::string&
4445
}
4546

4647
TConsumerSettings<TCreateTopicSettings> consumers(topics, consumer);
48+
consumers.Important(important);
4749
topics.AppendConsumers(consumers);
4850

4951
auto status = client.CreateTopic(path, topics).GetValueSync();

ydb/public/sdk/cpp/src/client/topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class TTopicSdkTestSetup {
1717
TTopicSdkTestSetup(const TString& testCaseName, const NKikimr::Tests::TServerSettings& settings = MakeServerSettings(), bool createTopic = true);
1818

1919
void CreateTopic(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER, size_t partitionCount = 1,
20-
std::optional<size_t> maxPartitionCount = std::nullopt);
20+
std::optional<size_t> maxPartitionCount = std::nullopt, const TDuration retention = TDuration::Hours(1), bool important = false);
2121
void CreateTopicWithAutoscale(const std::string& path = TEST_TOPIC, const std::string& consumer = TEST_CONSUMER, size_t partitionCount = 1,
2222
size_t maxPartitionCount = 100);
2323

0 commit comments

Comments
 (0)