Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ use lightning::routing::utxo::UtxoLookup;
use lightning::sign::{
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
#[cfg(not(c_bindings))]
use lightning::util::async_poll::MaybeSend;
use lightning::util::logger::Logger;
#[cfg(not(c_bindings))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit msg typo

use lightning::util::native_async::MaybeSend;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
Expand Down
1 change: 1 addition & 0 deletions lightning-block-sync/src/async_poll.rs
232 changes: 87 additions & 145 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects
//! from disk.

use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier};
use crate::async_poll::{MultiResultFuturePoller, ResultFuture};
use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};

use bitcoin::block::Header;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::Network;

use lightning::chain;
Expand All @@ -32,19 +32,18 @@ where
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
///
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
/// failure, each listener may be left at a different block hash than the one it was originally
/// paired with.
/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In
/// the case of failure, each listener may be left at a different block hash than the one it was
/// originally paired with.
///
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
/// switching to [`SpvClient`]. For example:
///
/// ```
/// use bitcoin::hash_types::BlockHash;
/// use bitcoin::network::Network;
///
/// use lightning::chain;
/// use lightning::chain::Watch;
/// use lightning::chain::{BestBlock, Watch};
/// use lightning::chain::chainmonitor;
/// use lightning::chain::chainmonitor::ChainMonitor;
/// use lightning::chain::channelmonitor::ChannelMonitor;
Expand Down Expand Up @@ -89,14 +88,14 @@ where
/// logger: &L,
/// persister: &P,
/// ) {
/// // Read a serialized channel monitor paired with the block hash when it was persisted.
/// // Read a serialized channel monitor paired with the best block when it was persisted.
/// let serialized_monitor = "...";
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<SP::EcdsaSigner>)>::read(
/// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor<SP::EcdsaSigner>)>::read(
/// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap();
///
/// // Read the channel manager paired with the block hash when it was persisted.
/// // Read the channel manager paired with the best block when it was persisted.
/// let serialized_manager = "...";
/// let (manager_block_hash, mut manager) = {
/// let (manager_best_block, mut manager) = {
/// let read_args = ChannelManagerReadArgs::new(
/// entropy_source,
/// node_signer,
Expand All @@ -110,19 +109,18 @@ where
/// config,
/// vec![&mut monitor],
/// );
/// <(BlockHash, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P, &ES>, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read(
/// <(BestBlock, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P, &ES>, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read(
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
/// };
///
/// // Synchronize any channel monitors and the channel manager to be on the best block.
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
/// let listeners = vec![
/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen),
/// (manager_block_hash, &manager as &dyn chain::Listen),
/// (monitor_best_block, &monitor_listener as &dyn chain::Listen),
/// (manager_best_block, &manager as &dyn chain::Listen),
/// ];
/// let chain_tip = init::synchronize_listeners(
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
/// let (chain_cache, chain_tip) = init::synchronize_listeners(
/// block_source, Network::Bitcoin, listeners).await.unwrap();
///
/// // Allow the chain monitor to watch any channels.
/// let monitor = monitor_listener.0;
Expand All @@ -131,94 +129,94 @@ where
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
/// let mut chain_listener = (chain_monitor, &manager);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener);
/// }
/// ```
///
/// [`SpvClient`]: crate::SpvClient
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub async fn synchronize_listeners<
B: Deref + Sized + Send + Sync,
C: Cache,
L: chain::Listen + ?Sized,
>(
block_source: B, network: Network, header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &L)>,
) -> BlockSourceResult<ValidatedBlockHeader>
pub async fn synchronize_listeners<B: Deref + Sized + Send + Sync, L: chain::Listen + ?Sized>(
block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>,
) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)>
where
B::Target: BlockSource,
{
let best_header = validate_best_block_header(&*block_source).await?;

// Fetch the header for the block hash paired with each listener.
let mut chain_listeners_with_old_headers = Vec::new();
for (old_block_hash, chain_listener) in chain_listeners.drain(..) {
let old_header = match header_cache.look_up(&old_block_hash) {
Some(header) => *header,
None => {
block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)?
},
};
chain_listeners_with_old_headers.push((old_header, chain_listener))
}

// Find differences and disconnect blocks for each listener individually.
let mut chain_poller = ChainPoller::new(block_source, network);
let mut chain_listeners_at_height = Vec::new();
let mut most_common_ancestor = None;
let mut most_connected_blocks = Vec::new();
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
let mut header_cache = HeaderCache::new();
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
let header_cache = &mut ReadOnlyCache(header_cache);
let (common_ancestor, connected_blocks) = {
let chain_listener = &DynamicChainListener(chain_listener);
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
let difference =
chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?;
chain_notifier.disconnect_blocks(difference.disconnected_blocks);
let mut chain_notifier =
ChainNotifier { header_cache: &mut header_cache, chain_listener };
let difference = chain_notifier
.find_difference_from_best_block(best_header, old_best_block, &mut chain_poller)
.await?;
if difference.common_ancestor.block_hash != old_best_block.block_hash {
chain_notifier.disconnect_blocks(difference.common_ancestor);
}
(difference.common_ancestor, difference.connected_blocks)
};

