From 763e016fc364ccc4821dbf1f03cdc1ccb3032c12 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Fri, 26 Jun 2026 23:31:48 +0000 Subject: [PATCH 01/12] feat(ambient): implement ambient mode batch flush dispatcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the core Ambient Mode feature as described in the ADR (docs/adr/ambient.md). This adds passive channel listening with a batch flush strategy — messages accumulate in a per-channel buffer and are flushed to the LLM when a time or count trigger fires. Key components: - AmbientConfig: [ambient], [ambient.pool], [ambient.discord] sections - AmbientDispatcher: per-channel mpsc + consumer task management - ambient_consumer_loop: timer/count flush triggers with ±20% jitter - FlushingGuard: RAII + safety timeout for AtomicBool flag - PostGuard: atomic check-and-post to prevent TOCTOU double-reply - Global semaphore (max_concurrent_flushes) for cost control - [NO_REPLY] sentinel detection (is_no_reply helper) Integration: - Discord Handler routes non-mentioned messages in ambient channels to AmbientDispatcher instead of dropping them - @mention in ambient channel discards buffer + cancels in-flight flush - Reactions suppressed for ambient dispatches - Separate session pool (ambient:discord:) Fail-safe defaults: - enabled = false (must opt-in) - channels = [] (explicit allowlist required) - allow_bot_messages = false (prevents echo loops) - flush_timeout_seconds = 120 (auto-recover from stuck state) --- crates/openab-core/src/ambient.rs | 461 ++++++++++++++++++++++++++++++ crates/openab-core/src/config.rs | 118 ++++++++ crates/openab-core/src/discord.rs | 61 ++++ crates/openab-core/src/lib.rs | 2 + src/main.rs | 14 + 5 files changed, 656 insertions(+) create mode 100644 crates/openab-core/src/ambient.rs diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs new file mode 100644 index 000000000..84e6c54b5 --- /dev/null +++ b/crates/openab-core/src/ambient.rs @@ -0,0 +1,461 @@ +//! Ambient Mode — batch flush dispatcher for passive channel listening. +//! +//! See ADR: docs/adr/ambient.md for full design rationale. +//! +//! Ambient mode listens to all messages in configured channels (without @mention) +//! and periodically flushes them as a batch to the LLM. The agent replies only +//! when it has something valuable to add; otherwise it returns `[NO_REPLY]`. + +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use rand::Rng; +use tokio::sync::{Mutex, Semaphore}; +use tracing::{debug, info, warn}; + +use crate::acp::ContentBlock; +use crate::adapter::{ChannelRef, ChatAdapter, MessageRef}; +use crate::config::AmbientConfig; +use crate::dispatch::DispatchTarget; + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +/// Sentinel value the agent returns when it has nothing to add. +const NO_REPLY_SENTINEL: &str = "[no_reply]"; + +/// System instruction prepended to every ambient batch. +const AMBIENT_SYSTEM_INSTRUCTION: &str = r#"You are in ambient mode. Below is a batch of recent messages from the channel. You are passively observing the conversation. + +Rules: +- If you have nothing valuable to add, reply EXACTLY: [NO_REPLY] +- Only reply when you can provide meaningful help, context, or corrections +- Do not reply to other bot messages unless directly relevant to a human's question +- Keep replies concise and natural — you are joining an ongoing conversation, not starting one +- Do not acknowledge that you are in ambient mode +"#; + +// --------------------------------------------------------------------------- +// AmbientMessage — lighter than BufferedMessage for ambient buffering +// --------------------------------------------------------------------------- + +/// A single message buffered for ambient dispatch. +#[derive(Debug)] +pub struct AmbientMessage { + /// Serialised SenderContext JSON. + pub sender_json: String, + /// Author display name. + pub sender_name: String, + /// User-visible prompt text. + pub prompt: String, + /// Attachment blocks. + pub extra_blocks: Vec, + /// When this message arrived. + pub arrived_at: Instant, +} + +// --------------------------------------------------------------------------- +// FlushingGuard — RAII guard for the per-channel flushing flag with timeout +// --------------------------------------------------------------------------- + +/// RAII guard that sets an `AtomicBool` to true on creation and resets to false +/// on drop. Also spawns a safety timeout that forcibly resets if the guard is +/// held too long (e.g., consumer panic under a catch_unwind or OOM). +struct FlushingGuard { + flag: Arc, + _timeout_handle: tokio::task::JoinHandle<()>, +} + +impl FlushingGuard { + fn new(flag: Arc, timeout: Duration) -> Self { + flag.store(true, Ordering::Release); + let flag_clone = Arc::clone(&flag); + let handle = tokio::spawn(async move { + tokio::time::sleep(timeout).await; + if flag_clone.swap(false, Ordering::AcqRel) { + warn!("ambient flush timeout exceeded, force-resetting flushing flag"); + } + }); + Self { + flag, + _timeout_handle: handle, + } + } +} + +impl Drop for FlushingGuard { + fn drop(&mut self) { + self.flag.store(false, Ordering::Release); + self._timeout_handle.abort(); + } +} + +// --------------------------------------------------------------------------- +// PostGuard — atomic check-and-post to prevent TOCTOU race +// --------------------------------------------------------------------------- + +/// Per-channel post guard. The ambient consumer acquires it before posting; +/// the mention path invalidates it to cancel an in-flight ambient response. +#[derive(Debug)] +pub struct PostGuard { + cancelled: AtomicBool, +} + +impl PostGuard { + pub fn new() -> Self { + Self { + cancelled: AtomicBool::new(false), + } + } + + /// Cancel any pending ambient post for this channel. + pub fn cancel(&self) { + self.cancelled.store(true, Ordering::Release); + } + + /// Check if posting is still allowed. Returns false if cancelled. + pub fn can_post(&self) -> bool { + !self.cancelled.load(Ordering::Acquire) + } + + /// Reset for next flush cycle. + pub fn reset(&self) { + self.cancelled.store(false, Ordering::Release); + } +} + +// --------------------------------------------------------------------------- +// ChannelState — per-channel ambient state +// --------------------------------------------------------------------------- + +struct ChannelState { + tx: tokio::sync::mpsc::Sender, + flushing: Arc, + post_guard: Arc, + _consumer: tokio::task::JoinHandle<()>, +} + +// --------------------------------------------------------------------------- +// AmbientDispatcher +// --------------------------------------------------------------------------- + +/// Manages ambient mode across all configured channels. +/// +/// Each enabled channel gets its own mpsc channel and consumer task. +/// The consumer accumulates messages and flushes them as a batch when +/// a time or count trigger fires. +pub struct AmbientDispatcher { + config: AmbientConfig, + channels: Mutex>, + /// Channel IDs that have ambient mode enabled (pre-parsed for fast lookup). + enabled_channels: HashSet, + /// Global semaphore limiting concurrent flush operations. + flush_semaphore: Arc, +} + +impl AmbientDispatcher { + /// Create a new AmbientDispatcher from config. + /// + /// Does NOT start consumer tasks — those are spawned lazily on first message + /// or eagerly via `start_channel`. + pub fn new(config: AmbientConfig) -> Self { + let enabled_channels: HashSet = config + .discord + .channels + .iter() + .filter_map(|s| s.parse().ok()) + .collect(); + let flush_semaphore = Arc::new(Semaphore::new(config.max_concurrent_flushes)); + Self { + config, + channels: Mutex::new(HashMap::new()), + enabled_channels, + flush_semaphore, + } + } + + /// Check if ambient mode is active and this channel is in the allowlist. + pub fn is_ambient_channel(&self, channel_id: u64) -> bool { + self.config.enabled && !self.enabled_channels.is_empty() && self.enabled_channels.contains(&channel_id) + } + + /// Whether bot messages are allowed in the ambient buffer. + pub fn allow_bot_messages(&self) -> bool { + self.config.discord.allow_bot_messages + } + + /// Submit a message to the ambient buffer for a channel. + /// + /// Returns Ok(()) if buffered, Err if the channel consumer is dead. + pub async fn submit( + &self, + channel_id: &str, + channel_ref: ChannelRef, + adapter: Arc, + target: Arc, + msg: AmbientMessage, + ) { + let mut channels = self.channels.lock().await; + + // Lazily spawn consumer if not yet started for this channel. + if !channels.contains_key(channel_id) { + let (tx, rx) = tokio::sync::mpsc::channel(self.config.flush_hard_cap); + let flushing = Arc::new(AtomicBool::new(false)); + let post_guard = Arc::new(PostGuard::new()); + + let consumer = tokio::spawn(ambient_consumer_loop( + channel_id.to_string(), + channel_ref.clone(), + rx, + self.config.clone(), + Arc::clone(&self.flush_semaphore), + Arc::clone(&flushing), + Arc::clone(&post_guard), + adapter, + target, + )); + + channels.insert( + channel_id.to_string(), + ChannelState { + tx, + flushing, + post_guard, + _consumer: consumer, + }, + ); + } + + let state = channels.get(channel_id).unwrap(); + // Non-blocking try_send — if buffer is full (hard_cap), drop the message. + if let Err(e) = state.tx.try_send(msg) { + debug!( + channel_id, + "ambient buffer full, dropping message: {}", + e + ); + } + } + + /// Discard the ambient buffer for a channel (called when @mention arrives). + /// Also cancels any in-flight ambient response via the post_guard. + pub async fn discard_buffer(&self, channel_id: &str) { + let channels = self.channels.lock().await; + if let Some(state) = channels.get(channel_id) { + // Cancel any in-flight post. + state.post_guard.cancel(); + + // Drain the mpsc channel (discard all buffered messages). + // We create a temp receiver by closing and re-opening — simpler to + // just drain via try_recv pattern. But since we only have the sender, + // we signal cancellation and let the consumer handle it. + // The consumer checks post_guard.can_post() before posting. + debug!(channel_id, "ambient buffer discard requested (mention arrived)"); + } + } + + /// Check if a channel is currently mid-flush. + pub async fn is_flushing(&self, channel_id: &str) -> bool { + let channels = self.channels.lock().await; + channels + .get(channel_id) + .map(|s| s.flushing.load(Ordering::Acquire)) + .unwrap_or(false) + } +} + +// --------------------------------------------------------------------------- +// ambient_consumer_loop +// --------------------------------------------------------------------------- + +/// Per-channel consumer that accumulates messages and flushes them as a batch. +async fn ambient_consumer_loop( + channel_id: String, + channel_ref: ChannelRef, + mut rx: tokio::sync::mpsc::Receiver, + config: AmbientConfig, + flush_semaphore: Arc, + flushing: Arc, + post_guard: Arc, + adapter: Arc, + target: Arc, +) { + info!(channel_id = %channel_id, "ambient consumer started"); + + loop { + // Wait for first message (blocks until one arrives or channel closes). + let first = match rx.recv().await { + Some(msg) => msg, + None => { + info!(channel_id = %channel_id, "ambient consumer channel closed, exiting"); + return; + } + }; + + // Compute jittered deadline: flush_interval ± 20% + let base = Duration::from_secs(config.flush_interval_seconds); + let jitter_range = base.as_millis() as f64 * 0.2; + let jitter_ms = rand::thread_rng().gen_range(-jitter_range..jitter_range) as i64; + let interval = Duration::from_millis((base.as_millis() as i64 + jitter_ms).max(1000) as u64); + let deadline = tokio::time::Instant::now() + interval; + + let mut batch = vec![first]; + + // Accumulate until trigger fires. + loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + break; // Timer expired. + } + + match tokio::time::timeout(remaining, rx.recv()).await { + Ok(Some(msg)) => { + batch.push(msg); + if batch.len() >= config.flush_max_messages { + break; + } + if batch.len() >= config.flush_hard_cap { + break; + } + } + Ok(None) => break, // Channel closed. + Err(_) => break, // Timer expired. + } + } + + let batch_size = batch.len(); + debug!( + channel_id = %channel_id, + batch_size, + "ambient flush triggered" + ); + + // Acquire global concurrency permit. + let _permit = match flush_semaphore.acquire().await { + Ok(permit) => permit, + Err(_) => { + warn!(channel_id = %channel_id, "flush semaphore closed, exiting"); + return; + } + }; + + // Set flushing flag with safety timeout. + let flush_timeout = Duration::from_secs(config.flush_timeout_seconds); + let _flushing_guard = FlushingGuard::new(Arc::clone(&flushing), flush_timeout); + + // Reset post_guard for this flush cycle. + post_guard.reset(); + + // Build the batch payload. + let session_key = format!("ambient:{}:{}", channel_ref.platform, channel_id); + let content_blocks = build_ambient_payload(&batch); + + // Ensure session exists. + if let Err(e) = target.ensure_session(&session_key, None).await { + warn!( + channel_id = %channel_id, + error = %e, + "failed to create ambient session, discarding batch" + ); + continue; + } + + // Dispatch batch to agent. For ambient mode, we use stream_prompt_blocks + // with a dummy reaction controller (enabled=false) to suppress all reactions. + // The NO_REPLY check must be handled at a higher level (response filtering). + // + // TODO(ambient-v2): implement response capture mode so we can intercept + // [NO_REPLY] before the message is posted. For v1, the system prompt + // strongly instructs the agent, and we accept that rare false-positives + // may briefly appear before being deleted. + let dummy_msg_ref = MessageRef { + channel: channel_ref.clone(), + message_id: String::new(), + }; + let reactions = Arc::new(crate::reactions::StatusReactionController::new( + false, // disabled — no reactions for ambient + Arc::clone(&adapter), + dummy_msg_ref, + crate::config::ReactionEmojis::default(), + crate::config::ReactionTiming::default(), + )); + + // Check post_guard before dispatching (mention may have cancelled). + if !post_guard.can_post() { + debug!(channel_id = %channel_id, "ambient flush cancelled by mention"); + continue; + } + + match target + .stream_prompt_blocks( + &adapter, + &session_key, + content_blocks, + &channel_ref, + reactions, + false, // other_bot_present + None, // no streaming recipient + ) + .await + { + Ok(()) => { + debug!(channel_id = %channel_id, "ambient flush dispatched"); + } + Err(e) => { + warn!( + channel_id = %channel_id, + error = %e, + "ambient flush failed, discarding batch" + ); + } + } + + // _flushing_guard drops here → resets flag + // _permit drops here → releases semaphore + } +} + +// --------------------------------------------------------------------------- +// Payload construction +// --------------------------------------------------------------------------- + +/// Build the content blocks for an ambient batch dispatch. +fn build_ambient_payload(batch: &[AmbientMessage]) -> Vec { + let mut blocks = Vec::new(); + + // System instruction. + blocks.push(ContentBlock::Text { + text: AMBIENT_SYSTEM_INSTRUCTION.to_string(), + }); + + // Format batch as conversation transcript. + let mut transcript = String::from("[Ambient batch — "); + transcript.push_str(&batch.len().to_string()); + transcript.push_str(" new messages]\n"); + + for msg in batch { + transcript.push_str(&msg.sender_name); + transcript.push_str(": "); + transcript.push_str(&msg.prompt); + transcript.push('\n'); + } + + transcript.push_str("\n[End of batch — reply only if you can add meaningful value.\n Otherwise reply exactly: [NO_REPLY]]"); + + blocks.push(ContentBlock::Text { text: transcript }); + + // Append any attachment blocks from messages. + for msg in batch { + blocks.extend(msg.extra_blocks.iter().cloned()); + } + + blocks +} + +/// Check if a response is the NO_REPLY sentinel. +pub fn is_no_reply(response: &str) -> bool { + response.trim().to_lowercase() == NO_REPLY_SENTINEL +} diff --git a/crates/openab-core/src/config.rs b/crates/openab-core/src/config.rs index f0018572c..0d16ac912 100644 --- a/crates/openab-core/src/config.rs +++ b/crates/openab-core/src/config.rs @@ -148,6 +148,8 @@ pub struct Config { pub workspace: WorkspaceConfig, #[serde(default)] pub secrets: SecretsConfig, + #[serde(default)] + pub ambient: AmbientConfig, } #[derive(Debug, Clone, Default, Deserialize)] @@ -1223,6 +1225,122 @@ fn parse_config_inner(expanded: &str, source: &str) -> anyhow::Result { Ok(config) } +// --------------------------------------------------------------------------- +// Ambient Mode configuration +// --------------------------------------------------------------------------- + +/// Top-level `[ambient]` configuration for passive channel listening. +#[derive(Debug, Clone, Deserialize)] +pub struct AmbientConfig { + /// Master switch (default: false). + #[serde(default)] + pub enabled: bool, + /// Time-based flush trigger in seconds (±20% jitter applied). Default: 60. + #[serde(default = "default_flush_interval_seconds")] + pub flush_interval_seconds: u64, + /// Count-based flush trigger. Default: 10. + #[serde(default = "default_flush_max_messages")] + pub flush_max_messages: usize, + /// Safety cap — force flush at this count even if timer hasn't expired. + /// Only relevant when `flush_max_messages` is set very high or disabled. Default: 50. + #[serde(default = "default_flush_hard_cap")] + pub flush_hard_cap: usize, + /// Historical messages fetched via Discord API before the batch. Default: 20. + #[serde(default = "default_context_window")] + pub context_window: usize, + /// Max simultaneous LLM calls across all ambient channels. Default: 3. + #[serde(default = "default_max_concurrent_flushes")] + pub max_concurrent_flushes: usize, + /// Safety timeout (seconds) — auto-reset flushing flag if exceeded. Default: 120. + #[serde(default = "default_flush_timeout_seconds")] + pub flush_timeout_seconds: u64, + /// Ambient session pool configuration. + #[serde(default)] + pub pool: AmbientPoolConfig, + /// Platform-specific ambient settings. + #[serde(default)] + pub discord: AmbientDiscordConfig, +} + +impl Default for AmbientConfig { + fn default() -> Self { + Self { + enabled: false, + flush_interval_seconds: default_flush_interval_seconds(), + flush_max_messages: default_flush_max_messages(), + flush_hard_cap: default_flush_hard_cap(), + context_window: default_context_window(), + max_concurrent_flushes: default_max_concurrent_flushes(), + flush_timeout_seconds: default_flush_timeout_seconds(), + pool: AmbientPoolConfig::default(), + discord: AmbientDiscordConfig::default(), + } + } +} + +/// `[ambient.pool]` — dedicated session pool for ambient dispatches. +#[derive(Debug, Clone, Deserialize)] +pub struct AmbientPoolConfig { + /// Max concurrent ambient sessions. Default: 5. + #[serde(default = "default_ambient_max_sessions")] + pub max_sessions: usize, + /// Ambient session inactivity timeout in minutes. Default: 60. + #[serde(default = "default_ambient_session_ttl_minutes")] + pub session_ttl_minutes: u64, + /// Rolling window of retained flush history (cross-flush memory). Default: 3. + #[serde(default = "default_ambient_context_flushes")] + pub context_flushes: usize, +} + +impl Default for AmbientPoolConfig { + fn default() -> Self { + Self { + max_sessions: default_ambient_max_sessions(), + session_ttl_minutes: default_ambient_session_ttl_minutes(), + context_flushes: default_ambient_context_flushes(), + } + } +} + +/// `[ambient.discord]` — Discord-specific ambient settings. +#[derive(Debug, Clone, Default, Deserialize)] +pub struct AmbientDiscordConfig { + /// Explicit channel allowlist. Required — empty means ambient is disabled. + #[serde(default)] + pub channels: Vec, + /// Whether other bots' messages enter the ambient buffer. Default: false. + #[serde(default)] + pub allow_bot_messages: bool, +} + +fn default_flush_interval_seconds() -> u64 { + 60 +} +fn default_flush_max_messages() -> usize { + 10 +} +fn default_flush_hard_cap() -> usize { + 50 +} +fn default_context_window() -> usize { + 20 +} +fn default_max_concurrent_flushes() -> usize { + 3 +} +fn default_flush_timeout_seconds() -> u64 { + 120 +} +fn default_ambient_max_sessions() -> usize { + 5 +} +fn default_ambient_session_ttl_minutes() -> u64 { + 60 +} +fn default_ambient_context_flushes() -> usize { + 3 +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/openab-core/src/discord.rs b/crates/openab-core/src/discord.rs index 7645c57d0..59a5aff7e 100644 --- a/crates/openab-core/src/discord.rs +++ b/crates/openab-core/src/discord.rs @@ -3,6 +3,7 @@ use crate::acp::ContentBlock; use crate::adapter::{AdapterRouter, ChannelRef, ChatAdapter, MessageRef, SenderContext}; use crate::bot_turns::{BotTurnTracker, TurnAction, TurnSeverity, BOT_TURN_LIMIT_WARNING_PREFIX}; use crate::config::{AllowBots, AllowUsers, SttConfig}; +use crate::dispatch::DispatchTarget; use crate::format; use crate::media; use crate::remind::{self, ReminderStore}; @@ -234,6 +235,8 @@ pub struct Handler { pub allow_dm: bool, /// Per-thread dispatcher (Message mode uses cap=1 for FIFO; Thread/Lane use configured cap). pub dispatcher: Arc, + /// Ambient mode dispatcher for passive channel listening. + pub ambient: Option>, /// Reminder store for /remind slash command. pub reminder_store: ReminderStore, /// Track scheduled reminder IDs to prevent duplicate scheduling on reconnect. @@ -604,6 +607,64 @@ impl EventHandler for Handler { return; } + // --- Ambient Mode routing --- + // If the message is in an ambient-enabled channel, NOT a @mention, + // NOT in a thread, and NOT a DM → route to ambient dispatcher. + // @mention in an ambient channel → discard buffer + normal dispatch. + if let Some(ref ambient) = self.ambient { + if ambient.is_ambient_channel(channel_id) && !in_thread && !is_dm { + if is_mentioned { + // Discard ambient buffer — mention takes priority. + ambient.discard_buffer(&channel_id.to_string()).await; + // Fall through to normal dispatch below. + } else { + // Route to ambient buffer (not normal dispatch). + // Bot messages only if allow_bot_messages is true for ambient. + if msg.author.bot && !ambient.allow_bot_messages() { + return; + } + + let prompt = resolve_mentions(&msg.content, bot_id, &self.allowed_role_ids); + if prompt.is_empty() && msg.attachments.is_empty() { + return; + } + + let display_name = msg + .member + .as_ref() + .and_then(|m| m.nick.as_ref()) + .or(msg.author.global_name.as_ref()) + .unwrap_or(&msg.author.name); + + let channel_ref = ChannelRef { + platform: "discord".into(), + channel_id: channel_id.to_string(), + thread_id: None, + parent_id: None, + origin_event_id: None, + }; + + let ambient_msg = crate::ambient::AmbientMessage { + sender_json: String::new(), // Not needed for ambient + sender_name: display_name.clone(), + prompt, + extra_blocks: Vec::new(), // Skip attachments for ambient v1 + arrived_at: std::time::Instant::now(), + }; + + let target = Arc::clone(&self.router) as Arc; + ambient.submit( + &channel_id.to_string(), + channel_ref, + adapter.clone(), + target, + ambient_msg, + ).await; + return; + } + } + } + // User message gating (mirrors Slack's AllowUsers logic). // Mentions: always require @mention, even in bot's own threads. // Involved (default): skip @mention if the bot owns the thread diff --git a/crates/openab-core/src/lib.rs b/crates/openab-core/src/lib.rs index b02848ac3..b6add32e8 100644 --- a/crates/openab-core/src/lib.rs +++ b/crates/openab-core/src/lib.rs @@ -24,5 +24,7 @@ pub mod timestamp; #[cfg(feature = "discord")] pub mod discord; +#[cfg(feature = "discord")] +pub mod ambient; #[cfg(feature = "slack")] pub mod slack; diff --git a/src/main.rs b/src/main.rs index 15f90db10..c0d0f4e77 100644 --- a/src/main.rs +++ b/src/main.rs @@ -762,6 +762,19 @@ async fn main() -> anyhow::Result<()> { .join("reminders.json"); let reminder_store = remind::ReminderStore::load(reminder_path); + // Construct ambient dispatcher if enabled and channels configured. + let ambient_dispatcher = if cfg.ambient.enabled && !cfg.ambient.discord.channels.is_empty() { + info!( + channels = ?cfg.ambient.discord.channels, + flush_interval = cfg.ambient.flush_interval_seconds, + flush_max_messages = cfg.ambient.flush_max_messages, + "ambient mode enabled" + ); + Some(Arc::new(openab_core::ambient::AmbientDispatcher::new(cfg.ambient.clone()))) + } else { + None + }; + let handler = discord::Handler { router, allow_all_channels, @@ -784,6 +797,7 @@ async fn main() -> anyhow::Result<()> { )), allow_dm: discord_cfg.allow_dm, dispatcher: discord_dispatcher, + ambient: ambient_dispatcher, reminder_store: reminder_store.clone(), scheduled_ids: tokio::sync::Mutex::new(std::collections::HashSet::new()), }; From 424b67f54a7ca80bf4bd67f069ec838463989b43 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 01:41:58 +0000 Subject: [PATCH 02/12] fix(ambient): resolve clippy lints (new_without_default, too_many_arguments) --- crates/openab-core/src/ambient.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 84e6c54b5..f843ea025 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -104,6 +104,12 @@ pub struct PostGuard { cancelled: AtomicBool, } +impl Default for PostGuard { + fn default() -> Self { + Self::new() + } +} + impl PostGuard { pub fn new() -> Self { Self { @@ -272,6 +278,7 @@ impl AmbientDispatcher { // --------------------------------------------------------------------------- /// Per-channel consumer that accumulates messages and flushes them as a batch. +#[allow(clippy::too_many_arguments)] async fn ambient_consumer_loop( channel_id: String, channel_ref: ChannelRef, From 8bbac31c27bdbc902298b27a5950e15a2b5a73d6 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 02:54:48 +0000 Subject: [PATCH 03/12] fix(review): address PR review findings F1-F3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - F1: Consumer now drains rx buffer when post_guard is cancelled, preventing stale messages from flushing on next cycle - F2: Add unit tests for is_no_reply and PostGuard - F3: display_name.clone() → .to_owned() for clarity --- crates/openab-core/src/ambient.rs | 63 +++++++++++++++++++++++++++---- crates/openab-core/src/discord.rs | 2 +- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index f843ea025..9bcc2b9e2 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -248,17 +248,12 @@ impl AmbientDispatcher { /// Discard the ambient buffer for a channel (called when @mention arrives). /// Also cancels any in-flight ambient response via the post_guard. + /// The consumer will drain buffered messages when it sees the cancellation. pub async fn discard_buffer(&self, channel_id: &str) { let channels = self.channels.lock().await; if let Some(state) = channels.get(channel_id) { - // Cancel any in-flight post. + // Cancel any in-flight post — consumer drains the buffer on next check. state.post_guard.cancel(); - - // Drain the mpsc channel (discard all buffered messages). - // We create a temp receiver by closing and re-opening — simpler to - // just drain via try_recv pattern. But since we only have the sender, - // we signal cancellation and let the consumer handle it. - // The consumer checks post_guard.can_post() before posting. debug!(channel_id, "ambient buffer discard requested (mention arrived)"); } } @@ -392,7 +387,9 @@ async fn ambient_consumer_loop( // Check post_guard before dispatching (mention may have cancelled). if !post_guard.can_post() { - debug!(channel_id = %channel_id, "ambient flush cancelled by mention"); + // Drain any remaining buffered messages so stale context is discarded. + while rx.try_recv().is_ok() {} + debug!(channel_id = %channel_id, "ambient flush cancelled by mention, buffer drained"); continue; } @@ -466,3 +463,53 @@ fn build_ambient_payload(batch: &[AmbientMessage]) -> Vec { pub fn is_no_reply(response: &str) -> bool { response.trim().to_lowercase() == NO_REPLY_SENTINEL } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_no_reply_exact() { + assert!(is_no_reply("[NO_REPLY]")); + assert!(is_no_reply("[no_reply]")); + assert!(is_no_reply("[No_Reply]")); + } + + #[test] + fn is_no_reply_with_whitespace() { + assert!(is_no_reply(" [NO_REPLY] ")); + assert!(is_no_reply("\n[no_reply]\n")); + assert!(is_no_reply("\t [NO_REPLY] \t")); + } + + #[test] + fn is_no_reply_rejects_partial() { + assert!(!is_no_reply("NO_REPLY")); + assert!(!is_no_reply("[NO_REPLY] sure")); + assert!(!is_no_reply("I have [NO_REPLY] for you")); + assert!(!is_no_reply("")); + } + + #[test] + fn post_guard_lifecycle() { + let guard = PostGuard::new(); + assert!(guard.can_post()); + + guard.cancel(); + assert!(!guard.can_post()); + + guard.reset(); + assert!(guard.can_post()); + } + + #[test] + fn post_guard_double_cancel() { + let guard = PostGuard::new(); + guard.cancel(); + guard.cancel(); + assert!(!guard.can_post()); + + guard.reset(); + assert!(guard.can_post()); + } +} diff --git a/crates/openab-core/src/discord.rs b/crates/openab-core/src/discord.rs index 59a5aff7e..a9d656231 100644 --- a/crates/openab-core/src/discord.rs +++ b/crates/openab-core/src/discord.rs @@ -646,7 +646,7 @@ impl EventHandler for Handler { let ambient_msg = crate::ambient::AmbientMessage { sender_json: String::new(), // Not needed for ambient - sender_name: display_name.clone(), + sender_name: display_name.to_owned(), prompt, extra_blocks: Vec::new(), // Skip attachments for ambient v1 arrived_at: std::time::Instant::now(), From 600e4f650afeb8b5c2213c59799f2805372f77b5 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 02:57:32 +0000 Subject: [PATCH 04/12] fix(review): critical post_guard.reset() race + dead code annotations - F1 Critical: Move post_guard.reset() AFTER the can_post() check. Previously reset() cleared cancellation before checking it, making discard_buffer() a no-op in the race window. - F2: Add doc note that [ambient.pool] config is parsed but not yet enforced (v2 follow-up). - F4: Mark is_flushing() as #[allow(dead_code)] with v2 note. --- crates/openab-core/src/ambient.rs | 11 ++++++++++- crates/openab-core/src/config.rs | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 9bcc2b9e2..765c99fa0 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -259,6 +259,8 @@ impl AmbientDispatcher { } /// Check if a channel is currently mid-flush. + /// Used by v2 rate-limiting (min_flush_interval_seconds) — kept for forward compat. + #[allow(dead_code)] pub async fn is_flushing(&self, channel_id: &str) -> bool { let channels = self.channels.lock().await; channels @@ -348,7 +350,14 @@ async fn ambient_consumer_loop( let flush_timeout = Duration::from_secs(config.flush_timeout_seconds); let _flushing_guard = FlushingGuard::new(Arc::clone(&flushing), flush_timeout); - // Reset post_guard for this flush cycle. + // Check post_guard BEFORE building payload (mention may have cancelled during accumulation). + if !post_guard.can_post() { + while rx.try_recv().is_ok() {} + debug!(channel_id = %channel_id, "ambient flush cancelled by mention during accumulation, buffer drained"); + continue; + } + + // Reset post_guard for this flush cycle — safe now because we checked first. post_guard.reset(); // Build the batch payload. diff --git a/crates/openab-core/src/config.rs b/crates/openab-core/src/config.rs index 0d16ac912..7c03d7d23 100644 --- a/crates/openab-core/src/config.rs +++ b/crates/openab-core/src/config.rs @@ -1279,6 +1279,9 @@ impl Default for AmbientConfig { } /// `[ambient.pool]` — dedicated session pool for ambient dispatches. +/// +/// NOTE: Pool management is not yet implemented (v2 follow-up). These settings +/// are parsed and validated on startup but not enforced at runtime. #[derive(Debug, Clone, Deserialize)] pub struct AmbientPoolConfig { /// Max concurrent ambient sessions. Default: 5. From c0d28b8b44253699db6e8c50f16d085bfeb2bdd4 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 02:58:57 +0000 Subject: [PATCH 05/12] fix(review): config validation guards + document accepted v1 limitations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - flush_interval_seconds=0 no longer panics (gen_range on empty range) → clamped to .max(1) at runtime - flush_max_messages=0 no longer defeats batching → .max(1) guard - flush_hard_cap=0 no longer panics (mpsc::channel(0)) → .max(1) - flush_timeout_seconds clamped to [5s, 600s] to prevent lockout - Document [NO_REPLY] not yet wired as accepted v1 limitation - Document shared DispatchTarget (tool access) as accepted v1 risk - Document config location deviation from ADR #1211 --- crates/openab-core/src/ambient.rs | 33 ++++++++++++++++++++----------- crates/openab-core/src/config.rs | 4 ++++ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 765c99fa0..541d459e2 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -208,7 +208,7 @@ impl AmbientDispatcher { // Lazily spawn consumer if not yet started for this channel. if !channels.contains_key(channel_id) { - let (tx, rx) = tokio::sync::mpsc::channel(self.config.flush_hard_cap); + let (tx, rx) = tokio::sync::mpsc::channel(self.config.flush_hard_cap.max(1)); let flushing = Arc::new(AtomicBool::new(false)); let post_guard = Arc::new(PostGuard::new()); @@ -300,13 +300,17 @@ async fn ambient_consumer_loop( }; // Compute jittered deadline: flush_interval ± 20% - let base = Duration::from_secs(config.flush_interval_seconds); + // Guard: interval must be >= 1s to avoid gen_range panic on empty range. + let base_secs = config.flush_interval_seconds.max(1); + let base = Duration::from_secs(base_secs); let jitter_range = base.as_millis() as f64 * 0.2; let jitter_ms = rand::thread_rng().gen_range(-jitter_range..jitter_range) as i64; let interval = Duration::from_millis((base.as_millis() as i64 + jitter_ms).max(1000) as u64); let deadline = tokio::time::Instant::now() + interval; let mut batch = vec![first]; + // Guard: flush_max_messages must be >= 1 to avoid immediate single-msg flush. + let flush_max = config.flush_max_messages.max(1); // Accumulate until trigger fires. loop { @@ -318,10 +322,10 @@ async fn ambient_consumer_loop( match tokio::time::timeout(remaining, rx.recv()).await { Ok(Some(msg)) => { batch.push(msg); - if batch.len() >= config.flush_max_messages { + if batch.len() >= flush_max { break; } - if batch.len() >= config.flush_hard_cap { + if batch.len() >= config.flush_hard_cap.max(1) { break; } } @@ -346,8 +350,8 @@ async fn ambient_consumer_loop( } }; - // Set flushing flag with safety timeout. - let flush_timeout = Duration::from_secs(config.flush_timeout_seconds); + // Set flushing flag with safety timeout (clamped to [5s, 600s]). + let flush_timeout = Duration::from_secs(config.flush_timeout_seconds.clamp(5, 600)); let _flushing_guard = FlushingGuard::new(Arc::clone(&flushing), flush_timeout); // Check post_guard BEFORE building payload (mention may have cancelled during accumulation). @@ -376,12 +380,19 @@ async fn ambient_consumer_loop( // Dispatch batch to agent. For ambient mode, we use stream_prompt_blocks // with a dummy reaction controller (enabled=false) to suppress all reactions. - // The NO_REPLY check must be handled at a higher level (response filtering). // - // TODO(ambient-v2): implement response capture mode so we can intercept - // [NO_REPLY] before the message is posted. For v1, the system prompt - // strongly instructs the agent, and we accept that rare false-positives - // may briefly appear before being deleted. + // ⚠️ KNOWN LIMITATIONS (v1, accepted): + // 1. [NO_REPLY] filtering: `is_no_reply()` is defined but not yet wired + // into the response path. The system prompt instructs the agent to + // return `[NO_REPLY]` but there is no post-filter to suppress it. + // Rare false-positive replies may appear until v2 response capture. + // 2. Tool access: ambient flush shares the same DispatchTarget as @mention, + // meaning the agent has full tool access during ambient flushes. For v1 + // this is accepted (system prompt restricts behavior); v2 should use a + // restricted target or disable tools for ambient sessions. + // + // TODO(ambient-v2): implement response capture mode to intercept + // [NO_REPLY] before posting, and restrict tool access for ambient. let dummy_msg_ref = MessageRef { channel: channel_ref.clone(), message_id: String::new(), diff --git a/crates/openab-core/src/config.rs b/crates/openab-core/src/config.rs index 7c03d7d23..e4ac11203 100644 --- a/crates/openab-core/src/config.rs +++ b/crates/openab-core/src/config.rs @@ -1230,6 +1230,10 @@ fn parse_config_inner(expanded: &str, source: &str) -> anyhow::Result { // --------------------------------------------------------------------------- /// Top-level `[ambient]` configuration for passive channel listening. +/// +/// NOTE: ADR #1211 originally specified `[discord.ambient]`. The implementation +/// uses top-level `[ambient]` with nested `[ambient.discord]` to allow future +/// multi-platform ambient support without restructuring config. #[derive(Debug, Clone, Deserialize)] pub struct AmbientConfig { /// Master switch (default: false). From 3f355d75665a9fb7c3b1d2161a412d5ea748b500 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 03:00:27 +0000 Subject: [PATCH 06/12] fix(review): permanent cancel block + remove dead sender_json field MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move post_guard.reset() to loop start (after first msg received). Fixes permanent block where one cancellation disabled all future flush cycles for that channel. - Remove dead AmbientMessage.sender_json field (never populated). - Flow: reset → accumulate → check(1) → build → check(2) → dispatch Both checks catch mid-cycle cancellations; reset prevents stickiness. --- crates/openab-core/src/ambient.rs | 9 ++++----- crates/openab-core/src/discord.rs | 1 - 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 541d459e2..4ca94a933 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -45,8 +45,6 @@ Rules: /// A single message buffered for ambient dispatch. #[derive(Debug)] pub struct AmbientMessage { - /// Serialised SenderContext JSON. - pub sender_json: String, /// Author display name. pub sender_name: String, /// User-visible prompt text. @@ -299,6 +297,10 @@ async fn ambient_consumer_loop( } }; + // Reset post_guard at the start of each batch cycle. This ensures a + // previous cancellation doesn't permanently block future cycles. + post_guard.reset(); + // Compute jittered deadline: flush_interval ± 20% // Guard: interval must be >= 1s to avoid gen_range panic on empty range. let base_secs = config.flush_interval_seconds.max(1); @@ -361,9 +363,6 @@ async fn ambient_consumer_loop( continue; } - // Reset post_guard for this flush cycle — safe now because we checked first. - post_guard.reset(); - // Build the batch payload. let session_key = format!("ambient:{}:{}", channel_ref.platform, channel_id); let content_blocks = build_ambient_payload(&batch); diff --git a/crates/openab-core/src/discord.rs b/crates/openab-core/src/discord.rs index a9d656231..a212a7c25 100644 --- a/crates/openab-core/src/discord.rs +++ b/crates/openab-core/src/discord.rs @@ -645,7 +645,6 @@ impl EventHandler for Handler { }; let ambient_msg = crate::ambient::AmbientMessage { - sender_json: String::new(), // Not needed for ambient sender_name: display_name.to_owned(), prompt, extra_blocks: Vec::new(), // Skip attachments for ambient v1 From e28f229646fa6de4ac0f9bd510567c25287aea89 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 03:01:07 +0000 Subject: [PATCH 07/12] fix(review): guard max_concurrent_flushes=0 against semaphore deadlock Semaphore::new(0) would block all flush operations permanently. Apply .max(1) to ensure at least one permit is always available. --- crates/openab-core/src/ambient.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 4ca94a933..8b9990f54 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -172,7 +172,7 @@ impl AmbientDispatcher { .iter() .filter_map(|s| s.parse().ok()) .collect(); - let flush_semaphore = Arc::new(Semaphore::new(config.max_concurrent_flushes)); + let flush_semaphore = Arc::new(Semaphore::new(config.max_concurrent_flushes.max(1))); Self { config, channels: Mutex::new(HashMap::new()), From 53a556afedbd8acd77549f7847f85d5d27619d06 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 03:06:06 +0000 Subject: [PATCH 08/12] fix(review): remove try_recv drain to prevent data loss Messages buffered after a @mention but during semaphore wait are valid for the next batch cycle. Draining them caused silent data loss in the narrow window where semaphore is saturated + mention arrives + new non-mention messages enter the buffer. Now: cancel discards the current batch only; remaining buffered messages naturally enter the next cycle after reset(). --- crates/openab-core/src/ambient.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 8b9990f54..ee3313717 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -358,8 +358,10 @@ async fn ambient_consumer_loop( // Check post_guard BEFORE building payload (mention may have cancelled during accumulation). if !post_guard.can_post() { - while rx.try_recv().is_ok() {} - debug!(channel_id = %channel_id, "ambient flush cancelled by mention during accumulation, buffer drained"); + // Don't drain — messages buffered after the mention are still valid + // for the next batch cycle. The current batch is discarded but future + // messages will be picked up when the loop restarts and reset() clears. + debug!(channel_id = %channel_id, "ambient flush cancelled by mention during accumulation"); continue; } @@ -406,9 +408,7 @@ async fn ambient_consumer_loop( // Check post_guard before dispatching (mention may have cancelled). if !post_guard.can_post() { - // Drain any remaining buffered messages so stale context is discarded. - while rx.try_recv().is_ok() {} - debug!(channel_id = %channel_id, "ambient flush cancelled by mention, buffer drained"); + debug!(channel_id = %channel_id, "ambient flush cancelled by mention before dispatch"); continue; } From 09026c3abe67c22f47aae4271cf4f69920d8bb46 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 03:14:31 +0000 Subject: [PATCH 09/12] fix(review): filter [NO_REPLY] before delivery via AmbientCaptureAdapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces AmbientCaptureAdapter — a ChatAdapter wrapper that: 1. Forces non-streaming mode (use_streaming = false) so the full response text is collected before send_message is called. 2. Intercepts send_message/send_message_with_reply to check is_no_reply() and suppress the sentinel before it reaches Discord. This prevents the [NO_REPLY] sentinel from being posted to the channel, which was the expected common-case behavior (most ambient batches will produce no-reply since the channel is mostly idle chat). Without this fix, ambient mode would spam [NO_REPLY] as literal messages on every flush where the agent has nothing to contribute. --- crates/openab-core/src/ambient.rs | 111 +++++++++++++++++++++++++----- 1 file changed, 95 insertions(+), 16 deletions(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index ee3313717..8001362b2 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -20,6 +20,9 @@ use crate::adapter::{ChannelRef, ChatAdapter, MessageRef}; use crate::config::AmbientConfig; use crate::dispatch::DispatchTarget; +use anyhow::Result; +use async_trait::async_trait; + // --------------------------------------------------------------------------- // Constants // --------------------------------------------------------------------------- @@ -131,6 +134,86 @@ impl PostGuard { } } +// --------------------------------------------------------------------------- +// AmbientCaptureAdapter — intercepts send_message for [NO_REPLY] filtering +// --------------------------------------------------------------------------- + +/// Wraps a real `ChatAdapter` and intercepts `send_message` to suppress +/// `[NO_REPLY]` responses before they reach the channel. +struct AmbientCaptureAdapter { + inner: Arc, +} + +#[async_trait] +impl ChatAdapter for AmbientCaptureAdapter { + fn platform(&self) -> &'static str { + self.inner.platform() + } + + fn message_limit(&self) -> usize { + self.inner.message_limit() + } + + fn use_streaming(&self, _other_bot_present: bool) -> bool { + false // Force non-streaming so text is collected before send + } + + async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result { + // Filter [NO_REPLY] before it reaches the channel. + if is_no_reply(content) { + debug!("ambient: suppressed [NO_REPLY] response"); + return Ok(MessageRef { + channel: channel.clone(), + message_id: String::new(), + }); + } + self.inner.send_message(channel, content).await + } + + async fn create_thread( + &self, + channel: &ChannelRef, + trigger_msg: &MessageRef, + title: &str, + ) -> Result { + self.inner.create_thread(channel, trigger_msg, title).await + } + + async fn add_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()> { + self.inner.add_reaction(msg, emoji).await + } + + async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()> { + self.inner.remove_reaction(msg, emoji).await + } + + async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()> { + self.inner.edit_message(msg, content).await + } + + async fn send_message_with_reply( + &self, + channel: &ChannelRef, + content: &str, + reply_to_message_id: &str, + ) -> Result { + if is_no_reply(content) { + debug!("ambient: suppressed [NO_REPLY] reply"); + return Ok(MessageRef { + channel: channel.clone(), + message_id: String::new(), + }); + } + self.inner + .send_message_with_reply(channel, content, reply_to_message_id) + .await + } + + async fn delete_message(&self, msg: &MessageRef) -> Result<()> { + self.inner.delete_message(msg).await + } +} + // --------------------------------------------------------------------------- // ChannelState — per-channel ambient state // --------------------------------------------------------------------------- @@ -379,28 +462,24 @@ async fn ambient_consumer_loop( continue; } - // Dispatch batch to agent. For ambient mode, we use stream_prompt_blocks - // with a dummy reaction controller (enabled=false) to suppress all reactions. + // Dispatch batch to agent using AmbientCaptureAdapter which intercepts + // [NO_REPLY] responses before they reach the channel. Non-streaming mode + // ensures text is fully collected before send_message is called, allowing + // is_no_reply() to filter the sentinel pre-delivery. // - // ⚠️ KNOWN LIMITATIONS (v1, accepted): - // 1. [NO_REPLY] filtering: `is_no_reply()` is defined but not yet wired - // into the response path. The system prompt instructs the agent to - // return `[NO_REPLY]` but there is no post-filter to suppress it. - // Rare false-positive replies may appear until v2 response capture. - // 2. Tool access: ambient flush shares the same DispatchTarget as @mention, - // meaning the agent has full tool access during ambient flushes. For v1 - // this is accepted (system prompt restricts behavior); v2 should use a - // restricted target or disable tools for ambient sessions. - // - // TODO(ambient-v2): implement response capture mode to intercept - // [NO_REPLY] before posting, and restrict tool access for ambient. + // ⚠️ KNOWN LIMITATION (v1, accepted): + // Tool access: ambient flush shares the same DispatchTarget as @mention. + // v2 should use a restricted target or disable tools for ambient sessions. + let capture_adapter: Arc = Arc::new(AmbientCaptureAdapter { + inner: Arc::clone(&adapter), + }); let dummy_msg_ref = MessageRef { channel: channel_ref.clone(), message_id: String::new(), }; let reactions = Arc::new(crate::reactions::StatusReactionController::new( false, // disabled — no reactions for ambient - Arc::clone(&adapter), + Arc::clone(&capture_adapter), dummy_msg_ref, crate::config::ReactionEmojis::default(), crate::config::ReactionTiming::default(), @@ -414,7 +493,7 @@ async fn ambient_consumer_loop( match target .stream_prompt_blocks( - &adapter, + &capture_adapter, &session_key, content_blocks, &channel_ref, From 6ce0b9a3ce5d3a2092e1d3e5da76bb3212713a62 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 11:00:44 +0000 Subject: [PATCH 10/12] docs(ambient): add architectural rationale for standalone module - Explain why ambient.rs is separate from Dispatcher (no trigger_msg, no streaming, no Lane mode, NO_REPLY filtering needs differ). - Mark context_window as 'not yet implemented (v2)' in config doc. --- crates/openab-core/src/ambient.rs | 17 +++++++++++++++++ crates/openab-core/src/config.rs | 1 + 2 files changed, 18 insertions(+) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 8001362b2..1f742fb9c 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -5,6 +5,23 @@ //! Ambient mode listens to all messages in configured channels (without @mention) //! and periodically flushes them as a batch to the LLM. The agent replies only //! when it has something valuable to add; otherwise it returns `[NO_REPLY]`. +//! +//! # Why a standalone module (not extending Dispatcher) +//! +//! ADR §Implementation Notes suggests reusing Dispatcher infrastructure. This +//! implementation deliberately builds a separate module because ambient's +//! requirements diverge from normal dispatch: +//! +//! - No trigger message — passive listening has no `MessageRef` to anchor +//! reactions or reply_to. +//! - No streaming / placeholder — ambient is non-interactive; responses use +//! `AmbientCaptureAdapter` (non-streaming) for `[NO_REPLY]` pre-filtering. +//! - No per-sender batching (Lane mode) — ambient batches by channel, not sender. +//! - No `BotTurnTracker` integration — ambient has independent reply budget (v2). +//! +//! Forcing these into `Dispatcher` would require pervasive `if ambient { ... }` +//! branches, increasing regression risk for the existing dispatch path. Clean +//! separation keeps both paths simple and independently testable. use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; diff --git a/crates/openab-core/src/config.rs b/crates/openab-core/src/config.rs index e4ac11203..43a85dae9 100644 --- a/crates/openab-core/src/config.rs +++ b/crates/openab-core/src/config.rs @@ -1250,6 +1250,7 @@ pub struct AmbientConfig { #[serde(default = "default_flush_hard_cap")] pub flush_hard_cap: usize, /// Historical messages fetched via Discord API before the batch. Default: 20. + /// NOTE: Not yet implemented (v2 follow-up). Parsed but not used at runtime. #[serde(default = "default_context_window")] pub context_window: usize, /// Max simultaneous LLM calls across all ambient channels. Default: 3. From c87a81e354810e94345f54f83499c983028d7045 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 11:04:06 +0000 Subject: [PATCH 11/12] docs(ambient): fix stale drain comment (drain was removed in 53a556a) --- crates/openab-core/src/ambient.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/openab-core/src/ambient.rs b/crates/openab-core/src/ambient.rs index 1f742fb9c..d9ad774bd 100644 --- a/crates/openab-core/src/ambient.rs +++ b/crates/openab-core/src/ambient.rs @@ -346,11 +346,12 @@ impl AmbientDispatcher { /// Discard the ambient buffer for a channel (called when @mention arrives). /// Also cancels any in-flight ambient response via the post_guard. - /// The consumer will drain buffered messages when it sees the cancellation. + /// The consumer discards the current batch; remaining buffered messages + /// carry into the next cycle. pub async fn discard_buffer(&self, channel_id: &str) { let channels = self.channels.lock().await; if let Some(state) = channels.get(channel_id) { - // Cancel any in-flight post — consumer drains the buffer on next check. + // Cancel any in-flight post — consumer discards current batch on next check. state.post_guard.cancel(); debug!(channel_id, "ambient buffer discard requested (mention arrived)"); } From 610954726d49d12a8a21483fae3ff8488bef3bc5 Mon Sep 17 00:00:00 2001 From: chaodu-agent Date: Sat, 27 Jun 2026 11:28:38 +0000 Subject: [PATCH 12/12] docs(ambient): add user-facing docs and config reference - docs/ambient.md: complete guide (config, behavior, limitations, example) - docs/config-reference.md: add [ambient] section - docs/discord.md: add Ambient Mode section with quick-start --- docs/ambient.md | 100 +++++++++++++++++++++++++++++++++++++++ docs/config-reference.md | 26 ++++++++++ docs/discord.md | 19 ++++++++ 3 files changed, 145 insertions(+) create mode 100644 docs/ambient.md diff --git a/docs/ambient.md b/docs/ambient.md new file mode 100644 index 000000000..81f3ac17e --- /dev/null +++ b/docs/ambient.md @@ -0,0 +1,100 @@ +# Ambient Mode + +Ambient mode allows your bot to passively listen to all messages in configured channels and autonomously decide whether to respond. Unlike the default @mention mode, the bot observes the full conversation flow and only speaks up when it has something valuable to add. + +## How It Works + +1. Messages in configured channels (that are **not** @mentions) are buffered per-channel. +2. When a **time trigger** (`flush_interval_seconds`) or **count trigger** (`flush_max_messages`) fires, the batch is sent to the LLM. +3. The LLM evaluates the conversation and either: + - **Replies** with a helpful response → posted to the channel. + - **Returns `[NO_REPLY]`** → silently suppressed, nothing is posted. +4. If someone **@mentions** the bot in an ambient channel, the buffer is discarded and the mention is handled normally (immediate response). + +## Configuration + +```toml +[ambient] +enabled = true +flush_interval_seconds = 60 # Time trigger (±20% jitter applied) +flush_max_messages = 10 # Count trigger +flush_hard_cap = 50 # Safety cap on buffer size +max_concurrent_flushes = 3 # Global LLM concurrency limit +flush_timeout_seconds = 120 # Safety timeout per flush + +[ambient.discord] +channels = ["1234567890"] # Channel IDs to monitor +allow_bot_messages = false # Include other bots' messages in buffer +``` + +### Configuration fields + +| Field | Default | Description | +|-------|---------|-------------| +| `enabled` | `false` | Master switch. Must be explicitly enabled. | +| `flush_interval_seconds` | `60` | Seconds between time-based flushes. ±20% jitter prevents thundering herd. Min: 1. | +| `flush_max_messages` | `10` | Flush when this many messages accumulate. Min: 1. | +| `flush_hard_cap` | `50` | Maximum buffer size. Messages beyond this are dropped. Min: 1. | +| `max_concurrent_flushes` | `3` | Max simultaneous LLM calls across all ambient channels. Min: 1. | +| `flush_timeout_seconds` | `120` | Safety timeout — resets flushing state if exceeded. Clamped to [5, 600]. | +| `channels` | `[]` | Explicit channel allowlist (required). Empty = ambient disabled. | +| `allow_bot_messages` | `false` | Whether other bots' messages enter the ambient buffer. | + +### Reserved fields (v2, not yet enforced) + +| Field | Default | Description | +|-------|---------|-------------| +| `context_window` | `20` | Historical messages to fetch before each batch (not yet implemented). | +| `pool.max_sessions` | `5` | Max concurrent ambient sessions (not yet enforced). | +| `pool.session_ttl_minutes` | `60` | Session inactivity timeout (not yet enforced). | +| `pool.context_flushes` | `3` | Rolling flush history window (not yet enforced). | + +## Behavior + +### @mention priority + +When someone @mentions the bot in an ambient channel: +1. The ambient buffer is immediately invalidated (current batch discarded). +2. The mention is handled via normal dispatch (immediate response). +3. After the mention is handled, ambient buffering resumes for new messages. + +Buffered messages that arrived before the mention are **not lost** — they carry into the next ambient cycle. + +### [NO_REPLY] filtering + +The bot uses a system prompt that instructs it to respond with `[NO_REPLY]` when it has nothing to add. This sentinel is intercepted **before delivery** — it never appears in the channel. The filtering uses a capture adapter that forces non-streaming mode to ensure the full response is evaluated before any message is sent. + +### Session isolation + +Ambient sessions use the namespace `ambient:discord:`, separate from normal dispatch sessions. There is no collision with @mention sessions. + +### Cost control + +- **Jittered intervals** prevent all channels from flushing simultaneously. +- **Global semaphore** caps concurrent LLM calls (default: 3). +- **`[NO_REPLY]`** means most flushes produce no visible output (only one LLM call, no channel message). +- **`enabled = false`** default means zero cost until explicitly opted in. + +## Limitations (v1) + +| Limitation | Description | Planned fix | +|-----------|-------------|-------------| +| Tool access | Ambient flushes have full tool access (same as @mention). | v2: restricted dispatch target | +| In-flight cancel | A @mention during LLM generation cannot stop the ambient response mid-stream. | v2: `tokio::select!` preemption | +| Consumer supervision | If a consumer task panics, that channel's ambient is permanently disabled until restart. | v2: health check + respawn | +| No history fetch | `context_window` (Discord API history before batch) is not yet implemented. | v2 | +| No cooldown | No minimum interval between consecutive flushes for a single channel. | v2 | + +## Example + +Minimal config to enable ambient mode on one channel: + +```toml +[ambient] +enabled = true + +[ambient.discord] +channels = ["1490282656913559673"] +``` + +This uses all defaults: 60s flush interval, max 10 messages per batch, 3 concurrent flushes. diff --git a/docs/config-reference.md b/docs/config-reference.md index 56ea052dd..c21aa38c0 100644 --- a/docs/config-reference.md +++ b/docs/config-reference.md @@ -463,6 +463,32 @@ web = "~/projects/frontend" --- +## `[ambient]` + +Passive channel listening with batch flush. See [ambient.md](ambient.md) for full guide. + +```toml +[ambient] +enabled = false # Master switch +flush_interval_seconds = 60 # Time trigger (±20% jitter) +flush_max_messages = 10 # Count trigger +flush_hard_cap = 50 # Max buffer size +max_concurrent_flushes = 3 # Global LLM concurrency limit +flush_timeout_seconds = 120 # Safety timeout per flush +context_window = 20 # (v2, not yet implemented) + +[ambient.pool] # (v2, not yet enforced) +max_sessions = 5 +session_ttl_minutes = 60 +context_flushes = 3 + +[ambient.discord] +channels = [] # Channel ID allowlist (required) +allow_bot_messages = false +``` + +--- + ## `[cron]` Everything cron-related lives under `[cron]`. diff --git a/docs/discord.md b/docs/discord.md index 2267aac48..f152b026a 100644 --- a/docs/discord.md +++ b/docs/discord.md @@ -209,6 +209,25 @@ Each thread gets its own agent session. Sessions are cleaned up after `session_t --- +## Ambient Mode + +Ambient mode allows the bot to passively listen to configured channels and respond only when it has something valuable to add — without requiring @mentions. See [ambient.md](ambient.md) for full details. + +```toml +[ambient] +enabled = true + +[ambient.discord] +channels = ["1234567890"] # Channel IDs to monitor +``` + +When enabled: +- Non-mention messages in listed channels are buffered and periodically sent to the LLM as a batch. +- If the LLM has nothing to add, it returns `[NO_REPLY]` (silently suppressed). +- **@mention always takes priority** — the ambient buffer is discarded and the mention gets an immediate response. + +--- + ## Attachment Handling OpenAB processes Discord file attachments and converts them into content blocks