Skip to content

[chore](info) Record data_footprint and total_data_size in SegmentFooterPB and ColumnMetaPB #51001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class StoragePageCache {
// Since we are using std::shared_ptr, so lify cycle of the page is not managed by
// this cache alone.
// User could store a weak_ptr to the page, and lock it when needed.
// See Segment::_get_segment_footer for example.
// See Segment::get_segment_footer for example.
template <typename T>
void insert(const CacheKey& key, T data, size_t size, PageCacheHandle* handle,
segment_v2::PageTypePB page_type, bool in_memory = false);
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void Segment::update_metadata_size() {

Status Segment::_open(OlapReaderStatistics* stats) {
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
RETURN_IF_ERROR(get_segment_footer(footer_pb_shared, stats));

_pk_index_meta.reset(
footer_pb_shared->has_primary_key_index_meta()
Expand Down Expand Up @@ -608,7 +608,7 @@ Status Segment::_create_column_readers_once(OlapReaderStatistics* stats) {
SCOPED_RAW_TIMER(&stats->segment_create_column_readers_timer_ns);
return _create_column_readers_once_call.call([&] {
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
RETURN_IF_ERROR(_get_segment_footer(footer_pb_shared, stats));
RETURN_IF_ERROR(get_segment_footer(footer_pb_shared, stats));
return _create_column_readers(*footer_pb_shared);
});
}
Expand Down Expand Up @@ -1187,8 +1187,8 @@ Status Segment::seek_and_read_by_rowid(const TabletSchema& schema, SlotDescripto
return Status::OK();
}

Status Segment::_get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
OlapReaderStatistics* stats) {
Status Segment::get_segment_footer(std::shared_ptr<SegmentFooterPB>& footer_pb,
OlapReaderStatistics* stats) {
std::shared_ptr<SegmentFooterPB> footer_pb_shared = _footer_pb.lock();
if (footer_pb_shared != nullptr) {
footer_pb = footer_pb_shared;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

const TabletSchemaSPtr& tablet_schema() { return _tablet_schema; }

Status get_segment_footer(std::shared_ptr<SegmentFooterPB>&, OlapReaderStatistics* stats);

private:
DISALLOW_COPY_AND_ASSIGN(Segment);
Segment(uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
Expand Down Expand Up @@ -248,8 +250,6 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd

Status _create_column_readers_once(OlapReaderStatistics* stats);

Status _get_segment_footer(std::shared_ptr<SegmentFooterPB>&, OlapReaderStatistics* stats);

StoragePageCache::CacheKey get_segment_footer_cache_key() const;

friend class SegmentIterator;
Expand Down
19 changes: 17 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1030,9 +1030,24 @@ Status SegmentWriter::finalize_columns_data() {
}
_num_rows_written = 0;

for (auto& column_writer : _column_writers) {
RETURN_IF_ERROR(column_writer->finish());
#ifdef BE_TEST
int64_t total_data_size = 0;
for (const auto& _column_writer : _column_writers) {
total_data_size += _column_writer->estimate_buffer_size();
}
auto origin_data_footprint = _footer.data_footprint();
_footer.set_data_footprint(origin_data_footprint + total_data_size);
#endif

for (size_t id = 0; id < _column_writers.size(); ++id) {
// record the data size of each column before page builder reset in finish()
auto cid = _column_ids[id];
// estimate column data size for flush memtable, may be inaccurate at low cardinality
_footer.mutable_columns(cid)->set_estimate_total_data_size(
_column_writers[id]->estimate_buffer_size());
RETURN_IF_ERROR(_column_writers[id]->finish());
}

RETURN_IF_ERROR(_write_data());

return Status::OK();
Expand Down
16 changes: 14 additions & 2 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,9 @@ Status VerticalSegmentWriter::write_batch() {
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
}
}

#ifdef BE_TEST
int64_t total_data_size = 0;
#endif
std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
// the key is cluster key column unique id
Expand Down Expand Up @@ -1205,6 +1207,13 @@ Status VerticalSegmentWriter::write_batch() {
data.num_rows));
_olap_data_convertor->clear_source_content();
}
#ifdef BE_TEST
total_data_size += _column_writers[cid]->estimate_buffer_size();
#endif
// estimate column data size for flush memtable, may be inaccurate at low cardinality
_footer.mutable_columns(cid)->set_estimate_total_data_size(
_column_writers[cid]->estimate_buffer_size());

if (_data_dir != nullptr &&
_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
Expand All @@ -1213,7 +1222,10 @@ Status VerticalSegmentWriter::write_batch() {
RETURN_IF_ERROR(_column_writers[cid]->finish());
RETURN_IF_ERROR(_column_writers[cid]->write_data());
}

#ifdef BE_TEST
auto origin_data_footprint = _footer.data_footprint();
_footer.set_data_footprint(origin_data_footprint + total_data_size);
#endif
for (auto& data : _batched_blocks) {
_olap_data_convertor->set_source_content(data.block, data.row_pos, data.num_rows);
RETURN_IF_ERROR(_generate_key_index(data, key_columns, seq_column, cid_to_column));
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/date_bloom_filter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ TEST_F(DateBloomFilterTest, query_index_test) {
segment_v2::SegmentSharedPtr segment;
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, nullptr, &segment).ok());
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
auto st = segment->_get_segment_footer(footer_pb_shared, nullptr);
auto st = segment->get_segment_footer(footer_pb_shared, nullptr);
EXPECT_TRUE(st.ok());
st = segment->_create_column_readers(*footer_pb_shared);
EXPECT_TRUE(st.ok());
Expand Down Expand Up @@ -232,7 +232,7 @@ TEST_F(DateBloomFilterTest, in_list_predicate_test) {
segment_v2::SegmentSharedPtr segment;
EXPECT_TRUE(((BetaRowset*)rowset.get())->load_segment(0, nullptr, &segment).ok());
std::shared_ptr<SegmentFooterPB> footer_pb_shared;
auto st = segment->_get_segment_footer(footer_pb_shared, nullptr);
auto st = segment->get_segment_footer(footer_pb_shared, nullptr);
EXPECT_TRUE(st.ok());
st = segment->_create_column_readers(*(footer_pb_shared));
EXPECT_TRUE(st.ok());
Expand Down
197 changes: 197 additions & 0 deletions be/test/olap/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/AgentService_types.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <stdlib.h>
#include <unistd.h>

Expand All @@ -39,9 +40,11 @@
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/rowset_builder.h"
#include "olap/schema.h"
Expand All @@ -55,7 +58,10 @@
#include "runtime/descriptor_helper.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "testutil/desc_tbl_builder.h"
#include "vec/columns/column.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_struct.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/runtime/vdatetime_value.h"
Expand Down Expand Up @@ -1047,4 +1053,195 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
res = engine_ref->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
ASSERT_TRUE(res.ok());
}

TEST_F(TestDeltaWriter, write_sigle_block_statistics_segment_meta_pb) {
std::unique_ptr<RuntimeProfile> profile;
profile = std::make_unique<RuntimeProfile>("CreateTablet");
TCreateTabletReq request;
create_tablet_request(10010, 270068330, &request);
Status res = engine_ref->create_tablet(request, profile.get());
EXPECT_EQ(Status::OK(), res);

TDescriptorTable tdesc_tbl = create_descriptor_tablet();
ObjectPool obj_pool;
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
WriteRequest write_req;
write_req.tablet_id = 10010;
write_req.schema_hash = 270068330;
write_req.txn_id = 20002;
write_req.partition_id = 30003;
write_req.load_id = load_id;
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = true;
write_req.table_schema_param = param;

// test vec delta writer
profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
std::make_unique<DeltaWriter>(*engine_ref, write_req, profile.get(), TUniqueId {});
EXPECT_NE(delta_writer, nullptr);

vectorized::Block block;
for (const auto& slot_desc : tuple_desc->slots()) {
block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->type(), slot_desc->col_name()));
}

auto columns = block.mutate_columns();
{
int8_t k1 = -127;
columns[0]->insert_data((const char*)&k1, sizeof(k1));

int16_t k2 = -32767;
columns[1]->insert_data((const char*)&k2, sizeof(k2));

int32_t k3 = -2147483647;
columns[2]->insert_data((const char*)&k3, sizeof(k3));

int64_t k4 = -9223372036854775807L;
columns[3]->insert_data((const char*)&k4, sizeof(k4));

int128_t k5 = -90000;
columns[4]->insert_data((const char*)&k5, sizeof(k5));

VecDateTimeValue k6;
k6.from_date_str("2048-11-10", 10);
auto k6_int = k6.to_int64();
columns[5]->insert_data((const char*)&k6_int, sizeof(k6_int));

VecDateTimeValue k7;
k7.from_date_str("2636-08-16 19:39:43", 19);
auto k7_int = k7.to_int64();
columns[6]->insert_data((const char*)&k7_int, sizeof(k7_int));

columns[7]->insert_data("abcd", 4);
columns[8]->insert_data("abcde", 5);

DecimalV2Value decimal_value;
decimal_value.assign_from_double(1.1);
columns[9]->insert_data((const char*)&decimal_value, sizeof(decimal_value));

DateV2Value<DateV2ValueType> date_v2;
date_v2.from_date_str("2048-11-10", 10);
auto date_v2_int = date_v2.to_date_int_val();
columns[10]->insert_data((const char*)&date_v2_int, sizeof(date_v2_int));

int8_t v1 = -127;
columns[11]->insert_data((const char*)&v1, sizeof(v1));

int16_t v2 = -32767;
columns[12]->insert_data((const char*)&v2, sizeof(v2));

int32_t v3 = -2147483647;
columns[13]->insert_data((const char*)&v3, sizeof(v3));

int64_t v4 = -9223372036854775807L;
columns[14]->insert_data((const char*)&v4, sizeof(v4));

int128_t v5 = -90000;
columns[15]->insert_data((const char*)&v5, sizeof(v5));

VecDateTimeValue v6;
v6.from_date_str("2048-11-10", 10);
auto v6_int = v6.to_int64();
columns[16]->insert_data((const char*)&v6_int, sizeof(v6_int));

VecDateTimeValue v7;
v7.from_date_str("2636-08-16 19:39:43", 19);
auto v7_int = v7.to_int64();
columns[17]->insert_data((const char*)&v7_int, sizeof(v7_int));

columns[18]->insert_data("abcd", 4);
columns[19]->insert_data("abcde", 5);

decimal_value.assign_from_double(1.1);
columns[20]->insert_data((const char*)&decimal_value, sizeof(decimal_value));

date_v2.from_date_str("2048-11-10", 10);
date_v2_int = date_v2.to_date_int_val();
columns[21]->insert_data((const char*)&date_v2_int, sizeof(date_v2_int));

res = delta_writer->write(&block, {0});
ASSERT_TRUE(res.ok());
}

res = delta_writer->close();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->build_rowset();
EXPECT_EQ(Status::OK(), res);
res = delta_writer->commit_txn(PSlaveTabletNodes());
EXPECT_EQ(Status::OK(), res);

// publish version success
TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(write_req.tablet_id);
std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
OlapMeta* meta = tablet->data_dir()->get_meta();
Version version;
version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
std::cout << "start to add rowset version:" << version.first << "-" << version.second
<< std::endl;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
engine_ref->txn_manager()->get_txn_related_tablets(write_req.txn_id, write_req.partition_id,
&tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_rs.second;
TabletPublishStatistics stats;
std::shared_ptr<TabletTxnInfo> extend_tablet_txn_info_lifetime = nullptr;
res = engine_ref->txn_manager()->publish_txn(
meta, write_req.partition_id, write_req.txn_id, write_req.tablet_id,
tablet_rs.first.tablet_uid, version, &stats, extend_tablet_txn_info_lifetime);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
<< "-" << rowset->version().second << std::endl;
res = tablet->add_inc_rowset(rowset);
ASSERT_TRUE(res.ok());
}
ASSERT_EQ(1, tablet->num_rows());

std::vector<RowsetSharedPtr> all_rowsets;
res = tablet->capture_consistent_rowsets_unlocked(
Version(0, tablet->get_rowset_with_max_version()->end_version()), &all_rowsets);
EXPECT_EQ(Status::OK(), res);

auto total_column_data_size = 0;
auto total_segoemt_data_footprint = 0;

for (auto& rs : all_rowsets) {
std::vector<segment_v2::SegmentSharedPtr> segments;
Status res = ((BetaRowset*)rs.get())->load_segments(&segments);

for (const auto& segment : segments) {
std::shared_ptr<SegmentFooterPB> footer_pb_shared;

res = segment->get_segment_footer(footer_pb_shared, nullptr);
EXPECT_EQ(Status::OK(), res);

auto seg_footprint = footer_pb_shared->data_footprint();

for (const auto& column : footer_pb_shared->columns()) {
auto column_data_size = column.estimate_total_data_size();
total_column_data_size += column_data_size;
}

total_segoemt_data_footprint += seg_footprint;
}
}

EXPECT_EQ(total_column_data_size, total_segoemt_data_footprint);

res = engine_ref->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
}

} // namespace doris
23 changes: 23 additions & 0 deletions be/test/olap/segcompaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,29 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
}
EXPECT_EQ(total_num_rows, num_rows_read);
}
auto total_column_data_size = 0;
auto total_segment_data_footprint = 0;

std::vector<segment_v2::SegmentSharedPtr> segments;
Status res = ((BetaRowset*)rowset.get())->load_segments(&segments);
ASSERT_TRUE(res.ok());

for (const auto& segment : segments) {
std::shared_ptr<SegmentFooterPB> footer_pb_shared;

res = segment->get_segment_footer(footer_pb_shared, nullptr);
EXPECT_EQ(Status::OK(), res);

auto seg_footprint = footer_pb_shared->data_footprint();

for (const auto& column : footer_pb_shared->columns()) {
auto column_data_size = column.estimate_total_data_size();
total_column_data_size += column_data_size;
}

total_segment_data_footprint += seg_footprint;
}
EXPECT_EQ(total_segment_data_footprint, total_column_data_size);
}
}

Expand Down
Loading