// Keep track of the most common ancestor and all blocks connected across all listeners.
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
if connected_blocks.len() > most_connected_blocks.len() {
most_common_ancestor = Some(common_ancestor);
most_connected_blocks = connected_blocks;
}
}

// Connect new blocks for all listeners at once to avoid re-fetching blocks.
if let Some(common_ancestor) = most_common_ancestor {
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
chain_notifier
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
.await
.map_err(|(e, _)| e)?;
}

Ok(best_header)
}

/// A wrapper to make a cache read-only.
///
/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
/// listener.
struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
while !most_connected_blocks.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the parallel fetching useful in ChainNotifier.connect_blocks too?

#[cfg(not(test))]
const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded
#[cfg(test)]
const MAX_BLOCKS_AT_ONCE: usize = 2;

let mut fetch_block_futures =
Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len()));
for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) {
let fetch_future = chain_poller.fetch_block(header);
fetch_block_futures
.push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) })));
}
let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter();

impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.0.look_up(block_hash)
}
let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE];
for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) {
*result = Some((header.height, block_res?));
}
debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some()));
debug_assert!(fetched_blocks
.is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX)));

for (listener_height, listener) in chain_listeners_at_height.iter() {
// Connect blocks for this listener.
for result in fetched_blocks.iter() {
if let Some((height, block_data)) = result {
if *height > *listener_height {
match &**block_data {
BlockData::FullBlock(block) => {
listener.block_connected(&block, *height);
},
BlockData::HeaderOnly(header_data) => {
listener.filtered_block_connected(&header_data, &[], *height);
},
}
}
}
}
}

fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
unreachable!()
most_connected_blocks
.truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE));
}

fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
None
}
Ok((header_cache, best_header))
}

/// Wrapper for supporting dynamically sized chain listeners.
Expand All @@ -236,33 +234,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
}
}

/// A set of dynamically sized chain listeners, each paired with a starting block height.
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);

impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.block_connected(block, height);
}
}
}

fn filtered_block_connected(
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.filtered_block_connected(header, txdata, height);
}
}
}

fn blocks_disconnected(&self, _fork_point: BestBlock) {
unreachable!()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -282,13 +253,12 @@ mod tests {
let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4));

let listeners = vec![
(chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen),
(chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen),
(chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen),
(chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen),
(chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen),
(chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen),
];
let mut cache = chain.header_cache(0..=4);
match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(header) => assert_eq!(header, chain.tip()),
match synchronize_listeners(&chain, Network::Bitcoin, listeners).await {
Ok((_, header)) => assert_eq!(header, chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
Expand All @@ -314,15 +284,12 @@ mod tests {
.expect_block_connected(*main_chain.at_height(4));

let listeners = vec![
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
(fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen),
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
];
let mut cache = fork_chain_1.header_cache(2..=4);
cache.extend(fork_chain_2.header_cache(3..=4));
cache.extend(fork_chain_3.header_cache(4..=4));
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(header) => assert_eq!(header, main_chain.tip()),
match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await {
Ok((_, header)) => assert_eq!(header, main_chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
Expand Down Expand Up @@ -351,37 +318,12 @@ mod tests {
.expect_block_connected(*main_chain.at_height(4));

let listeners = vec![
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
(fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen),
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
];
let mut cache = fork_chain_1.header_cache(2..=4);
cache.extend(fork_chain_2.header_cache(3..=4));
cache.extend(fork_chain_3.header_cache(4..=4));
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(header) => assert_eq!(header, main_chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}

#[tokio::test]
async fn cache_connected_and_keep_disconnected_blocks() {
let main_chain = Blockchain::default().with_height(2);
let fork_chain = main_chain.fork_at_height(1);
let new_tip = main_chain.tip();
let old_tip = fork_chain.tip();

let listener = MockChainListener::new()
.expect_blocks_disconnected(*fork_chain.at_height(1))
.expect_block_connected(*new_tip);

let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)];
let mut cache = fork_chain.header_cache(2..=2);
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(_) => {
assert!(cache.contains_key(&new_tip.block_hash));
assert!(cache.contains_key(&old_tip.block_hash));
},
match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await {
Ok((_, header)) => assert_eq!(header, main_chain.tip()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there test coverage for the returned cache?

Err(e) => panic!("Unexpected error: {:?}", e),
}
}
Expand Down
Loading
Loading