Skip to content

Commit f687dbd

Browse files
committed
Refactor code
Signed-off-by: Jin Hai <[email protected]>
1 parent ba017c4 commit f687dbd

File tree

4 files changed

+26
-71
lines changed

4 files changed

+26
-71
lines changed

src/storage/catalog/new_catalog_static.cpp

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,7 @@ Status NewCatalog::InitCatalog(KVInstance *kv_instance, TxnTimeStamp checkpoint_
114114
}
115115

116116
auto InitBlockColumn = [&](ColumnMeta &column_meta) {
117-
Status status = column_meta.LoadSet();
118-
if (!status.ok()) {
119-
return status;
120-
}
121-
return Status::OK();
117+
return column_meta.LoadSet();
122118
};
123119
auto InitBlock = [&](BlockMeta &block_meta) {
124120
SharedPtr<Vector<SharedPtr<ColumnDef>>> column_defs_ptr;
@@ -133,12 +129,7 @@ Status NewCatalog::InitCatalog(KVInstance *kv_instance, TxnTimeStamp checkpoint_
133129
return status;
134130
}
135131
}
136-
Status status = block_meta.LoadSet(checkpoint_ts);
137-
if (!status.ok()) {
138-
return status;
139-
}
140-
141-
return Status::OK();
132+
return block_meta.LoadSet(checkpoint_ts);
142133
};
143134
auto InitSegment = [&](SegmentMeta &segment_meta) {
144135
auto [block_ids, status] = segment_meta.GetBlockIDs1();
@@ -376,9 +367,9 @@ Status NewCatalog::GetAllMemIndexes(KVInstance *kv_instance,
376367
SegmentIndexMeta segment_index_meta(segment_id, table_index_meta);
377368

378369
SharedPtr<MemIndex> mem_index;
379-
Status status = segment_index_meta.GetMemIndex(mem_index);
380-
if (!status.ok()) {
381-
return status;
370+
Status status_mem_index = segment_index_meta.GetMemIndex(mem_index);
371+
if (!status_mem_index.ok()) {
372+
return status_mem_index;
382373
}
383374

384375
mem_indexes.push_back(mem_index);
@@ -433,7 +424,7 @@ Status NewCatalog::GetAllMemIndexes(KVInstance *kv_instance,
433424
const String &db_id_str = (*db_id_strs_ptr)[i];
434425
const String &db_name = (*db_names_ptr)[i];
435426
DBMeeta db_meta(db_id_str, *kv_instance);
436-
Status status = TraverseDB(db_meta, db_name);
427+
status = TraverseDB(db_meta, db_name);
437428
if (!status.ok()) {
438429
return status;
439430
}
@@ -509,7 +500,7 @@ Status NewCatalog::AddNewTable(DBMeeta &db_meta,
509500
TxnTimeStamp commit_ts,
510501
const SharedPtr<TableDef> &table_def,
511502
Optional<TableMeeta> &table_meta) {
512-
// Create table key value pair
503+
// Create table a key value pair
513504
KVInstance &kv_instance = db_meta.kv_instance();
514505
String table_key = KeyEncode::CatalogTableKey(db_meta.db_id_str(), *table_def->table_name(), commit_ts);
515506
Status status = kv_instance.Put(table_key, table_id_str);
@@ -584,7 +575,7 @@ Status NewCatalog::AddNewTableIndex(TableMeeta &table_meta,
584575
TxnTimeStamp commit_ts,
585576
const SharedPtr<IndexBase> &index_base,
586577
Optional<TableIndexMeeta> &table_index_meta) {
587-
// Create index key value pair
578+
// Create index a key value pair
588579
KVInstance &kv_instance = table_meta.kv_instance();
589580
const String &index_name = *index_base->index_name_;
590581
String index_key = KeyEncode::CatalogIndexKey(table_meta.db_id_str(), table_meta.table_id_str(), index_name, commit_ts);
@@ -664,12 +655,7 @@ Status NewCatalog::CleanTableIndex(TableIndexMeeta &table_index_meta,
664655
}
665656
}
666657

667-
Status status = table_index_meta.UninitSet1(usage_flag);
668-
if (!status.ok()) {
669-
return status;
670-
}
671-
672-
return Status::OK();
658+
return table_index_meta.UninitSet1(usage_flag);
673659
}
674660

675661
// Status NewCatalog::AddNewSegment(TableMeeta &table_meta, SegmentID segment_id, Optional<SegmentMeta> &segment_meta) {
@@ -697,11 +683,7 @@ Status NewCatalog::AddNewSegment1(TableMeeta &table_meta, TxnTimeStamp commit_ts
697683
return status;
698684
}
699685
segment_meta.emplace(segment_id, table_meta);
700-
status = segment_meta->InitSet();
701-
if (!status.ok()) {
702-
return status;
703-
}
704-
return Status::OK();
686+
return segment_meta->InitSet();
705687
}
706688

707689
Status NewCatalog::AddNewSegmentWithID(TableMeeta &table_meta, TxnTimeStamp commit_ts, Optional<SegmentMeta> &segment_meta, SegmentID segment_id) {
@@ -710,11 +692,7 @@ Status NewCatalog::AddNewSegmentWithID(TableMeeta &table_meta, TxnTimeStamp comm
710692
return status;
711693
}
712694
segment_meta.emplace(segment_id, table_meta);
713-
status = segment_meta->InitSet();
714-
if (!status.ok()) {
715-
return status;
716-
}
717-
return Status::OK();
695+
return segment_meta->InitSet();
718696
}
719697

720698
Status NewCatalog::LoadFlushedSegment1(TableMeeta &table_meta, const WalSegmentInfo &segment_info, TxnTimeStamp checkpoint_ts) {
@@ -859,7 +837,7 @@ Status NewCatalog::AddNewBlockForTransform(SegmentMeta &segment_meta, TxnTimeSta
859837
return Status::OK();
860838
}
861839
//
862-
// Status NewCatalog::AddNewTableIndexForTransform(TableMeeta &table_meta,TxnTimeStamp commit_ts,Optional<TableIndexMeta> &table_index_meta) {
840+
// Status NewCatalog::AddNewTableIndexForTransform(TableMeeta &table_meta, TxnTimeStamp commit_ts, Optional<TableIndexMeta> &table_index_meta) {
863841
// Status status;
864842
// BlockID block_id;
865843
// std::tie(block_id, status) = table_meta.AddBlockID1(commit_ts);
@@ -1246,7 +1224,7 @@ Status NewCatalog::GetBlockVisibleRange(BlockMeta &block_meta, TxnTimeStamp begi
12461224
BufferHandle buffer_handle = version_buffer->Load();
12471225
SharedPtr<BlockLock> block_lock;
12481226
{
1249-
Status status = block_meta.GetBlockLock(block_lock);
1227+
status = block_meta.GetBlockLock(block_lock);
12501228
if (!status.ok()) {
12511229
return status;
12521230
}
@@ -1371,9 +1349,9 @@ Status NewCatalog::GetBlockFilePaths(BlockMeta &block_meta, Vector<String> &file
13711349
if (!status.ok()) {
13721350
return status;
13731351
}
1374-
if (!column_def) {
1375-
for (const auto &column_def : *column_defs_ptr) {
1376-
ColumnMeta column_meta(column_def->id(), block_meta);
1352+
if (column_def == nullptr) {
1353+
for (const auto &column_def_ptr : *column_defs_ptr) {
1354+
ColumnMeta column_meta(column_def_ptr->id(), block_meta);
13771355
status = GetBlockColumnFilePaths(column_meta, file_paths);
13781356
if (!status.ok()) {
13791357
return status;
@@ -1539,7 +1517,7 @@ Status NewCatalog::CheckSegmentRowsVisible(SegmentMeta &segment_meta, TxnTimeSta
15391517
}
15401518
for (BlockID block_id : *block_ids_ptr) {
15411519
BlockMeta block_meta(block_id, segment_meta);
1542-
Status status = NewCatalog::SetBlockDeleteBitmask(block_meta, begin_ts, bitmask);
1520+
status = NewCatalog::SetBlockDeleteBitmask(block_meta, begin_ts, bitmask);
15431521
if (!status.ok()) {
15441522
return status;
15451523
}

src/storage/new_txn/new_txn_manager.cpp

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ UniquePtr<NewTxn> NewTxnManager::BeginRecoveryTxn() {
192192

193193
// current_ts_ += 2;
194194
prepare_commit_ts_ = current_ts_ + 2;
195-
TxnTimeStamp commit_ts = current_ts_ + 2; // Will not be used, actually.
195+
TxnTimeStamp commit_ts = current_ts_ + 2; // Will not be used
196196
TxnTimeStamp begin_ts = current_ts_ + 1; // current_ts_ > 0
197197

198198
// Create txn instance
@@ -239,14 +239,6 @@ TxnTimeStamp NewTxnManager::GetWriteCommitTS(SharedPtr<NewTxn> txn) {
239239
return commit_ts;
240240
}
241241

242-
TxnTimeStamp NewTxnManager::GetReplayWriteCommitTS(NewTxn *txn) {
243-
std::lock_guard guard(locker_);
244-
prepare_commit_ts_ += 2;
245-
TxnTimeStamp commit_ts = prepare_commit_ts_;
246-
txn->SetTxnWrite();
247-
return commit_ts;
248-
}
249-
250242
bool NewTxnManager::CheckConflict1(NewTxn *txn, String &conflict_reason, bool &retry_query) {
251243
TxnTimeStamp begin_ts = txn->BeginTS();
252244
TxnTimeStamp commit_ts = txn->CommitTS();
@@ -423,7 +415,7 @@ void NewTxnManager::CommitBottom(NewTxn *txn) {
423415

424416
// ensure notify top half orderly per commit_ts
425417
while (!bottom_txns_.empty()) {
426-
auto iter = bottom_txns_.begin();
418+
iter = bottom_txns_.begin();
427419
TxnTimeStamp it_ts = iter->first;
428420
SharedPtr<NewTxn> it_txn = iter->second;
429421
if (current_ts_ > it_ts || it_ts > prepare_commit_ts_) {
@@ -509,7 +501,7 @@ void NewTxnManager::CleanupTxn(NewTxn *txn) {
509501
TransactionID txn_id = txn->TxnID();
510502
LOG_DEBUG(fmt::format("Cleanup txn, id: {}, begin_ts: {}", txn_id, begin_ts));
511503
if (is_write_transaction) {
512-
// For write txn, we need to update the state: committing->committed, rollbacking->rollbacked
504+
// For writing txn, we need to update the state: committing->committed, rollbacking->rollbacked
513505
TxnState txn_state = txn->GetTxnState();
514506
switch (txn_state) {
515507
case TxnState::kCommitting: {
@@ -580,16 +572,6 @@ void NewTxnManager::CleanupTxnBottomNolock(TransactionID txn_id, TxnTimeStamp be
580572
}
581573
}
582574

583-
bool NewTxnManager::InCheckpointProcess(TxnTimeStamp commit_ts) {
584-
std::lock_guard guard(locker_);
585-
if (commit_ts > ckp_begin_ts_) {
586-
LOG_TRACE(fmt::format("Full/Delta checkpoint begin at {}, cur txn commit_ts: {}, swap to new wal file", ckp_begin_ts_, commit_ts));
587-
ckp_begin_ts_ = UNCOMMIT_TS;
588-
return true;
589-
}
590-
return false;
591-
}
592-
593575
void NewTxnManager::PrintAllKeyValue() const {
594576
std::cout << String("All store key and value: ") << std::endl;
595577
std::cout << kv_store_->ToString() << std::endl;

src/storage/new_txn/new_txn_manager.cppm

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ public:
6464

6565
TxnTimeStamp GetWriteCommitTS(SharedPtr<NewTxn> txn);
6666

67-
TxnTimeStamp GetReplayWriteCommitTS(NewTxn *txn);
68-
6967
// Optional<String> CheckTxnConflict(NewTxn *txn);
7068

7169
bool CheckConflict1(NewTxn *txn, String &conflict_reason, bool &retry_query);
@@ -118,8 +116,6 @@ private:
118116
void CleanupTxnBottomNolock(TransactionID txn_id, TxnTimeStamp begin_ts);
119117

120118
public:
121-
bool InCheckpointProcess(TxnTimeStamp commit_ts);
122-
123119
// Only used by follower and learner when received the replicated log from leader
124120
void SetStartTS(TxnTimeStamp new_start_ts) { current_ts_ = new_start_ts; }
125121

src/storage/storage.cppm

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,8 @@
1515
module;
1616

1717
import stl;
18-
import config;
19-
import catalog;
20-
import txn_manager;
21-
import buffer_manager;
2218
import wal_manager;
23-
import object_storage_process;
2419
import log_file;
25-
import persistence_manager;
26-
import virtual_store;
2720
import status;
2821

2922
export module storage;
@@ -33,13 +26,19 @@ namespace infinity {
3326
class CleanupInfoTracer;
3427
class ResultCacheManager;
3528
class NewCatalog;
29+
class Catalog;
3630
class NewTxnManager;
31+
class TxnManager;
3732
class KVStore;
3833
class KVInstance;
3934
class PeriodicTriggerThread;
4035
class CompactionProcessor;
4136
class BGTaskProcessor;
4237
class BGMemIndexTracer;
38+
class ObjectStorageProcess;
39+
class Config;
40+
class BufferManager;
41+
class PersistenceManager;
4342

4443
export enum class ReaderInitPhase {
4544
kInvalid,

0 commit comments

Comments
 (0)