Skip to content

Commit 9329ccd

Browse files
author
techartdev
committed
fix: community browse discovers members via DHT
- Add CommunityMembers wire type stored under community_info_key(share_id) - Validate CommunityMembers entries in validate_dht_value_for_known_keyspaces - Merge member lists on dht_store receipt (relay accumulates all members) - Announce self as community member in DHT when joining/creating a community - Re-announce community memberships after relay tunnel registration - browse_community now does DHT find_value to discover member peer addresses - browse_community includes local node's own published community shares - Add 4 tests: store/find, wrong-key rejection, merge-on-store, upsert roundtrip Previously browse_community only queried bootstrap peers (relay) which typically has NOT joined the community, returning 0 participants / 0 shares. Now community members announce themselves via the DHT so other members can discover and query them.
1 parent 1558767 commit 9329ccd

14 files changed

Lines changed: 459 additions & 87 deletions

File tree

crates/scp2p-cli/src/main.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ async fn main() -> anyhow::Result<()> {
5555
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level));
5656
fmt().with_env_filter(filter).with_target(false).init();
5757

58-
let quic_port = args.quic_port.unwrap_or_else(|| args.port.saturating_sub(1));
58+
let quic_port = args
59+
.quic_port
60+
.unwrap_or_else(|| args.port.saturating_sub(1));
5961
shell::run(args.db, args.bootstrap, args.port, quic_port).await
6062
}

crates/scp2p-cli/src/shell.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ use inquire::{InquireError, Select, Text};
1414
use rand::rngs::OsRng;
1515
use scp2p_core::{
1616
BoxedStream, Capabilities, FetchPolicy, Node, NodeConfig, NodeId, OwnedRelayAwareTransport,
17-
PeerAddr, PeerConnector, PeerRecord, PersistedCommunity, RelayAwareTransport,
18-
RequestTransport, SearchQuery, ShareId, ShareVisibility, SqliteStore, Store,
19-
TransportProtocol, build_tls_server_handle, quic_connect_bi_session_insecure,
20-
start_quic_server, tls_connect_session_insecure,
17+
PeerAddr, PeerConnector, PeerRecord, PersistedCommunity, RelayAwareTransport, RequestTransport,
18+
SearchQuery, ShareId, ShareVisibility, SqliteStore, Store, TransportProtocol,
19+
build_tls_server_handle, quic_connect_bi_session_insecure, start_quic_server,
20+
tls_connect_session_insecure,
2121
};
2222
use tracing::{info, warn};
2323

