diff --git a/bin/ethlambda/src/cli.rs b/bin/ethlambda/src/cli.rs index 848a2446..f4f97cab 100644 --- a/bin/ethlambda/src/cli.rs +++ b/bin/ethlambda/src/cli.rs @@ -84,4 +84,17 @@ pub(crate) struct CliOptions { /// but it no longer suppresses any duty: the gate becomes observe-only. #[arg(long, default_value = "false")] pub(crate) disable_duty_sync_gate: bool, + /// Enable proposer-side aggregation of attestation proofs when building a + /// block. + /// + /// A block may carry at most one entry per `AttestationData`, so the + /// proposer must collapse same-data proofs either way. When set, + /// `build_block` merges them via recursive Type-1 aggregation into a single + /// union-coverage proof per data (leanSpec #510), maximizing voter coverage + /// at the cost of a leanVM aggregation per duplicated data entry. When unset + /// (the default), it instead keeps only the single best-coverage proof per + /// data and drops the rest, skipping the leanVM work at the cost of lower + /// coverage. + #[arg(long, default_value = "false")] + pub(crate) enable_proposer_aggregation: bool, } diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index d85b3e8d..591cdcfd 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -215,6 +215,7 @@ async fn main() -> eyre::Result<()> { aggregator.clone(), attestation_committee_count, !options.disable_duty_sync_gate, + options.enable_proposer_aggregation, ); // Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the diff --git a/crates/blockchain/src/block_builder.rs b/crates/blockchain/src/block_builder.rs index 722d98d7..208481c2 100644 --- a/crates/blockchain/src/block_builder.rs +++ b/crates/blockchain/src/block_builder.rs @@ -45,9 +45,23 @@ pub struct PostBlockCheckpoints { /// Build a valid block on top of this state. /// -/// Selects attestations via `select_attestations`, compacts duplicate -/// `AttestationData` entries, and runs the STF once to seal the state root. -/// The proposer signature is NOT included; it is appended by the caller. +/// Selects attestations via `select_attestations`, collapses entries sharing +/// the same `AttestationData` down to one (a block may carry at most one entry +/// per data; `on_block` rejects duplicates), and runs the STF once to seal the +/// state root. The proposer signature is NOT included; it is appended by the +/// caller. +/// +/// The collapse strategy is gated by `enable_proposer_aggregation`: +/// - **enabled**: same-data proofs are merged via recursive Type-1 aggregation +/// into a single union-coverage proof (leanSpec #510). Maximizes voter +/// coverage per entry at the cost of a leanVM aggregation per duplicated +/// data entry. +/// - **disabled** (default): the single best-coverage proof per data is kept +/// and the rest dropped. Skips the leanVM work; coverage is bounded by the +/// best individual proof. +/// +/// Either way the output has one entry per `AttestationData` and the +/// attestation-to-proof correspondence stays 1:1. pub(crate) fn build_block( head_state: &State, slot: u64, @@ -55,6 +69,7 @@ pub(crate) fn build_block( parent_root: H256, known_block_roots: &HashSet, aggregated_payloads: &HashMap)>, + enable_proposer_aggregation: bool, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { info!(slot, proposer_index, "Building block"); @@ -70,10 +85,19 @@ pub(crate) fn build_block( let child_payloads_consumed = selected.len(); - // Compact: merge proofs sharing the same AttestationData via recursive - // aggregation so each AttestationData appears at most once (leanSpec #510). + // Each AttestationData may appear at most once per block (`on_block` + // rejects duplicates), so same-data entries must be collapsed to one. + // Gated by `enable_proposer_aggregation`: when enabled, proofs sharing an + // AttestationData are merged via recursive Type-1 aggregation into a + // union-coverage proof (leanSpec #510); when disabled, we skip that leanVM + // work and keep only the single best-coverage proof per data. Both paths + // log the entry / unique-entry counts they already compute. let compact_start = Instant::now(); - let compacted = compact_attestations(selected, head_state)?; + let compacted = if enable_proposer_aggregation { + compact_attestations(selected, head_state, slot)? + } else { + keep_best_proof_per_data(selected, slot) + }; metrics::observe_block_proposal_phase("compact", compact_start.elapsed()); let (aggregated_attestations, aggregated_signatures): (Vec<_>, Vec<_>) = @@ -517,11 +541,8 @@ fn build_running_votes(state: &State) -> HashMap> { fn compact_attestations( entries: Vec<(AggregatedAttestation, TypeOneMultiSignature)>, head_state: &State, + block_slot: u64, ) -> Result, StoreError> { - if entries.len() <= 1 { - return Ok(entries); - } - // Group indices by AttestationData, preserving first-occurrence order let mut order: Vec = Vec::new(); let mut groups: HashMap> = HashMap::new(); @@ -537,11 +558,18 @@ fn compact_attestations( } } - // Fast path: no duplicates + // Fast path: every AttestationData already appears once (covers ≤1 entry). if order.len() == entries.len() { return Ok(entries); } + info!( + slot = block_slot, + entries = entries.len(), + unique = order.len(), + "Compacting attestations" + ); + // Wrap in Option so we can .take() items by index without cloning let mut items: Vec> = entries.into_iter().map(Some).collect(); @@ -600,9 +628,67 @@ fn compact_attestations( compacted.push((merged_att, merged_proof)); } + info!(slot = block_slot, "Finished compacting attestations"); Ok(compacted) } +/// Reduce same-data entries to a single best proof each, without aggregation. +/// +/// The block format permits at most one entry per `AttestationData`: `on_block` +/// rejects duplicates (`StoreError::DuplicateAttestationData`). When proposer +/// aggregation is disabled we therefore cannot keep every selected proof, nor +/// can we merge them. For each group sharing an `AttestationData` we keep the +/// single proof covering the most validators (ties broken by first occurrence) +/// and drop the rest. No leanVM aggregation runs; coverage is whatever the best +/// individual proof already had, which is the cost of skipping aggregation. +fn keep_best_proof_per_data( + entries: Vec<(AggregatedAttestation, TypeOneMultiSignature)>, + block_slot: u64, +) -> Vec<(AggregatedAttestation, TypeOneMultiSignature)> { + // Preserve first-occurrence order of distinct AttestationData; for each, + // track the index of the best (most participants) entry seen so far. + let mut order: Vec = Vec::new(); + let mut best_index: HashMap = HashMap::new(); + for (i, (att, _)) in entries.iter().enumerate() { + match best_index.entry(att.data.clone()) { + std::collections::hash_map::Entry::Vacant(e) => { + order.push(e.key().clone()); + e.insert(i); + } + std::collections::hash_map::Entry::Occupied(mut e) => { + let current_best = entries[*e.get()].0.aggregation_bits.count_ones(); + if att.aggregation_bits.count_ones() > current_best { + e.insert(i); + } + } + } + } + + // Fast path: every AttestationData already appeared exactly once (covers + // ≤1 entry). + if order.len() == entries.len() { + return entries; + } + + info!( + slot = block_slot, + entries = entries.len(), + unique = order.len(), + "Skipping attestation compaction" + ); + + let mut items: Vec> = + entries.into_iter().map(Some).collect(); + order + .iter() + .map(|data| { + items[best_index[data]] + .take() + .expect("best index taken once") + }) + .collect() +} + /// Greedily select proofs maximizing new validator coverage. /// /// For a single attestation data entry, picks proofs that cover the most @@ -905,6 +991,7 @@ mod tests { parent_root, &known_block_roots, &aggregated_payloads, + true, ) .expect("build_block should succeed"); @@ -941,6 +1028,135 @@ mod tests { ); } + /// With proposer aggregation disabled, `build_block` must still emit at + /// most one entry per `AttestationData` (`on_block` rejects duplicates), + /// keeping the single best-coverage proof and dropping the rest rather than + /// recursively aggregating them. This path performs no leanVM aggregation, + /// so empty proof blobs suffice. + #[test] + fn build_block_without_proposer_aggregation_keeps_single_best_proof_per_data() { + use ethlambda_types::{ + block::BlockHeader, + state::{ChainConfig, JustificationValidators, JustifiedSlots}, + }; + use libssz_types::SszList; + + const NUM_VALIDATORS: usize = 4; + const HEAD_SLOT: u64 = 11; + const TARGET_SLOT: u64 = 5; + + let validators: Vec<_> = (0..NUM_VALIDATORS) + .map(|i| ethlambda_types::state::Validator { + attestation_pubkey: [i as u8; 52], + proposal_pubkey: [i as u8; 52], + index: i as u64, + }) + .collect(); + + let hashes: Vec = (0..HEAD_SLOT).map(|i| H256([(i + 1) as u8; 32])).collect(); + let historical_block_hashes = SszList::try_from(hashes.clone()).unwrap(); + + let head_header = BlockHeader { + slot: HEAD_SLOT, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body_root: BlockBody::default().hash_tree_root(), + }; + + let head_state = State { + config: ChainConfig { genesis_time: 1000 }, + slot: HEAD_SLOT, + latest_block_header: head_header, + latest_justified: Checkpoint::default(), + latest_finalized: Checkpoint::default(), + historical_block_hashes, + justified_slots: JustifiedSlots::new(), + validators: SszList::try_from(validators).unwrap(), + justifications_roots: Default::default(), + justifications_validators: JustificationValidators::new(), + }; + + let mut header_for_root = head_state.latest_block_header.clone(); + header_for_root.state_root = head_state.hash_tree_root(); + let parent_root = header_for_root.hash_tree_root(); + + let slot = HEAD_SLOT + 1; + let proposer_index = slot % NUM_VALIDATORS as u64; + + let source = Checkpoint { + root: hashes[0], + slot: 0, + }; + let target = Checkpoint { + root: hashes[TARGET_SLOT as usize], + slot: TARGET_SLOT, + }; + let head = Checkpoint { + root: hashes[0], + slot: 0, + }; + + let mut known_block_roots = HashSet::new(); + known_block_roots.insert(parent_root); + known_block_roots.insert(hashes[0]); + + // A single AttestationData carrying two proofs of different coverage: + // one for validator 0 (count 1) and one for validators {1, 2} + // (count 2). Both are selected by `extend_proofs_greedily` since each + // adds new voters, so the disabled path must reduce them to one. + let att_data = AttestationData { + slot: TARGET_SLOT, + head, + target, + source, + }; + let data_root = att_data.hash_tree_root(); + let proofs = vec![ + TypeOneMultiSignature::empty(make_bits(&[0])), + TypeOneMultiSignature::empty(make_bits(&[1, 2])), + ]; + + let mut aggregated_payloads: HashMap)> = + HashMap::new(); + aggregated_payloads.insert(data_root, (att_data.clone(), proofs)); + + let (block, signatures, _post_checkpoints) = build_block( + &head_state, + slot, + proposer_index, + parent_root, + &known_block_roots, + &aggregated_payloads, + false, + ) + .expect("build_block should succeed"); + + // Exactly one entry per AttestationData survives (no duplicate would + // pass `on_block`), the correspondence stays 1:1, and the entry kept is + // the higher-coverage proof ({1, 2}, two participants). + assert_eq!( + block.body.attestations.len(), + 1, + "a block must carry at most one entry per AttestationData" + ); + assert_eq!( + signatures.len(), + 1, + "one Type-1 proof per attestation entry" + ); + let kept = &block.body.attestations[0]; + assert_eq!( + kept.data, att_data, + "the kept entry carries the shared data" + ); + assert_eq!( + kept.aggregation_bits.count_ones(), + 2, + "the best-coverage proof ({{1, 2}}) is kept over the smaller one ({{0}})" + ); + } + /// Regression test for leanSpec PR #716: build_block must absorb /// gap-closing attestations whose source is justified on the head /// chain but older than `latest_justified` (e.g., a sibling fork @@ -1048,6 +1264,7 @@ mod tests { parent_root, &known_block_roots, &aggregated_payloads, + true, ) .expect("build_block should succeed"); @@ -1180,6 +1397,7 @@ mod tests { parent_root, &known_block_roots, &aggregated_payloads, + true, ) .expect("build_block should succeed"); @@ -1227,7 +1445,7 @@ mod tests { ]; let state = State::from_genesis(1000, vec![]); - let out = compact_attestations(entries, &state).unwrap(); + let out = compact_attestations(entries, &state, 0).unwrap(); assert_eq!(out.len(), 2); assert_eq!(out[0].0.data, data_a); assert_eq!(out[1].0.data, data_b); @@ -1268,7 +1486,7 @@ mod tests { ]; let state = State::from_genesis(1000, vec![]); - let out = compact_attestations(entries, &state).unwrap(); + let out = compact_attestations(entries, &state, 0).unwrap(); assert_eq!(out.len(), 3); assert_eq!(out[0].0.data, data_a); assert_eq!(out[1].0.data, data_b); diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 0fceede2..c3ee9604 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -82,6 +82,7 @@ impl BlockChain { aggregator: AggregatorController, attestation_committee_count: u64, gate_duties: bool, + enable_proposer_aggregation: bool, ) -> BlockChain { metrics::set_is_aggregator(aggregator.is_enabled()); metrics::set_node_sync_status(metrics::SyncStatus::Idle); @@ -106,6 +107,7 @@ impl BlockChain { current_aggregation: None, last_tick_instant: None, attestation_committee_count, + enable_proposer_aggregation, pre_merge_coverage: None, sync_status: SyncStatusTracker::new(gate_duties), } @@ -166,6 +168,15 @@ pub struct BlockChainServer { /// attestation aggregate coverage emission. attestation_committee_count: u64, + /// How the proposer collapses same-data attestations during block building + /// (a block may carry at most one entry per `AttestationData`). When true, + /// same-data proofs are merged via recursive Type-1 aggregation into a + /// union-coverage proof (leanSpec #510); when false (the default), only the + /// single best-coverage proof per data is kept, skipping the per-data + /// leanVM aggregation. Seeded from the CLI `--enable-proposer-aggregation` + /// flag at spawn. + enable_proposer_aggregation: bool, + /// Pre-merge `new_payloads` snapshot for the attestation aggregate coverage /// report. Captured at the end-of-slot promote (interval 4), read at the /// next slot boundary. Owned solely by the actor and only touched from the @@ -493,10 +504,13 @@ impl BlockChainServer { // by the idempotency guard in `on_tick`, since the store clock is already // here. let timing = metrics::time_block_building(); - let Ok((block, type_one_proofs, _post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) - else { + let Ok((block, type_one_proofs, _post_checkpoints)) = store::produce_block_with_signatures( + &mut self.store, + slot, + validator_id, + self.enable_proposer_aggregation, + ) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) else { metrics::inc_block_building_failures(); return; }; diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 0d7b6064..f4799797 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -789,6 +789,7 @@ pub fn produce_block_with_signatures( store: &mut Store, slot: u64, validator_index: u64, + enable_proposer_aggregation: bool, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { // Get parent block and state to build upon let head_root = get_proposal_head(store, slot); @@ -822,6 +823,7 @@ pub fn produce_block_with_signatures( head_root, &known_block_roots, &aggregated_payloads, + enable_proposer_aggregation, )? };