54
54
#include " olap/short_key_index.h"
55
55
#include " olap/tablet_schema.h"
56
56
#include " olap/utils.h"
57
+ #include " runtime/define_primitive_type.h"
57
58
#include " runtime/exec_env.h"
58
59
#include " runtime/memory/mem_tracker.h"
59
60
#include " service/point_query_executor.h"
62
63
#include " util/debug_points.h"
63
64
#include " util/faststring.h"
64
65
#include " util/key_util.h"
66
+ #include " vec/columns/column_map.h"
65
67
#include " vec/columns/column_nullable.h"
66
68
#include " vec/columns/column_vector.h"
67
69
#include " vec/columns/columns_number.h"
@@ -1174,7 +1176,9 @@ Status VerticalSegmentWriter::write_batch() {
1174
1176
vectorized::IOlapColumnDataAccessor* seq_column = nullptr ;
1175
1177
// the map key is the unique id of a cluster key column
1176
1178
std::map<uint32_t , vectorized::IOlapColumnDataAccessor*> cid_to_column;
1179
+ int64_t total_data_size = 0 ;
1177
1180
for (uint32_t cid = 0 ; cid < _tablet_schema->num_columns (); ++cid) {
1181
+ int64_t column_data_size = 0 ;
1178
1182
RETURN_IF_ERROR (_create_column_writer (cid, _tablet_schema->column (cid), _tablet_schema));
1179
1183
for (auto & data : _batched_blocks) {
1180
1184
RETURN_IF_ERROR (_olap_data_convertor->set_source_content_with_specifid_columns (
@@ -1200,17 +1204,28 @@ Status VerticalSegmentWriter::write_batch() {
1200
1204
}
1201
1205
RETURN_IF_ERROR (_column_writers[cid]->append (column->get_nullmap (), column->get_data (),
1202
1206
data.num_rows ));
1207
+ if (_data_dir != nullptr &&
1208
+ _data_dir->reach_capacity_limit (_column_writers[cid]->estimate_buffer_size ())) {
1209
+ return Status::Error<DISK_REACH_CAPACITY_LIMIT>(" disk {} exceed capacity limit." ,
1210
+ _data_dir->path_hash ());
1211
+ }
1212
+
1213
+ // estimate the column data size for memtable flush; may be inaccurate when cardinality is low
1214
+ column_data_size += _column_writers[cid]->estimate_buffer_size ();
1215
+ total_data_size += column_data_size;
1216
+ auto origin_data_size = _footer.columns (cid).total_data_size ();
1217
+ _footer.mutable_columns (cid)->set_total_data_size (origin_data_size + column_data_size);
1218
+
1219
+ RETURN_IF_ERROR (_column_writers[cid]->finish ());
1220
+ RETURN_IF_ERROR (_column_writers[cid]->write_data ());
1221
+
1203
1222
_olap_data_convertor->clear_source_content ();
1204
1223
}
1205
- if (_data_dir != nullptr &&
1206
- _data_dir->reach_capacity_limit (_column_writers[cid]->estimate_buffer_size ())) {
1207
- return Status::Error<DISK_REACH_CAPACITY_LIMIT>(" disk {} exceed capacity limit." ,
1208
- _data_dir->path_hash ());
1209
- }
1210
- RETURN_IF_ERROR (_column_writers[cid]->finish ());
1211
- RETURN_IF_ERROR (_column_writers[cid]->write_data ());
1212
1224
}
1213
1225
1226
+ auto origin_data_footprint = _footer.data_footprint ();
1227
+ _footer.set_data_footprint (origin_data_footprint + total_data_size);
1228
+
1214
1229
for (auto & data : _batched_blocks) {
1215
1230
_olap_data_convertor->set_source_content (data.block , data.row_pos , data.num_rows );
1216
1231
RETURN_IF_ERROR (_generate_key_index (data, key_columns, seq_column, cid_to_column));
0 commit comments