-
Notifications
You must be signed in to change notification settings - Fork 432
Use BestBlock for chain state serialization (and somewhat parallelize init)
#4266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a147ba1
ba279a8
98ca43d
9f71c38
43f0a6f
b64cb54
ef4a298
f28cf2f
a15455e
61e1dff
f54a087
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| ../../lightning/src/util/async_poll.rs | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this symlinking work on Windows? I can't believe that it is necessary to use this hack. I think it can be called a hack? |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,11 @@ | ||
| //! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects | ||
| //! from disk. | ||
|
|
||
| use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; | ||
| use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; | ||
| use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; | ||
| use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; | ||
| use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; | ||
|
|
||
| use bitcoin::block::Header; | ||
| use bitcoin::hash_types::BlockHash; | ||
| use bitcoin::network::Network; | ||
|
|
||
| use lightning::chain; | ||
|
|
@@ -32,19 +32,18 @@ where | |
| /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each | ||
| /// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. | ||
| /// | ||
| /// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of | ||
| /// failure, each listener may be left at a different block hash than the one it was originally | ||
| /// paired with. | ||
| /// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In | ||
| /// the case of failure, each listener may be left at a different block hash than the one it was | ||
| /// originally paired with. | ||
| /// | ||
| /// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before | ||
| /// switching to [`SpvClient`]. For example: | ||
| /// | ||
| /// ``` | ||
| /// use bitcoin::hash_types::BlockHash; | ||
| /// use bitcoin::network::Network; | ||
| /// | ||
| /// use lightning::chain; | ||
| /// use lightning::chain::Watch; | ||
| /// use lightning::chain::{BestBlock, Watch}; | ||
| /// use lightning::chain::chainmonitor; | ||
| /// use lightning::chain::chainmonitor::ChainMonitor; | ||
| /// use lightning::chain::channelmonitor::ChannelMonitor; | ||
|
|
@@ -89,14 +88,14 @@ where | |
| /// logger: &L, | ||
| /// persister: &P, | ||
| /// ) { | ||
| /// // Read a serialized channel monitor paired with the block hash when it was persisted. | ||
| /// // Read a serialized channel monitor paired with the best block when it was persisted. | ||
| /// let serialized_monitor = "..."; | ||
| /// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<SP::EcdsaSigner>)>::read( | ||
| /// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor<SP::EcdsaSigner>)>::read( | ||
| /// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap(); | ||
| /// | ||
| /// // Read the channel manager paired with the block hash when it was persisted. | ||
| /// // Read the channel manager paired with the best block when it was persisted. | ||
| /// let serialized_manager = "..."; | ||
| /// let (manager_block_hash, mut manager) = { | ||
| /// let (manager_best_block, mut manager) = { | ||
| /// let read_args = ChannelManagerReadArgs::new( | ||
| /// entropy_source, | ||
| /// node_signer, | ||
|
|
@@ -110,19 +109,18 @@ where | |
| /// config, | ||
| /// vec![&mut monitor], | ||
| /// ); | ||
| /// <(BlockHash, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P, &ES>, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( | ||
| /// <(BestBlock, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P, &ES>, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( | ||
| /// &mut Cursor::new(&serialized_manager), read_args).unwrap() | ||
| /// }; | ||
| /// | ||
| /// // Synchronize any channel monitors and the channel manager to be on the best block. | ||
| /// let mut cache = UnboundedCache::new(); | ||
| /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); | ||
| /// let listeners = vec![ | ||
| /// (monitor_block_hash, &monitor_listener as &dyn chain::Listen), | ||
| /// (manager_block_hash, &manager as &dyn chain::Listen), | ||
| /// (monitor_best_block, &monitor_listener as &dyn chain::Listen), | ||
| /// (manager_best_block, &manager as &dyn chain::Listen), | ||
| /// ]; | ||
| /// let chain_tip = init::synchronize_listeners( | ||
| /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); | ||
| /// let (chain_cache, chain_tip) = init::synchronize_listeners( | ||
| /// block_source, Network::Bitcoin, listeners).await.unwrap(); | ||
| /// | ||
| /// // Allow the chain monitor to watch any channels. | ||
| /// let monitor = monitor_listener.0; | ||
|
|
@@ -131,94 +129,94 @@ where | |
| /// // Create an SPV client to notify the chain monitor and channel manager of block events. | ||
| /// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); | ||
| /// let mut chain_listener = (chain_monitor, &manager); | ||
| /// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); | ||
| /// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener); | ||
| /// } | ||
| /// ``` | ||
| /// | ||
| /// [`SpvClient`]: crate::SpvClient | ||
| /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager | ||
| /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor | ||
| pub async fn synchronize_listeners< | ||
| B: Deref + Sized + Send + Sync, | ||
| C: Cache, | ||
| L: chain::Listen + ?Sized, | ||
| >( | ||
| block_source: B, network: Network, header_cache: &mut C, | ||
| mut chain_listeners: Vec<(BlockHash, &L)>, | ||
| ) -> BlockSourceResult<ValidatedBlockHeader> | ||
| pub async fn synchronize_listeners<B: Deref + Sized + Send + Sync, L: chain::Listen + ?Sized>( | ||
| block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, | ||
| ) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)> | ||
| where | ||
| B::Target: BlockSource, | ||
| { | ||
| let best_header = validate_best_block_header(&*block_source).await?; | ||
|
|
||
| // Fetch the header for the block hash paired with each listener. | ||
| let mut chain_listeners_with_old_headers = Vec::new(); | ||
| for (old_block_hash, chain_listener) in chain_listeners.drain(..) { | ||
| let old_header = match header_cache.look_up(&old_block_hash) { | ||
| Some(header) => *header, | ||
| None => { | ||
| block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? | ||
| }, | ||
| }; | ||
| chain_listeners_with_old_headers.push((old_header, chain_listener)) | ||
| } | ||
|
|
||
| // Find differences and disconnect blocks for each listener individually. | ||
| let mut chain_poller = ChainPoller::new(block_source, network); | ||
| let mut chain_listeners_at_height = Vec::new(); | ||
| let mut most_common_ancestor = None; | ||
| let mut most_connected_blocks = Vec::new(); | ||
| for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { | ||
| let mut header_cache = HeaderCache::new(); | ||
| for (old_best_block, chain_listener) in chain_listeners.drain(..) { | ||
| // Disconnect any stale blocks, but keep them in the cache for the next iteration. | ||
| let header_cache = &mut ReadOnlyCache(header_cache); | ||
| let (common_ancestor, connected_blocks) = { | ||
| let chain_listener = &DynamicChainListener(chain_listener); | ||
| let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; | ||
| let difference = | ||
| chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; | ||
| chain_notifier.disconnect_blocks(difference.disconnected_blocks); | ||
| let mut chain_notifier = | ||
| ChainNotifier { header_cache: &mut header_cache, chain_listener }; | ||
| let difference = chain_notifier | ||
| .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) | ||
| .await?; | ||
| if difference.common_ancestor.block_hash != old_best_block.block_hash { | ||
| chain_notifier.disconnect_blocks(difference.common_ancestor); | ||
| } | ||
| (difference.common_ancestor, difference.connected_blocks) | ||
| }; | ||
|
|
||
| // Keep track of the most common ancestor and all blocks connected across all listeners. | ||
| chain_listeners_at_height.push((common_ancestor.height, chain_listener)); | ||
| if connected_blocks.len() > most_connected_blocks.len() { | ||
| most_common_ancestor = Some(common_ancestor); | ||
| most_connected_blocks = connected_blocks; | ||
| } | ||
| } | ||
|
|
||
| // Connect new blocks for all listeners at once to avoid re-fetching blocks. | ||
| if let Some(common_ancestor) = most_common_ancestor { | ||
| let chain_listener = &ChainListenerSet(chain_listeners_at_height); | ||
| let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; | ||
| chain_notifier | ||
| .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) | ||
| .await | ||
| .map_err(|(e, _)| e)?; | ||
| } | ||
|
|
||
| Ok(best_header) | ||
| } | ||
|
|
||
| /// A wrapper to make a cache read-only. | ||
| /// | ||
| /// Used to prevent losing headers that may be needed to disconnect blocks common to more than one | ||
| /// listener. | ||
| struct ReadOnlyCache<'a, C: Cache>(&'a mut C); | ||
| while !most_connected_blocks.is_empty() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't the parallel fetching useful in ChainNotifier.connect_blocks too? |
||
| #[cfg(not(test))] | ||
| const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded | ||
| #[cfg(test)] | ||
| const MAX_BLOCKS_AT_ONCE: usize = 2; | ||
|
|
||
| let mut fetch_block_futures = | ||
| Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len())); | ||
| for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) { | ||
| let fetch_future = chain_poller.fetch_block(header); | ||
| fetch_block_futures | ||
| .push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) }))); | ||
| } | ||
| let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter(); | ||
|
|
||
| impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { | ||
| fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { | ||
| self.0.look_up(block_hash) | ||
| } | ||
| let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE]; | ||
| for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { | ||
| *result = Some((header.height, block_res?)); | ||
| } | ||
| debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); | ||
| debug_assert!(fetched_blocks | ||
| .is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX))); | ||
|
|
||
| for (listener_height, listener) in chain_listeners_at_height.iter() { | ||
| // Connect blocks for this listener. | ||
| for result in fetched_blocks.iter() { | ||
| if let Some((height, block_data)) = result { | ||
| if *height > *listener_height { | ||
| match &**block_data { | ||
| BlockData::FullBlock(block) => { | ||
| listener.block_connected(&block, *height); | ||
| }, | ||
| BlockData::HeaderOnly(header_data) => { | ||
| listener.filtered_block_connected(&header_data, &[], *height); | ||
| }, | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { | ||
| unreachable!() | ||
| most_connected_blocks | ||
| .truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE)); | ||
| } | ||
|
|
||
| fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> { | ||
| None | ||
| } | ||
| Ok((header_cache, best_header)) | ||
| } | ||
|
|
||
| /// Wrapper for supporting dynamically sized chain listeners. | ||
|
|
@@ -236,33 +234,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L | |
| } | ||
| } | ||
|
|
||
| /// A set of dynamically sized chain listeners, each paired with a starting block height. | ||
| struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); | ||
|
|
||
| impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { | ||
| fn block_connected(&self, block: &bitcoin::Block, height: u32) { | ||
| for (starting_height, chain_listener) in self.0.iter() { | ||
| if height > *starting_height { | ||
| chain_listener.block_connected(block, height); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn filtered_block_connected( | ||
| &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, | ||
| ) { | ||
| for (starting_height, chain_listener) in self.0.iter() { | ||
| if height > *starting_height { | ||
| chain_listener.filtered_block_connected(header, txdata, height); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn blocks_disconnected(&self, _fork_point: BestBlock) { | ||
| unreachable!() | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
@@ -282,13 +253,12 @@ mod tests { | |
| let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4)); | ||
|
|
||
| let listeners = vec![ | ||
| (chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen), | ||
| (chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen), | ||
| (chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen), | ||
| (chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen), | ||
| (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), | ||
| (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), | ||
| ]; | ||
| let mut cache = chain.header_cache(0..=4); | ||
| match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { | ||
| Ok(header) => assert_eq!(header, chain.tip()), | ||
| match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { | ||
| Ok((_, header)) => assert_eq!(header, chain.tip()), | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| } | ||
| } | ||
|
|
@@ -314,15 +284,12 @@ mod tests { | |
| .expect_block_connected(*main_chain.at_height(4)); | ||
|
|
||
| let listeners = vec![ | ||
| (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), | ||
| (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), | ||
| (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), | ||
| (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), | ||
| (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), | ||
| (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), | ||
| ]; | ||
| let mut cache = fork_chain_1.header_cache(2..=4); | ||
| cache.extend(fork_chain_2.header_cache(3..=4)); | ||
| cache.extend(fork_chain_3.header_cache(4..=4)); | ||
| match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { | ||
| Ok(header) => assert_eq!(header, main_chain.tip()), | ||
| match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { | ||
| Ok((_, header)) => assert_eq!(header, main_chain.tip()), | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| } | ||
| } | ||
|
|
@@ -351,37 +318,12 @@ mod tests { | |
| .expect_block_connected(*main_chain.at_height(4)); | ||
|
|
||
| let listeners = vec![ | ||
| (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), | ||
| (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), | ||
| (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), | ||
| (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), | ||
| (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), | ||
| (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), | ||
| ]; | ||
| let mut cache = fork_chain_1.header_cache(2..=4); | ||
| cache.extend(fork_chain_2.header_cache(3..=4)); | ||
| cache.extend(fork_chain_3.header_cache(4..=4)); | ||
| match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { | ||
| Ok(header) => assert_eq!(header, main_chain.tip()), | ||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| } | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn cache_connected_and_keep_disconnected_blocks() { | ||
| let main_chain = Blockchain::default().with_height(2); | ||
| let fork_chain = main_chain.fork_at_height(1); | ||
| let new_tip = main_chain.tip(); | ||
| let old_tip = fork_chain.tip(); | ||
|
|
||
| let listener = MockChainListener::new() | ||
| .expect_blocks_disconnected(*fork_chain.at_height(1)) | ||
| .expect_block_connected(*new_tip); | ||
|
|
||
| let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)]; | ||
| let mut cache = fork_chain.header_cache(2..=2); | ||
| match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { | ||
| Ok(_) => { | ||
| assert!(cache.contains_key(&new_tip.block_hash)); | ||
| assert!(cache.contains_key(&old_tip.block_hash)); | ||
| }, | ||
| match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { | ||
| Ok((_, header)) => assert_eq!(header, main_chain.tip()), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there test coverage for the returned cache? |
||
| Err(e) => panic!("Unexpected error: {:?}", e), | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commit msg typo