Skip to content

Commit 7cfec6a

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent adb206a commit 7cfec6a

File tree

1 file changed

+38
-27
lines changed

1 file changed

+38
-27
lines changed

lightning/src/util/sweep.rs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -450,7 +452,7 @@ where
450452

451453
state_lock.persistent.outputs.push(output_info);
452454
}
453-
self.persist_state(&state_lock).map_err(|e| {
455+
self.flush_state(&mut state_lock).map_err(|e| {
454456
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
455457
})
456458
}
@@ -478,7 +480,19 @@ where
478480
return Ok(());
479481
}
480482

481-
let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
483+
let result = {
484+
self.regenerate_and_broadcast_spend_if_necessary_internal().await?;
485+
486+
// If there is still dirty state, we need to persist it.
487+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
488+
if sweeper_state.dirty {
489+
self.flush_state(&mut sweeper_state).map_err(|e| {
490+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
491+
})
492+
} else {
493+
Ok(())
494+
}
495+
};
482496

483497
// Release the pending sweep flag again, regardless of result.
484498
self.pending_sweep.store(false, Ordering::Release);
@@ -571,7 +585,7 @@ where
571585
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
572586
}
573587

574-
self.persist_state(&sweeper_state).map_err(|e| {
588+
self.flush_state(&mut sweeper_state).map_err(|e| {
575589
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
576590
})?;
577591

@@ -599,9 +613,12 @@ where
599613
}
600614
true
601615
});
616+
617+
sweeper_state.dirty = true;
602618
}
603619

604-
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
620+
/// Flushes the current state to the persistence layer and marks the state as clean.
621+
fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
605622
self.kv_store
606623
.write(
607624
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -620,6 +637,9 @@ where
620637
);
621638
e
622639
})
640+
.map(|_| {
641+
sweeper_state.dirty = false;
642+
})
623643
}
624644

625645
fn spend_outputs(
@@ -652,13 +672,17 @@ where
652672
}
653673
}
654674
}
675+
676+
sweeper_state.dirty = true;
655677
}
656678

657679
fn best_block_updated_internal(
658680
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
659681
) {
660682
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
661683
self.prune_confirmed_outputs(sweeper_state);
684+
685+
sweeper_state.dirty = true;
662686
}
663687
}
664688

@@ -682,12 +706,8 @@ where
682706
assert_eq!(state_lock.persistent.best_block.height, height - 1,
683707
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
684708

685-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
686-
self.best_block_updated_internal(&mut *state_lock, header, height);
687-
688-
let _ = self.persist_state(&*state_lock).map_err(|e| {
689-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
690-
});
709+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
710+
self.best_block_updated_internal(&mut state_lock, header, height);
691711
}
692712

693713
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -709,9 +729,7 @@ where
709729
}
710730
}
711731

712-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
713-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
714-
});
732+
state_lock.dirty = true;
715733
}
716734
}
717735

@@ -731,9 +749,6 @@ where
731749
) {
732750
let mut state_lock = self.sweeper_state.lock().unwrap();
733751
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
734-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
735-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
736-
});
737752
}
738753

739754
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -756,18 +771,13 @@ where
756771
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
757772
.for_each(|o| o.status.unconfirmed());
758773

759-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
760-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
761-
});
774+
state_lock.dirty = true;
762775
}
763776
}
764777

765778
fn best_block_updated(&self, header: &Header, height: u32) {
766779
let mut state_lock = self.sweeper_state.lock().unwrap();
767-
self.best_block_updated_internal(&mut *state_lock, header, height);
768-
let _ = self.persist_state(&*state_lock).map_err(|e| {
769-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
770-
});
780+
self.best_block_updated_internal(&mut state_lock, header, height);
771781
}
772782

773783
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -796,6 +806,7 @@ where
796806
#[derive(Debug)]
797807
struct SweeperState {
798808
persistent: PersistentSweeperState,
809+
dirty: bool,
799810
}
800811

801812
#[derive(Debug, Clone)]
@@ -860,7 +871,7 @@ where
860871
}
861872
}
862873

863-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
874+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
864875
Ok(Self {
865876
sweeper_state,
866877
pending_sweep: AtomicBool::new(false),
@@ -909,7 +920,7 @@ where
909920
}
910921
}
911922

912-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
923+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
913924
Ok((
914925
best_block,
915926
OutputSweeper {

0 commit comments

Comments
 (0)