Skip to content
23 changes: 12 additions & 11 deletions docs/REPLICATION_DESIGN.md

Large diffs are not rendered by default.

148 changes: 141 additions & 7 deletions src/replication/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! Challenge-response for claimed holders. Anti-outsourcing protection.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::logging::{debug, info, warn};
Expand All @@ -15,10 +15,13 @@ use crate::replication::protocol::{
compute_audit_digest, AuditChallenge, AuditResponse, ReplicationMessage,
ReplicationMessageBody, ABSENT_KEY_DIGEST,
};
use crate::replication::types::{AuditFailureReason, FailureEvidence, PeerSyncRecord};
use crate::replication::types::{
AuditFailureReason, FailureEvidence, PeerSyncRecord, RepairProofs,
};
use crate::storage::LmdbStorage;
use saorsa_core::identity::PeerId;
use saorsa_core::P2PNode;
use tokio::sync::RwLock;

// ---------------------------------------------------------------------------
// Audit tick result
Expand Down Expand Up @@ -61,13 +64,42 @@ pub enum AuditTickResult {
/// **Invariant 19**: Returns [`AuditTickResult::Idle`] immediately if
/// `is_bootstrapping` is `true` — a node must not audit others while it
/// is still bootstrapping.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
#[allow(clippy::implicit_hasher)]
pub async fn audit_tick(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
config: &ReplicationConfig,
sync_history: &HashMap<PeerId, PeerSyncRecord>,
is_bootstrapping: bool,
) -> AuditTickResult {
let repair_proofs = Arc::new(RwLock::new(RepairProofs::new()));
audit_tick_with_repair_proofs(
p2p_node,
storage,
config,
sync_history,
&repair_proofs,
0,
is_bootstrapping,
)
.await
}

/// Execute one repair-proof-gated audit tick.
///
/// This is the production path used by the replication engine. The
/// compatibility [`audit_tick`] wrapper passes an empty proof table, so direct
/// callers that have not adopted repair proofs remain conservative and do not
/// audit peers for unproven keys.
#[allow(clippy::implicit_hasher, clippy::too_many_lines)]
pub async fn audit_tick_with_repair_proofs(
p2p_node: &Arc<P2PNode>,
storage: &Arc<LmdbStorage>,
config: &ReplicationConfig,
sync_history: &HashMap<PeerId, PeerSyncRecord>,
repair_proofs: &Arc<RwLock<RepairProofs>>,
current_sync_epoch: u64,
is_bootstrapping: bool,
) -> AuditTickResult {
// Invariant 19: never audit while still bootstrapping.
if is_bootstrapping {
Expand Down Expand Up @@ -119,17 +151,30 @@ pub async fn audit_tick(
.collect()
};

// Step 4: Filter to keys where the chosen peer is in the close group.
let mut peer_keys = Vec::new();
// Step 4: Filter to keys where the chosen peer is in the close group and
// this node has proof that it already sent the peer a repair hint for the
// specific key.
let mut sampled_key_groups = Vec::new();
for key in &sampled_keys {
let closest = dht
.find_closest_nodes_local_with_self(key, config.close_group_size)
.await;
if closest.iter().any(|n| n.peer_id == challenged_peer) {
peer_keys.push(*key);
let close_peers: HashSet<PeerId> = closest.iter().map(|node| node.peer_id).collect();
if close_peers.contains(&challenged_peer) {
sampled_key_groups.push((*key, close_peers));
}
}

let peer_keys = {
let mut proofs = repair_proofs.write().await;
mature_audit_keys_for_peer(
&challenged_peer,
sampled_key_groups,
&mut proofs,
current_sync_epoch,
)
};

if peer_keys.is_empty() {
return AuditTickResult::Idle;
}
Expand Down Expand Up @@ -299,6 +344,22 @@ fn eligible_audit_peers(sync_history: &HashMap<PeerId, PeerSyncRecord>) -> Vec<P
.collect()
}

fn mature_audit_keys_for_peer(
challenged_peer: &PeerId,
sampled_key_groups: Vec<(XorName, HashSet<PeerId>)>,
repair_proofs: &mut RepairProofs,
current_sync_epoch: u64,
) -> Vec<XorName> {
sampled_key_groups
.into_iter()
.filter_map(|(key, close_peers)| {
repair_proofs
.has_mature_replica_hint(challenged_peer, &key, &close_peers, current_sync_epoch)
.then_some(key)
})
.collect()
}

// ---------------------------------------------------------------------------
// Digest verification
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1133,6 +1194,79 @@ mod tests {
);
}

#[test]
fn audit_key_filter_retains_stable_proofs_and_rejects_evicted_peers() {
const HINT_EPOCH: u64 = 7;
const CURRENT_EPOCH: u64 = HINT_EPOCH + 1;
const CHALLENGED_PEER_BYTE: u8 = 0xA1;
const OTHER_PEER_BYTE: u8 = 0xA2;
const NEW_PEER_BYTE: u8 = 0xA3;
const MATURE_KEY_BYTE: u8 = 0xB1;
const SAME_EPOCH_KEY_BYTE: u8 = 0xB2;
const MISSING_PROOF_KEY_BYTE: u8 = 0xB3;
const STABLE_CHURN_KEY_BYTE: u8 = 0xB4;
const EVICTED_KEY_BYTE: u8 = 0xB5;
const XOR_NAME_LEN: usize = 32;

let challenged_peer = peer_id_from_bytes([CHALLENGED_PEER_BYTE; XOR_NAME_LEN]);
let other_peer = peer_id_from_bytes([OTHER_PEER_BYTE; XOR_NAME_LEN]);
let new_peer = peer_id_from_bytes([NEW_PEER_BYTE; XOR_NAME_LEN]);
let mature_key = [MATURE_KEY_BYTE; XOR_NAME_LEN];
let same_epoch_key = [SAME_EPOCH_KEY_BYTE; XOR_NAME_LEN];
let missing_proof_key = [MISSING_PROOF_KEY_BYTE; XOR_NAME_LEN];
let stable_churn_key = [STABLE_CHURN_KEY_BYTE; XOR_NAME_LEN];
let evicted_key = [EVICTED_KEY_BYTE; XOR_NAME_LEN];
let close_group = HashSet::from([challenged_peer, other_peer]);
let changed_close_group = HashSet::from([challenged_peer, new_peer]);
let evicted_close_group = HashSet::from([other_peer, new_peer]);
let mut repair_proofs = RepairProofs::new();

assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
mature_key,
&close_group,
HINT_EPOCH,
));
assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
same_epoch_key,
&close_group,
CURRENT_EPOCH,
));
assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
stable_churn_key,
&close_group,
HINT_EPOCH,
));
assert!(repair_proofs.record_replica_hint_sent(
challenged_peer,
evicted_key,
&close_group,
HINT_EPOCH,
));

let sampled_key_groups = vec![
(mature_key, close_group.clone()),
(same_epoch_key, close_group.clone()),
(missing_proof_key, close_group.clone()),
(stable_churn_key, changed_close_group),
(evicted_key, evicted_close_group),
];
let peer_keys = mature_audit_keys_for_peer(
&challenged_peer,
sampled_key_groups,
&mut repair_proofs,
CURRENT_EPOCH,
);

assert_eq!(
peer_keys,
vec![mature_key, stable_churn_key],
"mature proofs for stable close-group peers should become audit keys, while same-epoch, missing, and evicted-peer proofs should not"
);
}

// -- Audit response must match key count --------------------------------------

#[tokio::test]
Expand Down
Loading
Loading