@@ -107,7 +107,12 @@ impl Ctx {
107107

108108
// ── Entry point ───────────────────────────────────────────────────────────────
109109

110-
pub async fn run(db: String, bootstrap_raw: Vec<String>, port: u16, quic_port: u16) -> anyhow::Result<()> {
110+
pub async fn run(
111+
db: String,
112+
bootstrap_raw: Vec<String>,
113+
port: u16,
114+
quic_port: u16,
115+
) -> anyhow::Result<()> {
111116
let pb = spinner("Opening database…");
112117
let store = SqliteStore::open(&db)?;
113118
let config = NodeConfig {
@@ -205,16 +210,11 @@ pub async fn run(db: String, bootstrap_raw: Vec<String>, port: u16, quic_port: u
205210
relay_via: None,
206211
})
207212
.await;
208-
let _ = tunnel_handle
209-
.reannounce_content_providers(self_addr)
210-
.await;
213+
let _ = tunnel_handle.reannounce_content_providers(self_addr).await;
211214
}
212215
// Push to relay immediately.
213216
let _ = tunnel_handle
214-
.dht_republish_once(
215-
tunnel_transport.as_ref(),
216-
&tunnel_bootstrap,
217-
)
217+
.dht_republish_once(tunnel_transport.as_ref(), &tunnel_bootstrap)
218218
.await;
219219
break;
220220
}
@@ -317,7 +317,11 @@ async fn cmd_status(ctx: &Ctx) -> anyhow::Result<()> {
317317
println!(" TCP port : {}", ctx.port);
318318
println!(
319319
" QUIC port: {}",
320-
if ctx.quic_port > 0 { ctx.quic_port.to_string() } else { "disabled".to_owned() }
320+
if ctx.quic_port > 0 {
321+
ctx.quic_port.to_string()
322+
} else {
323+
"disabled".to_owned()
324+
}
321325
);
322326
println!(" Subscriptions : {}", s.subscriptions.len());
323327
println!(" Manifests : {}", s.manifests.len());
@@ -1125,7 +1129,11 @@ fn print_banner(ctx: &Ctx) {
11251129
println!(" TCP port : {}", ctx.port);
11261130
println!(
11271131
" QUIC port: {}",
1128-
if ctx.quic_port > 0 { ctx.quic_port.to_string() } else { "disabled".to_owned() }
1132+
if ctx.quic_port > 0 {
1133+
ctx.quic_port.to_string()
1134+
} else {
1135+
"disabled".to_owned()
1136+
}
11291137
);
11301138
println!(
11311139
" Network : {}",

crates/scp2p-core/examples/quic_probe.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
//! Usage: cargo run --example quic_probe -- 178.104.13.182:7000
33
use std::net::SocketAddr;
44

5-
use scp2p_core::transport_net::{quic_connect_bi_session_insecure, tls_connect_session_insecure};
6-
use scp2p_core::capabilities::Capabilities;
75
use ed25519_dalek::SigningKey;
86
use rand::rngs::OsRng;
7+
use scp2p_core::capabilities::Capabilities;
8+
use scp2p_core::transport_net::{quic_connect_bi_session_insecure, tls_connect_session_insecure};
99

1010
#[tokio::main]
1111
async fn main() {
@@ -21,7 +21,10 @@ async fn main() {
2121
match quic_connect_bi_session_insecure(addr, &key, Capabilities::default(), None).await {
2222
Ok(session) => {
2323
println!("QUIC SUCCESS — connected");
24-
println!(" remote pubkey: {}", hex::encode(session.session.remote_node_pubkey));
24+
println!(
25+
" remote pubkey: {}",
26+
hex::encode(session.session.remote_node_pubkey)
27+
);
2528
}
2629
Err(e) => {
2730
println!("QUIC FAILED — {e:#}");
@@ -35,7 +38,10 @@ async fn main() {
3538
match tls_connect_session_insecure(tcp_addr, &key2, Capabilities::default(), None).await {
3639
Ok((_stream, session)) => {
3740
println!("TCP SUCCESS — connected");
38-
println!(" remote pubkey: {}", hex::encode(session.remote_node_pubkey));
41+
println!(
42+
" remote pubkey: {}",
43+
hex::encode(session.remote_node_pubkey)
44+
);
3945
}
4046
Err(e) => {
4147
println!("TCP FAILED — {e:#}");

crates/scp2p-core/src/api/helpers.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
};
1515

1616
use crate::{
17-
dht_keys::{content_provider_key, manifest_loc_key, share_head_key},
17+
dht_keys::{community_info_key, content_provider_key, manifest_loc_key, share_head_key},
1818
ids::{NodeId, ShareId},
1919
manifest::{ManifestV1, PublicShareSummary, ShareHead},
2020
net_fetch::RequestTransport,
@@ -25,10 +25,11 @@ use crate::{
2525
},
2626
search::IndexedItem,
2727
wire::{
28-
CommunityPublicShareList, CommunityStatus, Envelope, FLAG_RESPONSE, FindNode,
29-
FindNodeResult, FindValue, FindValueResult, GetCommunityStatus, ListCommunityPublicShares,
30-
ListPublicShares, MsgType, Providers, PublicShareList, RelayListRequest, RelayListResponse,
31-
RelayPayloadKind as WireRelayPayloadKind, Store as WireStore, WirePayload,
28+
CommunityMembers, CommunityPublicShareList, CommunityStatus, Envelope, FLAG_RESPONSE,
29+
FindNode, FindNodeResult, FindValue, FindValueResult, GetCommunityStatus,
30+
ListCommunityPublicShares, ListPublicShares, MsgType, Providers, PublicShareList,
31+
RelayListRequest, RelayListResponse, RelayPayloadKind as WireRelayPayloadKind,
32+
Store as WireStore, WirePayload,
3233
},
3334
};
3435

@@ -181,6 +182,14 @@ pub(super) fn validate_dht_value_for_known_keyspaces(
181182
}
182183
anyhow::bail!("relay announcement key does not match any valid rendezvous slot");
183184
}
185+
// Community member lists are stored at community_info_key(share_id).
186+
if let Ok(cm) = crate::cbor::from_slice::<CommunityMembers>(value) {
187+
let expected = community_info_key(&ShareId(cm.community_share_id));
188+
if expected != key {
189+
anyhow::bail!("community members value does not match community info key");
190+
}
191+
return Ok(());
192+
}
184193
// Reject values that do not match any recognized keyspace to prevent
185194
// arbitrary data storage and potential abuse (§4.5).
186195
anyhow::bail!("DHT value does not match any recognized keyspace")

crates/scp2p-core/src/api/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,11 +1455,7 @@ impl NodeHandle {
14551455
/// Update the locally stored name for a community.
14561456
///
14571457
/// Called when a remote peer reports the community name during browse.
1458-
pub async fn update_community_name(
1459-
&self,
1460-
share_id: ShareId,
1461-
name: &str,
1462-
) -> anyhow::Result<()> {
1458+
pub async fn update_community_name(&self, share_id: ShareId, name: &str) -> anyhow::Result<()> {
14631459
let mut state = self.state.write().await;
14641460
if let Some(membership) = state.communities.get_mut(&share_id.0)
14651461
&& membership.name.is_none()

crates/scp2p-core/src/api/node_dht.rs

Lines changed: 105 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ use crate::transport_net::{
1919
use crate::{
2020
capabilities::Capabilities,
2121
dht::{ALPHA, DEFAULT_TTL_SECS, DhtInsertResult, DhtNodeRecord, DhtValue, K, MAX_VALUE_SIZE},
22-
dht_keys::share_head_key,
22+
dht_keys::{community_info_key, share_head_key},
2323
ids::{NodeId, ShareId},
2424
manifest::ShareHead,
2525
net_fetch::RequestTransport,
2626
peer::PeerAddr,
27-
wire::{FindNode, Store as WireStore},
27+
wire::{CommunityMembers, FindNode, Store as WireStore},
2828
};
2929

3030
use super::{
@@ -101,13 +101,12 @@ impl NodeHandle {
101101

102102
pub async fn dht_store(&self, req: WireStore) -> anyhow::Result<()> {
103103
validate_dht_value_for_known_keyspaces(req.key, &req.value)?;
104+
let now = now_unix_secs()?;
105+
let value = merge_community_members_if_applicable(req.key, req.value, self, now).await;
104106
let mut state = self.state.write().await;
105-
state.dht.store(
106-
req.key,
107-
req.value,
108-
req.ttl_secs.max(DEFAULT_TTL_SECS),
109-
now_unix_secs()?,
110-
)
107+
state
108+
.dht
109+
.store(req.key, value, req.ttl_secs.max(DEFAULT_TTL_SECS), now)
111110
}
112111

113112
pub async fn dht_find_value(&self, key: [u8; 32]) -> anyhow::Result<Option<DhtValue>> {
@@ -610,4 +609,102 @@ impl NodeHandle {
610609
merge_peer_list(&mut peers, known);
611610
peers
612611
}
612+
613+
/// Insert or update the local DHT entry for a community the node has
614+
/// joined so that other peers can discover this node as a community
615+
/// member via `community_info_key(share_id)`.
616+
pub async fn upsert_community_member(
617+
&self,
618+
community_share_id: ShareId,
619+
self_addr: PeerAddr,
620+
) -> anyhow::Result<()> {
621+
let key = community_info_key(&community_share_id);
622+
let now = now_unix_secs()?;
623+
let mut state = self.state.write().await;
624+
let mut cm: CommunityMembers = state
625+
.dht
626+
.find_value(key, now)
627+
.and_then(|v| crate::cbor::from_slice(&v.value).ok())
628+
.unwrap_or(CommunityMembers {
629+
community_share_id: community_share_id.0,
630+
members: vec![],
631+
updated_at: now,
632+
});
633+
if !cm.members.contains(&self_addr) {
634+
cm.members.push(self_addr);
635+
}
636+
cm.updated_at = now;
637+
state
638+
.dht
639+
.store(key, crate::cbor::to_vec(&cm)?, DEFAULT_TTL_SECS, now)?;
640+
Ok(())
641+
}
642+
643+
/// Re-announce DHT community member entries for all joined communities.
644+
///
645+
/// Called during `dht_republish_once` to keep community member
646+
/// announcements fresh and ensure they survive app restarts.
647+
pub async fn reannounce_community_memberships(
648+
&self,
649+
self_addr: PeerAddr,
650+
) -> anyhow::Result<usize> {
651+
let community_ids: Vec<[u8; 32]> = {
652+
let state = self.state.read().await;
653+
state.communities.keys().copied().collect()
654+
};
655+
let mut count = 0usize;
656+
for cid in community_ids {
657+
if let Err(e) = self
658+
.upsert_community_member(ShareId(cid), self_addr.clone())
659+
.await
660+
{
661+
debug!(
662+
community = %hex::encode(&cid[..8]),
663+
error = %e,
664+
"reannounce_community_memberships: failed"
665+
);
666+
} else {
667+
count += 1;
668+
}
669+
}
670+
if count > 0 {
671+
debug!(count, "reannounce_community_memberships: updated");
672+
}
673+
Ok(count)
674+
}
675+
}
676+
677+
/// If `value` deserializes as [`CommunityMembers`], merge its member list
678+
/// with any existing entry already stored in the local DHT at `key`.
679+
/// Otherwise return `value` unchanged.
680+
async fn merge_community_members_if_applicable(
681+
key: [u8; 32],
682+
value: Vec<u8>,
683+
node: &NodeHandle,
684+
now: u64,
685+
) -> Vec<u8> {
686+
let Ok(incoming) = crate::cbor::from_slice::<CommunityMembers>(&value) else {
687+
return value;
688+
};
689+
let expected = community_info_key(&ShareId(incoming.community_share_id));
690+
if expected != key {
691+
return value;
692+
}
693+
let existing = {
694+
let mut state = node.state.write().await;
695+
state
696+
.dht
697+
.find_value(key, now)
698+
.and_then(|v| crate::cbor::from_slice::<CommunityMembers>(&v.value).ok())
699+
};
700+
let Some(mut merged) = existing else {
701+
return value;
702+
};
703+
for member in incoming.members {
704+
if !merged.members.contains(&member) {
705+
merged.members.push(member);
706+
}
707+
}
708+
merged.updated_at = merged.updated_at.max(incoming.updated_at);
709+
crate::cbor::to_vec(&merged).unwrap_or(value)
613710
}

crates/scp2p-core/src/api/node_net.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,28 +1023,29 @@ impl NodeHandle {
10231023
payload,
10241024
}),
10251025
WirePayload::GetCommunityStatus(msg) => {
1026-
let (joined, proof, name) = match VerifyingKey::from_bytes(&msg.community_share_pubkey) {
1027-
Ok(pubkey) if ShareId::from_pubkey(&pubkey).0 == msg.community_share_id => {
1028-
let state = self.state.read().await;
1029-
match state.communities.get(&msg.community_share_id) {
1030-
Some(membership) => {
1031-
let proof = membership
1032-
.token
1033-
.as_ref()
1034-
.and_then(|t| crate::cbor::to_vec(t).ok());
1035-
(true, proof, membership.name.clone())
1026+
let (joined, proof, name) =
1027+
match VerifyingKey::from_bytes(&msg.community_share_pubkey) {
1028+
Ok(pubkey) if ShareId::from_pubkey(&pubkey).0 == msg.community_share_id => {
1029+
let state = self.state.read().await;
1030+
match state.communities.get(&msg.community_share_id) {
1031+
Some(membership) => {
1032+
let proof = membership
1033+
.token
1034+
.as_ref()
1035+
.and_then(|t| crate::cbor::to_vec(t).ok());
1036+
(true, proof, membership.name.clone())
1037+
}
1038+
None => (false, None, None),
10361039
}
1037-
None => (false, None, None),
10381040
}
1039-
}
1040-
_ => {
1041-
return Some(error_envelope(
1042-
req_type,
1043-
req_id,
1044-
"community share_id does not match share_pubkey",
1045-
));
1046-
}
1047-
};
1041+
_ => {
1042+
return Some(error_envelope(
1043+
req_type,
1044+
req_id,
1045+
"community share_id does not match share_pubkey",
1046+
));
1047+
}
1048+
};
10481049
crate::cbor::to_vec(&CommunityStatus {
10491050
community_share_id: msg.community_share_id,
10501051
joined,
@@ -1523,7 +1524,10 @@ impl NodeHandle {
15231524
}
15241525

15251526
if refreshed > 0 {
1526-
debug!(refreshed, "reannounce_published_share_data: DHT entries restored");
1527+
debug!(
1528+
refreshed,
1529+
"reannounce_published_share_data: DHT entries restored"
1530+
);
15271531
}
15281532

15291533
Ok(refreshed)

crates/scp2p-core/src/api/node_publish.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,8 @@ impl NodeHandle {
240240
chunk_list_hash: desc.chunk_list_hash,
241241
});
242242
// Register using the already-computed descriptor — no re-read.
243-
self.register_content_precomputed(
244-
provider.clone(),
245-
desc,
246-
file_path.to_path_buf(),
247-
)
248-
.await?;
243+
self.register_content_precomputed(provider.clone(), desc, file_path.to_path_buf())
244+
.await?;
249245

250246
if total > 50 && (idx + 1) % 500 == 0 {
251247
info!(
@@ -349,10 +345,7 @@ impl NodeHandle {
349345
///
350346
/// Called after relay tunnel registration so that provider entries
351347
/// contain the relayed address, enabling NAT-traversed downloads.
352-
pub async fn reannounce_content_providers(
353-
&self,
354-
self_addr: PeerAddr,
355-
) -> anyhow::Result<usize> {
348+
pub async fn reannounce_content_providers(&self, self_addr: PeerAddr) -> anyhow::Result<usize> {
356349
let now = now_unix_secs()?;
357350
let mut state = self.state.write().await;
358351

0 commit comments

Comments
 (0)