@@ -1485,10 +1485,13 @@ pub(crate) const CHANNEL_ANNOUNCEMENT_PROPAGATION_DELAY: u32 = 144;
14851485#[derive(Debug)]
14861486struct PendingChannelMonitorUpdate {
14871487 update: ChannelMonitorUpdate,
1488+ /// `MonitorEvent`s that can be ack'd after `update` is durably persisted.
1489+ post_update_ackable_events: Vec<MonitorEventSource>,
14881490}
14891491
14901492impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
14911493 (0, update, required),
1494+ (1, post_update_ackable_events, optional_vec),
14921495});
14931496
14941497/// A payment channel with a counterparty throughout its life-cycle, encapsulating negotiation and
@@ -7325,9 +7328,10 @@ where
73257328 if !update_blocked {
73267329 debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
73277330 let update = self.build_commitment_no_status_check(logger);
7328- self.context
7329- .blocked_monitor_updates
7330- .push(PendingChannelMonitorUpdate { update });
7331+ self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
7332+ update,
7333+ post_update_ackable_events: Vec::new(),
7334+ });
73317335 }
73327336 }
73337337
@@ -8558,8 +8562,10 @@ where
85588562 logger,
85598563 );
85608564 (
8561- self.push_ret_blockable_mon_update(monitor_update)
8562- .map(|upd| (upd, monitor_events_to_ack)),
8565+ self.push_ret_blockable_mon_update_with_event_sources(
8566+ monitor_update,
8567+ monitor_events_to_ack,
8568+ ),
85638569 htlcs_to_fail,
85648570 )
85658571 } else {
@@ -8901,13 +8907,14 @@ where
89018907 let mut monitor_events_to_ack = Vec::new();
89028908 macro_rules! return_with_htlcs_to_fail {
89038909 ($htlcs_to_fail: expr) => {
8910+ let events_to_ack = core::mem::take(&mut monitor_events_to_ack);
89048911 if !release_monitor {
8905- self.context
8906- .blocked_monitor_updates
8907- .push(PendingChannelMonitorUpdate { update: monitor_update });
8912+ self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
8913+ update: monitor_update,
8914+ post_update_ackable_events: events_to_ack,
8915+ });
89088916 return Ok(($htlcs_to_fail, static_invoices, None));
89098917 } else {
8910- let events_to_ack = core::mem::take(&mut monitor_events_to_ack);
89118918 return Ok((
89128919 $htlcs_to_fail,
89138920 static_invoices,
@@ -11001,12 +11008,14 @@ where
1100111008
1100211009 /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
1100311010 /// further blocked monitor update exists after the next.
11004- pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, bool)> {
11011+ pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(ChannelMonitorUpdate, Vec<MonitorEventSource>, bool)> {
1100511012 if self.context.blocked_monitor_updates.is_empty() {
1100611013 return None;
1100711014 }
11015+ let pending = self.context.blocked_monitor_updates.remove(0);
1100811016 Some((
11009- self.context.blocked_monitor_updates.remove(0).update,
11017+ pending.update,
11018+ pending.post_update_ackable_events,
1101011019 !self.context.blocked_monitor_updates.is_empty(),
1101111020 ))
1101211021 }
@@ -11016,14 +11025,24 @@ where
1101611025 #[rustfmt::skip]
1101711026 fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
1101811027 -> Option<ChannelMonitorUpdate> {
11028+ self.push_ret_blockable_mon_update_with_event_sources(update, Vec::new())
11029+ .map(|(upd, _)| upd)
11030+ }
11031+
11032+ /// Similar to `push_ret_blockable_mon_update`, but allows including a list of
11033+ /// `MonitorEventSource`s that can be ack'd after the `update` is durably persisted.
11034+ fn push_ret_blockable_mon_update_with_event_sources(
11035+ &mut self, update: ChannelMonitorUpdate, monitor_event_sources: Vec<MonitorEventSource>,
11036+ ) -> Option<(ChannelMonitorUpdate, Vec<MonitorEventSource>)> {
1101911037 let release_monitor = self.context.blocked_monitor_updates.is_empty();
1102011038 if !release_monitor {
1102111039 self.context.blocked_monitor_updates.push(PendingChannelMonitorUpdate {
1102211040 update,
11041+ post_update_ackable_events: monitor_event_sources,
1102311042 });
1102411043 None
1102511044 } else {
11026- Some(update)
11045+ Some(( update, monitor_event_sources) )
1102711046 }
1102811047 }
1102911048
@@ -11032,19 +11051,22 @@ where
1103211051 /// here after logging them.
1103311052 pub fn on_startup_drop_completed_blocked_mon_updates_through<L: Logger>(
1103411053 &mut self, logger: &L, loaded_mon_update_id: u64,
11035- ) {
11036- self.context.blocked_monitor_updates.retain(|update| {
11054+ ) -> Vec<MonitorEventSource> {
11055+ let mut monitor_events_to_ack = Vec::new();
11056+ self.context.blocked_monitor_updates.retain_mut(|update| {
1103711057 if update.update.update_id <= loaded_mon_update_id {
1103811058 log_info!(
1103911059 logger,
1104011060 "Dropping completed ChannelMonitorUpdate id {} due to a stale ChannelManager",
1104111061 update.update.update_id,
1104211062 );
11063+ monitor_events_to_ack.extend(core::mem::take(&mut update.post_update_ackable_events));
1104311064 false
1104411065 } else {
1104511066 true
1104611067 }
1104711068 });
11069+ monitor_events_to_ack
1104811070 }
1104911071
1105011072 pub fn blocked_monitor_updates_pending(&self) -> usize {
0 commit comments