Skip to content

Commit a439492

Browse files
committed
1
1 parent f138847 commit a439492

File tree

10 files changed

+265
-16
lines changed

10 files changed

+265
-16
lines changed

be/src/olap/page_cache.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ class StoragePageCache {
177177
// Since we are using std::shared_ptr, so lify cycle of the page is not managed by
178178
// this cache alone.
179179
// User could store a weak_ptr to the page, and lock it when needed.
180-
// See Segment::_get_segment_footer for example.
180+
// See Segment::get_segment_footer for example.
181181
template <typename T>
182182
void insert(const CacheKey& key, T data, size_t size, PageCacheHandle* handle,
183183
segment_v2::PageTypePB page_type, bool in_memory = false);

be/src/olap/rowset/segment_v2/segment.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void Segment::update_metadata_size() {
179179

180180
Status Segment::_open(OlapReaderStatistics* stats) {
181181
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
182-
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
182+
RETURN_IF_ERROR(get_segment_footer(footer_pb_shared, stats));
183183

184184
_pk_index_meta.reset(
185185
footer_pb_shared->has_primary_key_index_meta()
@@ -608,7 +608,7 @@ Status Segment::_create_column_readers_once(OlapReaderStatistics* stats) {
608608
SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
609609
return _create_column_readers_once_call.call([&] {
610610
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
611-
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
611+
RETURN_IF_ERROR(get_segment_footer(footer_pb_shared, stats));
612612
return _create_column_readers(*footer_pb_shared);
613613
});
614614
}
@@ -1187,8 +1187,8 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
11871187
return Status::OK();
11881188
}
11891189

1190-
Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
1191-
OlapReaderStatistics* stats) {
1190+
Status Segment::get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
1191+
OlapReaderStatistics* stats) {
11921192
std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
11931193
if (footer_pb_shared != nullptr) {
11941194
footer_pb = footer_pb_shared;

be/src/olap/rowset/segment_v2/segment.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
220220

221221
const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; }
222222

223+
Status get_segment_footer(std::shared_ptr<SegmentFooterPB>&, OlapReaderStatistics* stats);
224+
223225
private:
224226
DISALLOW_COPY_AND_ASSIGN(Segment);
225227
Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
@@ -248,8 +250,6 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
248250

249251
Status _create_column_readers_once(OlapReaderStatistics* stats);
250252

251-
Status _get_segment_footer(std::shared_ptr<SegmentFooterPB>&, OlapReaderStatistics* stats);
252-
253253
StoragePageCache::CacheKey get_segment_footer_cache_key() const;
254254

255255
friend class SegmentIterator;

be/src/olap/rowset/segment_v2/segment_writer.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,9 +1030,24 @@ Status SegmentWriter::finalize_columns_data() {
10301030
}
10311031
_num_rows_written = 0;
10321032

1033-
for (auto& column_writer : _column_writers) {
1034-
RETURN_IF_ERROR(column_writer->finish());
1033+
#ifdef BE_TEST
1034+
int64_t total_data_size = 0;
1035+
for (const auto& _column_writer : _column_writers) {
1036+
total_data_size += _column_writer->estimate_buffer_size();
1037+
}
1038+
auto origin_data_footprint = _footer.data_footprint();
1039+
_footer.set_data_footprint(origin_data_footprint + total_data_size);
1040+
#endif
1041+
1042+
for (size_t id = 0; id < _column_writers.size(); ++id) {
1043+
// record the data size of each column before page builder reset in finish()
1044+
auto cid = _column_ids[id];
1045+
// estimate column data size for flush memtable, may be inaccurate at low cardinality
1046+
_footer.mutable_columns(cid)->set_estimate_total_data_size(
1047+
_column_writers[id]->estimate_buffer_size());
1048+
RETURN_IF_ERROR(_column_writers[id]->finish());
10351049
}
1050+
10361051
RETURN_IF_ERROR(_write_data());
10371052

10381053
return Status::OK();

be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,9 @@ Status VerticalSegmentWriter::write_batch() {
11721172
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
11731173
}
11741174
}
1175-
1175+
#ifdef BE_TEST
1176+
int64_t total_data_size = 0;
1177+
#endif
11761178
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
11771179
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
11781180
// the key is cluster key column unique id
@@ -1205,6 +1207,13 @@ Status VerticalSegmentWriter::write_batch() {
12051207
data.num_rows));
12061208
_olap_data_convertor->clear_source_content();
12071209
}
1210+
#ifdef BE_TEST
1211+
total_data_size += _column_writers[cid]->estimate_buffer_size();
1212+
#endif
1213+
// estimate column data size for flush memtable, may be inaccurate at low cardinality
1214+
_footer.mutable_columns(cid)->set_estimate_total_data_size(
1215+
_column_writers[cid]->estimate_buffer_size());
1216+
12081217
if (_data_dir != nullptr &&
12091218
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
12101219
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
@@ -1213,7 +1222,10 @@ Status VerticalSegmentWriter::write_batch() {
12131222
RETURN_IF_ERROR(_column_writers[cid]->finish());
12141223
RETURN_IF_ERROR(_column_writers[cid]->write_data());
12151224
}
1216-
1225+
#ifdef BE_TEST
1226+
auto origin_data_footprint = _footer.data_footprint();
1227+
_footer.set_data_footprint(origin_data_footprint + total_data_size);
1228+
#endif
12171229
for (auto& data : _batched_blocks) {
12181230
_olap_data_convertor->set_source_content(data.block, data.row_pos, data.num_rows);
12191231
RETURN_IF_ERROR(_generate_key_index(data, key_columns, seq_column, cid_to_column));

be/test/olap/date_bloom_filter_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ TEST_F(DateBloomFilterTest, query_index_test) {
152152
segment_v2::SegmentSharedPtr segment;
153153
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, nullptr, &segment).ok());
154154
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
155-
auto st = segment->_get_segment_footer(footer_pb_shared, nullptr);
155+
auto st = segment->get_segment_footer(footer_pb_shared, nullptr);
156156
EXPECT_TRUE(st.ok());
157157
st = segment->_create_column_readers(*footer_pb_shared);
158158
EXPECT_TRUE(st.ok());
@@ -232,7 +232,7 @@ TEST_F(DateBloomFilterTest, in_list_predicate_test) {
232232
segment_v2::SegmentSharedPtr segment;
233233
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, nullptr, &segment).ok());
234234
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
235-
auto st = segment->_get_segment_footer(footer_pb_shared, nullptr);
235+
auto st = segment->get_segment_footer(footer_pb_shared, nullptr);
236236
EXPECT_TRUE(st.ok());
237237
st = segment->_create_column_readers(*(footer_pb_shared));
238238
EXPECT_TRUE(st.ok());

be/test/olap/delta_writer_test.cpp

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <gen_cpp/AgentService_types.h>
2121
#include <gtest/gtest-message.h>
2222
#include <gtest/gtest-test-part.h>
23+
#include <gtest/gtest.h>
2324
#include <stdlib.h>
2425
#include <unistd.h>
2526

@@ -39,9 +40,11 @@
3940
#include "io/fs/local_file_system.h"
4041
#include "olap/data_dir.h"
4142
#include "olap/iterators.h"
43+
#include "olap/olap_common.h"
4244
#include "olap/olap_define.h"
4345
#include "olap/options.h"
4446
#include "olap/rowset/beta_rowset.h"
47+
#include "olap/rowset/rowset_fwd.h"
4548
#include "olap/rowset/segment_v2/segment.h"
4649
#include "olap/rowset_builder.h"
4750
#include "olap/schema.h"
@@ -55,7 +58,10 @@
5558
#include "runtime/descriptor_helper.h"
5659
#include "runtime/descriptors.h"
5760
#include "runtime/exec_env.h"
61+
#include "testutil/desc_tbl_builder.h"
5862
#include "vec/columns/column.h"
63+
#include "vec/columns/column_map.h"
64+
#include "vec/columns/column_struct.h"
5965
#include "vec/core/block.h"
6066
#include "vec/core/column_with_type_and_name.h"
6167
#include "vec/runtime/vdatetime_value.h"
@@ -1047,4 +1053,195 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
10471053
res = engine_ref->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
10481054
ASSERT_TRUE(res.ok());
10491055
}
1056+
1057+
TEST_F(TestDeltaWriter, write_sigle_block_statistics_segment_meta_pb) {
1058+
std::unique_ptr<RuntimeProfile> profile;
1059+
profile = std::make_unique<RuntimeProfile>("CreateTablet");
1060+
TCreateTabletReq request;
1061+
create_tablet_request(10010, 270068330, &request);
1062+
Status res = engine_ref->create_tablet(request, profile.get());
1063+
EXPECT_EQ(Status::OK(), res);
1064+
1065+
TDescriptorTable tdesc_tbl = create_descriptor_tablet();
1066+
ObjectPool obj_pool;
1067+
DescriptorTbl* desc_tbl = nullptr;
1068+
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
1069+
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
1070+
auto param = std::make_shared<OlapTableSchemaParam>();
1071+
1072+
PUniqueId load_id;
1073+
load_id.set_hi(0);
1074+
load_id.set_lo(0);
1075+
WriteRequest write_req;
1076+
write_req.tablet_id = 10010;
1077+
write_req.schema_hash = 270068330;
1078+
write_req.txn_id = 20002;
1079+
write_req.partition_id = 30003;
1080+
write_req.load_id = load_id;
1081+
write_req.tuple_desc = tuple_desc;
1082+
write_req.slots = &(tuple_desc->slots());
1083+
write_req.is_high_priority = true;
1084+
write_req.table_schema_param = param;
1085+
1086+
// test vec delta writer
1087+
profile = std::make_unique<RuntimeProfile>("LoadChannels");
1088+
auto delta_writer =
1089+
std::make_unique<DeltaWriter>(*engine_ref, write_req, profile.get(), TUniqueId {});
1090+
EXPECT_NE(delta_writer, nullptr);
1091+
1092+
vectorized::Block block;
1093+
for (const auto& slot_desc : tuple_desc->slots()) {
1094+
block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
1095+
slot_desc->type(), slot_desc->col_name()));
1096+
}
1097+
1098+
auto columns = block.mutate_columns();
1099+
{
1100+
int8_t k1 = -127;
1101+
columns[0]->insert_data((const char*)&k1, sizeof(k1));
1102+
1103+
int16_t k2 = -32767;
1104+
columns[1]->insert_data((const char*)&k2, sizeof(k2));
1105+
1106+
int32_t k3 = -2147483647;
1107+
columns[2]->insert_data((const char*)&k3, sizeof(k3));
1108+
1109+
int64_t k4 = -9223372036854775807L;
1110+
columns[3]->insert_data((const char*)&k4, sizeof(k4));
1111+
1112+
int128_t k5 = -90000;
1113+
columns[4]->insert_data((const char*)&k5, sizeof(k5));
1114+
1115+
VecDateTimeValue k6;
1116+
k6.from_date_str("2048-11-10", 10);
1117+
auto k6_int = k6.to_int64();
1118+
columns[5]->insert_data((const char*)&k6_int, sizeof(k6_int));
1119+
1120+
VecDateTimeValue k7;
1121+
k7.from_date_str("2636-08-16 19:39:43", 19);
1122+
auto k7_int = k7.to_int64();
1123+
columns[6]->insert_data((const char*)&k7_int, sizeof(k7_int));
1124+
1125+
columns[7]->insert_data("abcd", 4);
1126+
columns[8]->insert_data("abcde", 5);
1127+
1128+
DecimalV2Value decimal_value;
1129+
decimal_value.assign_from_double(1.1);
1130+
columns[9]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
1131+
1132+
DateV2Value<DateV2ValueType> date_v2;
1133+
date_v2.from_date_str("2048-11-10", 10);
1134+
auto date_v2_int = date_v2.to_date_int_val();
1135+
columns[10]->insert_data((const char*)&date_v2_int, sizeof(date_v2_int));
1136+
1137+
int8_t v1 = -127;
1138+
columns[11]->insert_data((const char*)&v1, sizeof(v1));
1139+
1140+
int16_t v2 = -32767;
1141+
columns[12]->insert_data((const char*)&v2, sizeof(v2));
1142+
1143+
int32_t v3 = -2147483647;
1144+
columns[13]->insert_data((const char*)&v3, sizeof(v3));
1145+
1146+
int64_t v4 = -9223372036854775807L;
1147+
columns[14]->insert_data((const char*)&v4, sizeof(v4));
1148+
1149+
int128_t v5 = -90000;
1150+
columns[15]->insert_data((const char*)&v5, sizeof(v5));
1151+
1152+
VecDateTimeValue v6;
1153+
v6.from_date_str("2048-11-10", 10);
1154+
auto v6_int = v6.to_int64();
1155+
columns[16]->insert_data((const char*)&v6_int, sizeof(v6_int));
1156+
1157+
VecDateTimeValue v7;
1158+
v7.from_date_str("2636-08-16 19:39:43", 19);
1159+
auto v7_int = v7.to_int64();
1160+
columns[17]->insert_data((const char*)&v7_int, sizeof(v7_int));
1161+
1162+
columns[18]->insert_data("abcd", 4);
1163+
columns[19]->insert_data("abcde", 5);
1164+
1165+
decimal_value.assign_from_double(1.1);
1166+
columns[20]->insert_data((const char*)&decimal_value, sizeof(decimal_value));
1167+
1168+
date_v2.from_date_str("2048-11-10", 10);
1169+
date_v2_int = date_v2.to_date_int_val();
1170+
columns[21]->insert_data((const char*)&date_v2_int, sizeof(date_v2_int));
1171+
1172+
res = delta_writer->write(&block, {0});
1173+
ASSERT_TRUE(res.ok());
1174+
}
1175+
1176+
res = delta_writer->close();
1177+
EXPECT_EQ(Status::OK(), res);
1178+
res = delta_writer->build_rowset();
1179+
EXPECT_EQ(Status::OK(), res);
1180+
res = delta_writer->commit_txn(PSlaveTabletNodes());
1181+
EXPECT_EQ(Status::OK(), res);
1182+
1183+
// publish version success
1184+
TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(write_req.tablet_id);
1185+
std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
1186+
OlapMeta* meta = tablet->data_dir()->get_meta();
1187+
Version version;
1188+
version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
1189+
version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
1190+
std::cout << "start to add rowset version:" << version.first << "-" << version.second
1191+
<< std::endl;
1192+
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
1193+
engine_ref->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id,
1194+
&tablet_related_rs);
1195+
for (auto& tablet_rs : tablet_related_rs) {
1196+
std::cout << "start to publish txn" << std::endl;
1197+
RowsetSharedPtr rowset = tablet_rs.second;
1198+
TabletPublishStatistics stats;
1199+
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
1200+
res = engine_ref->txn_manager()->publish_txn(
1201+
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
1202+
tablet_rs.first.tablet_uid, version, &stats, extend_tablet_txn_info_lifetime);
1203+
ASSERT_TRUE(res.ok());
1204+
std::cout << "start to add inc rowset:" << rowset->rowset_id()
1205+
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
1206+
<< "-" << rowset->version().second << std::endl;
1207+
res = tablet->add_inc_rowset(rowset);
1208+
ASSERT_TRUE(res.ok());
1209+
}
1210+
ASSERT_EQ(1, tablet->num_rows());
1211+
1212+
std::vector<RowsetSharedPtr> all_rowsets;
1213+
res = tablet->capture_consistent_rowsets_unlocked(
1214+
Version(0, tablet->get_rowset_with_max_version()->end_version()), &all_rowsets);
1215+
EXPECT_EQ(Status::OK(), res);
1216+
1217+
auto total_column_data_size = 0;
1218+
auto total_segoemt_data_footprint = 0;
1219+
1220+
for (auto& rs : all_rowsets) {
1221+
std::vector<segment_v2::SegmentSharedPtr> segments;
1222+
Status res = ((BetaRowset*)rs.get())->load_segments(&segments);
1223+
1224+
for (const auto& segment : segments) {
1225+
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
1226+
1227+
res = segment->get_segment_footer(footer_pb_shared, nullptr);
1228+
EXPECT_EQ(Status::OK(), res);
1229+
1230+
auto seg_footprint = footer_pb_shared->data_footprint();
1231+
1232+
for (const auto& column : footer_pb_shared->columns()) {
1233+
auto column_data_size = column.estimate_total_data_size();
1234+
total_column_data_size += column_data_size;
1235+
}
1236+
1237+
total_segoemt_data_footprint += seg_footprint;
1238+
}
1239+
}
1240+
1241+
EXPECT_EQ(total_column_data_size, total_segoemt_data_footprint);
1242+
1243+
res = engine_ref->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
1244+
EXPECT_EQ(Status::OK(), res);
1245+
}
1246+
10501247
} // namespace doris

be/test/olap/segcompaction_test.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,29 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
395395
}
396396
EXPECT_EQ(total_num_rows, num_rows_read);
397397
}
398+
auto total_column_data_size = 0;
399+
auto total_segment_data_footprint = 0;
400+
401+
std::vector<segment_v2::SegmentSharedPtr> segments;
402+
Status res = ((BetaRowset*)rowset.get())->load_segments(&segments);
403+
ASSERT_TRUE(res.ok());
404+
405+
for (const auto& segment : segments) {
406+
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
407+
408+
res = segment->get_segment_footer(footer_pb_shared, nullptr);
409+
EXPECT_EQ(Status::OK(), res);
410+
411+
auto seg_footprint = footer_pb_shared->data_footprint();
412+
413+
for (const auto& column : footer_pb_shared->columns()) {
414+
auto column_data_size = column.estimate_total_data_size();
415+
total_column_data_size += column_data_size;
416+
}
417+
418+
total_segment_data_footprint += seg_footprint;
419+
}
420+
EXPECT_EQ(total_segment_data_footprint, total_column_data_size);
398421
}
399422
}
400423

0 commit comments

Comments
 (0)