diff --git a/Cargo.lock b/Cargo.lock index 0fe785ed..3787d300 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4561,6 +4561,7 @@ version = "0.0.1" dependencies = [ "flux", "libc", + "rustc-hash", "serde", "serde_json", "silver_common_macros", diff --git a/crates/bin/src/main.rs b/crates/bin/src/main.rs index c82bb723..c893b026 100644 --- a/crates/bin/src/main.rs +++ b/crates/bin/src/main.rs @@ -31,6 +31,9 @@ fn main() -> Result<(), Box> { let _tracing = initialise_tracing_log("silver", 10, None, false); tracing::debug!("start"); + // `#[timed]` is inert until a process opts in. + silver_common::flamegraph_timer::enable_surfer(); + let config = load_config()?; tracing::info!("loaded config with fork digest: {}", hex::encode(config.fork_digest())); diff --git a/crates/common_macros/src/lib.rs b/crates/common_macros/src/lib.rs index ccda93f4..2eed2059 100644 --- a/crates/common_macros/src/lib.rs +++ b/crates/common_macros/src/lib.rs @@ -1,56 +1,20 @@ //! Proc-macro support for the silver crates. Exposes `#[timed]`, which wraps a -//! function body in a thread-local flux `Timer` so processing time is emitted -//! to a per-function shmem queue (or folded into an in-process call tree under -//! the perf harness). Built with the `perf` feature, the same guard also -//! records hardware counters (instructions, cycles, branch/cache misses) via -//! rdpmc. Storage and Drop-side recording live in `silver_metrics`; this crate -//! is only the attribute-macro glue. +//! function body in a drop guard that records a frame open/close into the +//! cross-process flamegraph rings (folded in-process under the perf harness, or +//! by a surfer reading a running silver). Built with the `perf` feature, the +//! same marks carry hardware counters (instructions, cycles, branch/cache +//! misses) via rdpmc. Storage and Drop-side recording live in `silver_metrics`; +//! this crate is only the attribute-macro glue. 
use proc_macro::TokenStream; use quote::quote; -use syn::{ - Ident, ItemFn, LitInt, LitStr, Token, - parse::{Parse, ParseStream}, - parse_macro_input, -}; +use syn::{ItemFn, LitStr, parse_macro_input}; -struct TimedArgs { - name: Option, - sample: u64, -} - -impl Parse for TimedArgs { - fn parse(input: ParseStream) -> syn::Result { - let mut name = None; - let mut sample = 1u64; - if input.peek(LitStr) { - name = Some(input.parse()?); - if input.peek(Token![,]) { - input.parse::()?; - } - } - if !input.is_empty() { - let ident: Ident = input.parse()?; - if ident != "sample" { - return Err(syn::Error::new(ident.span(), "expected `sample = N`")); - } - input.parse::()?; - let lit: LitInt = input.parse()?; - sample = lit.base10_parse()?; - if sample == 0 { - return Err(syn::Error::new(lit.span(), "sample must be >= 1")); - } - } - Ok(Self { name, sample }) - } -} - -/// Wrap a function body in a per-function thread-local flux `Timer`. +/// Wrap a function body in a `#[timed]` frame guard. /// -/// Default timer name is `concat!(module_path!(), "::", fn_name)` -/// resolved at the call site — so the queue file becomes -/// `timing-{crate}::{module}::{fn_name}` and stays unambiguous across -/// crates without manual labelling. +/// Default frame name is `concat!(module_path!(), "::", fn_name)` resolved at +/// the call site, so a frame reads `{crate}::{module}::{fn_name}` and stays +/// unambiguous across crates without manual labelling. /// /// On a method, the default name auto-qualifies by the monomorphized `Self`: /// `type_name` bakes the concrete type in per instantiation, so generic code @@ -66,26 +30,17 @@ impl Parse for TimedArgs { /// Pass a string literal to override: `#[timed("custom_name")]` uses that name /// verbatim (no module prefix, no type qualification). /// -/// `sample = N` throttles the hardware-counter dimension (built with the `perf` -/// feature) to one in N calls on hot paths — timing is still recorded every -/// call. 
Combinable: `#[timed("custom_name", sample = 1000)]`. Without the -/// `perf` feature `sample` only affects which calls would have been counted, so -/// it is a no-op there. -/// -/// Records processing time on every exit path — normal return, `?`, -/// early `return`, panic-unwind — via a Drop guard. +/// Records the frame's close on every exit path — normal return, `?`, early +/// `return`, panic-unwind — via a Drop guard. #[proc_macro_attribute] pub fn timed(attr: TokenStream, item: TokenStream) -> TokenStream { let input = parse_macro_input!(item as ItemFn); - let args = parse_macro_input!(attr as TimedArgs); + let name = if attr.is_empty() { None } else { Some(parse_macro_input!(attr as LitStr)) }; let func_name_str = input.sig.ident.to_string(); let is_method = matches!(input.sig.inputs.first(), Some(syn::FnArg::Receiver(_))); - let timer_name_expr = match &args.name { - Some(lit) => { - let s = lit.value(); - quote! { #s } - } + let timer_name_expr = match &name { + Some(lit) => quote! { #lit }, None if is_method => quote! {{ struct __TimedTy(::core::marker::PhantomData); ::core::any::type_name::<__TimedTy>() @@ -95,30 +50,9 @@ pub fn timed(attr: TokenStream, item: TokenStream) -> TokenStream { let ItemFn { attrs, vis, sig, block } = input; - let guard = if args.sample <= 1 { - quote! { - let __timed_guard = ::silver_metrics::TimerGuard::new(#timer_name_expr); - } - } else { - let sample = args.sample; - quote! { - ::std::thread_local! { - static __TIMED_SKIP: ::core::cell::Cell = - const { ::core::cell::Cell::new(0) }; - } - let __timed_sample = __TIMED_SKIP.with(|c| { - let n = c.get(); - c.set(n.wrapping_add(1)); - n % #sample == 0 - }); - let __timed_guard = - ::silver_metrics::TimerGuard::new_sampled(#timer_name_expr, __timed_sample); - } - }; - let expanded = quote! 
{ #(#attrs)* #vis #sig { - #guard + let __timed_guard = ::silver_metrics::TimerGuard::new(#timer_name_expr); #block } }; diff --git a/crates/e2e/src/perf/replay.rs b/crates/e2e/src/perf/replay.rs index 874866ad..a6aae232 100644 --- a/crates/e2e/src/perf/replay.rs +++ b/crates/e2e/src/perf/replay.rs @@ -4,7 +4,7 @@ use std::time::{Duration, Instant}; use silver_common::{ - flamegraph_timer::{collect::enable, report::TimingStats}, + flamegraph_timer::{LocalReader, report::TimingStats}, ssz_view::StatusView, }; @@ -34,9 +34,9 @@ pub fn replay(fixtures: &Fixtures) -> ReplayOutcome { let da_events: Vec<_> = blocks.iter().filter_map(|b| data_columns_available(b)).collect(); - // Enable before harness construction — `new_heap` already runs `#[timed]` STF + // Start before harness construction — `new_heap` already runs `#[timed]` STF // code. - enable(); + let recorder = LocalReader::start(); let mut harness = PmBsHarness::new(&fixtures.state_ssz, n_blocks, blocks.len()); let anchor_finalized_epoch = harness.fork_choice_finalized_epoch(); @@ -102,7 +102,7 @@ pub fn replay(fixtures: &Fixtures) -> ReplayOutcome { ); ReplayOutcome { - stats: TimingStats::collect(), + stats: recorder.collect(), validator_count: harness.head_validator_count(), wall_elapsed, final_slot, diff --git a/crates/e2e/src/perf/report.rs b/crates/e2e/src/perf/report.rs index 11f142a0..a0011d89 100644 --- a/crates/e2e/src/perf/report.rs +++ b/crates/e2e/src/perf/report.rs @@ -35,6 +35,11 @@ impl PerfReport { } pub fn check_thresholds(&self) -> Result<(), String> { + if self.outcome.stats.missed_events() { + return Err("perf gate failed: a timing ring lost marks and the measurement is \ + invalid; raise RING_CAPACITY in flamegraph_timer::queue_dir" + .into()); + } let failures: Vec = self.gauges().iter().filter_map(Gauge::failure).collect(); if failures.is_empty() { return Ok(()); diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index 8a5fe9d1..fbe8ac51 100644 --- 
a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -9,6 +9,7 @@ version.workspace = true silver_common_macros.workspace = true flux.workspace = true libc.workspace = true +rustc-hash.workspace = true serde.workspace = true serde_json.workspace = true tracing = { workspace = true, optional = true } diff --git a/crates/metrics/src/flamegraph_timer/aggregator.rs b/crates/metrics/src/flamegraph_timer/aggregator.rs new file mode 100644 index 00000000..d133df0c --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/aggregator.rs @@ -0,0 +1,103 @@ +use flux::timing::Duration; +use rustc_hash::FxHashMap; + +use crate::{ + flamegraph_timer::mark::{Frame, Mark}, + perf::PerfSample, +}; + +#[derive(Default)] +pub(crate) struct CallStackSamples { + pub(crate) tracked_ns: Vec, + pub(crate) total_untracked_ns: u64, + /// Summed entry→exit counter deltas across calls; all-zero when the + /// `perf` feature is off. `total_untracked_perf` excludes children, like + /// `total_untracked_ns`. + pub(crate) tracked_perf: PerfSample, + pub(crate) total_untracked_perf: PerfSample, +} + +struct OpenFrame { + id: u64, + ts: u64, + perf: PerfSample, + total_tracked_ns: u64, + total_tracked_perf: PerfSample, +} + +/// Folds mark streams into call-path timings keyed by portable frame id, +/// independent of how the marks were transported (in-process buffers or a +/// cross-process shmem ring). Frame ids stay opaque: resolving them to names +/// needs the producer's binary and belongs to the reader that owns it. +/// +/// Each `fold_thread` call folds a mark stream from a fresh stack, so a +/// cumulative tree comes from re-folding the whole retained stream, not from +/// folding disjoint windows. +#[derive(Default)] +pub(crate) struct Aggregator { + paths: FxHashMap, CallStackSamples>, + /// A close popped an open with a different id — the stream desynced, so the + /// folded stats are unreliable (surfaced as `missed_events`). See + /// [`Self::fold_thread`]. 
+ desynced: bool, +} + +impl Aggregator { + pub(crate) fn fold_thread(&mut self, marks: &[Mark], counters: &[PerfSample]) { + // `counters[i]` is the snapshot pushed right after `marks[i]` (same + // producer thread, lockstep order), so the two are index-aligned over + // their common prefix. A live drain (surfer reading while the producer + // runs) can snapshot the rings a push apart, leaving a 1-ish tail + // mismatch — benign: the extra counter is ignored and a mark missing + // its counter falls back to zero below. Perf off ⇒ `counters` empty. + let mut stack: Vec = Vec::new(); + for (i, mark) in marks.iter().enumerate() { + let sample = counters.get(i).copied().unwrap_or_default(); + let closing_id = match mark.frame { + Frame::Open { id, .. } => { + stack.push(OpenFrame { + id, + ts: mark.ts, + perf: sample, + total_tracked_ns: 0, + total_tracked_perf: PerfSample::default(), + }); + continue; + } + Frame::Close { id } => id, + }; + + let Some(frame) = stack.pop() else { continue }; + // Drop guards close in reverse open order, so a close must pop its + // own open. A mismatch means the stream desynced (a producer + // crash/restart, or a ring overwrite reordering marks); we still + // attribute to the popped frame but flag the fold as unreliable. 
+ debug_assert_eq!(closing_id, frame.id, "timed close popped a non-matching open"); + self.desynced |= closing_id != frame.id; + let tracked_ns = Duration(mark.ts.saturating_sub(frame.ts)).as_nanos() as u64; + let untracked_ns = tracked_ns.saturating_sub(frame.total_tracked_ns); + let tracked_perf = sample.delta(&frame.perf); + let untracked_perf = tracked_perf.delta(&frame.total_tracked_perf); + + let path: Vec = stack.iter().map(|f| f.id).chain([frame.id]).collect(); + if let Some(parent) = stack.last_mut() { + parent.total_tracked_ns += tracked_ns; + parent.total_tracked_perf = parent.total_tracked_perf.add(&tracked_perf); + } + + let entry = self.paths.entry(path).or_default(); + entry.tracked_ns.push(tracked_ns); + entry.total_untracked_ns += untracked_ns; + entry.tracked_perf = entry.tracked_perf.add(&tracked_perf); + entry.total_untracked_perf = entry.total_untracked_perf.add(&untracked_perf); + } + } + + pub(crate) fn desynced(&self) -> bool { + self.desynced + } + + pub(crate) fn into_paths(self) -> FxHashMap, CallStackSamples> { + self.paths + } +} diff --git a/crates/metrics/src/flamegraph_timer/call_tree.rs b/crates/metrics/src/flamegraph_timer/call_tree.rs new file mode 100644 index 00000000..06e899eb --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/call_tree.rs @@ -0,0 +1,211 @@ +//! Render aggregated `#[timed]` paths as an indented, column-aligned call tree. +//! Each parent emits a synthetic `untracked` sibling (its time minus tracked +//! children) and folds its low-coverage tail into a single `...` row. 
+ +use std::borrow::Cow; + +use flux::timing::Nanos; +use rustc_hash::FxHashMap; + +use crate::{ + flamegraph_timer::{ + names::leaf_name, + report::{FlamegraphMeta, PathStat}, + }, + perf::PerfSample, +}; + +pub(super) fn render(paths: &[PathStat], meta: &FlamegraphMeta) -> String { + let mut root = Node::default(); + for s in paths { + let mut node = &mut root; + for id in &s.path { + node = node.children.entry(*id).or_default(); + } + node.count = s.metrics.count; + node.total_untracked_ns = s.metrics.total_untracked_ns; + node.tracked_sum_ns = s.metrics.tracked_sum_ns; + node.perf = s.metrics.tracked_perf; + node.untracked_perf = s.metrics.untracked_perf; + } + + let mut lines = Vec::new(); + root.render_children(0, meta, &mut lines); + render_aligned(&lines) +} + +#[derive(Default)] +struct Node { + count: u64, + total_untracked_ns: Nanos, + tracked_sum_ns: Nanos, + perf: PerfSample, + untracked_perf: PerfSample, + children: FxHashMap, +} + +/// Keep the rows covering this % of a node's time; fold the rest into `...`. 
+const COVERAGE_PCT: u64 = 99; + +impl Node { + fn render_children(&self, depth: usize, meta: &FlamegraphMeta, out: &mut Vec) { + if self.children.is_empty() { + return; + } + let mut rows: Vec = self + .children + .iter() + .map(|(id, c)| Row { + label: leaf_name(&meta.names[id]), + sum_ns: c.tracked_sum_ns, + count: c.count, + perf: c.perf, + child: Some(c), + }) + .collect(); + if self.count > 0 { + rows.push(Row { + label: Cow::Borrowed("untracked"), + sum_ns: self.total_untracked_ns, + count: self.count, + perf: self.untracked_perf, + child: None, + }); + } + rows.sort_by(|a, b| b.sum_ns.cmp(&a.sum_ns)); + + let total: Nanos = rows.iter().map(|r| r.sum_ns).sum(); + let threshold = Nanos((total.0 as u128 * COVERAGE_PCT as u128 / 100) as u64); + let mut covered = Nanos::ZERO; + let mut cut = rows.len(); + for (i, r) in rows.iter().enumerate() { + if covered >= threshold { + cut = i; + break; + } + covered += r.sum_ns; + } + // Folding a single row saves no space — only fold 2+. + if rows.len() - cut < 2 { + cut = rows.len(); + } + + let indent = depth * 2; + for r in &rows[..cut] { + let avg = r.sum_ns / r.count.max(1); + let count = Some(CallCount { total: r.count, per_parent: self.count }); + out.push(make_line(indent, r.label.as_ref(), avg, count, &r.perf, meta)); + if let Some(child) = r.child { + child.render_children(depth + 1, meta, out); + } + } + if cut < rows.len() { + let rem_sum: Nanos = rows[cut..].iter().map(|r| r.sum_ns).sum(); + let rem_avg = rem_sum / self.count.max(1); + let label = format!("... ({} more)", rows.len() - cut); + out.push(make_line(indent, &label, rem_avg, None, &PerfSample::default(), meta)); + } + } +} + +struct Row<'a> { + label: Cow<'a, str>, + sum_ns: Nanos, + count: u64, + perf: PerfSample, + /// `None` for the synthetic `untracked` row — no subtree to recurse into. 
+ child: Option<&'a Node>, +} + +struct CallCount { + total: u64, + per_parent: u64, +} + +/// A rendered row kept as separate parts so [`render_aligned`] can right-align +/// the ns and counter columns past the widest label that precedes them. +struct Line { + name: String, + avg: String, + suffix: String, + counters: Option, +} + +fn make_line( + indent: usize, + label: &str, + avg: Nanos, + count: Option, + perf: &PerfSample, + meta: &FlamegraphMeta, +) -> Line { + let name = format!("{blank:indent$}{label}", blank = ""); + let avg = avg.to_string(); + let suffix = match &count { + None => String::new(), + // Root, or a parent that ran once → avg == total; show a plain count. + Some(c) if c.per_parent <= 1 => format!(" ×{}", c.total), + Some(c) => { + let per = if c.total % c.per_parent == 0 { + (c.total / c.per_parent).to_string() + } else { + format!("{:.1}", c.total as f64 / c.per_parent as f64) + }; + format!(" ×{per} ({} total)", c.total) + } + }; + let counters = counter_text(perf, count.map_or(0, |c| c.total), meta); + Line { name, avg, suffix, counters } +} + +/// Per-call counter columns: each non-IPC-input event as `N label/call`, then +/// IPC derived from the instructions/cycles slots when both were measured. +/// `None` when no counters were collected (perf off, or a synthetic row). 
+fn counter_text(perf: &PerfSample, calls: u64, meta: &FlamegraphMeta) -> Option { + if calls == 0 || perf.vals.iter().all(|&v| v == 0) { + return None; + } + let schema = &meta.schema; + let is_ipc_input = |label: &str| matches!(label, "instructions" | "cpu-cycles" | "cycles"); + let mut parts: Vec = schema + .iter() + .enumerate() + .filter(|(_, e)| !is_ipc_input(&e.label)) + .map(|(i, e)| format!("{:>9} {}/call", perf.vals[i] / calls, e.label)) + .collect(); + let ipc = schema.ipc(&perf.vals); + if ipc > 0.0 { + parts.push(format!("ipc {ipc:.2}")); + } + (!parts.is_empty()).then(|| parts.join(" ")) +} + +fn width(s: &str) -> usize { + s.chars().count() +} + +fn render_aligned(lines: &[Line]) -> String { + let name_w = lines.iter().map(|l| width(&l.name)).max().unwrap_or(0); + let prefix = |l: &Line| { + format!("{name:10}{suffix}", name = l.name, avg = l.avg, suffix = l.suffix) + }; + let counter_w = lines + .iter() + .filter(|l| l.counters.is_some()) + .map(|l| width(&prefix(l))) + .max() + .map_or(0, |w| w + 3); + let mut out = String::new(); + for l in lines { + let p = prefix(l); + out.push_str(&p); + if let Some(c) = &l.counters { + for _ in 0..counter_w.saturating_sub(width(&p)) { + out.push(' '); + } + out.push_str(c); + } + out.push('\n'); + } + out +} diff --git a/crates/metrics/src/flamegraph_timer/collect.rs b/crates/metrics/src/flamegraph_timer/collect.rs deleted file mode 100644 index 13660787..00000000 --- a/crates/metrics/src/flamegraph_timer/collect.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::{ - cell::RefCell, - collections::HashMap, - sync::{Mutex, OnceLock}, -}; - -use flux::timing::Instant; - -use crate::perf::{self, PerfSample}; - -struct Frame { - name: &'static str, - start: Instant, - /// Total tracked ns of timed children that have already exited under - /// this frame — subtracted to get the frame's untracked time. 
- total_tracked_ns: u64, - /// Counter snapshot at entry; `None` when the `perf` feature is off or - /// `perf_event_open` is unavailable. Mirrors `start`/`total_tracked_ns` - /// for the hardware-counter dimension. - perf_start: Option, - total_tracked_perf: PerfSample, -} - -thread_local! { - static CALL_STACK: RefCell> = const { RefCell::new(Vec::new()) }; -} - -#[derive(Default)] -pub(super) struct CallStackSamples { - pub(super) tracked_ns: Vec, - pub(super) total_untracked_ns: u64, - /// Summed entry→exit counter deltas across calls; all-zero when the - /// `perf` feature is off. `total_untracked_perf` excludes children, like - /// `total_untracked_ns`. - pub(super) tracked_perf: PerfSample, - pub(super) total_untracked_perf: PerfSample, -} - -pub(super) struct CallStackTiming { - pub(super) call_stack: Vec<&'static str>, - pub(super) samples: CallStackSamples, -} - -/// Cross-thread aggregation map: every thread's `stack_exit` merges its -/// finished frame into this one shared map, so the `Mutex` is required even -/// though the live call stack is thread-local. 
-type SharedPathSink = Mutex, CallStackSamples>>; -static SINK: OnceLock = OnceLock::new(); - -pub fn enable() { - let _ = SINK.set(Mutex::new(HashMap::new())); -} - -pub(super) fn drain() -> Vec { - let Some(sink) = SINK.get() else { return Vec::new() }; - std::mem::take(&mut *sink.lock().unwrap()) - .into_iter() - .map(|(call_stack, samples)| CallStackTiming { call_stack, samples }) - .collect() -} - -pub(crate) fn is_enabled() -> bool { - SINK.get().is_some() -} - -#[inline] -pub(crate) fn stack_enter(name: &'static str, start: Instant) { - if SINK.get().is_some() { - let perf_start = perf::read(); - CALL_STACK.with(|s| { - s.borrow_mut().push(Frame { - name, - start, - total_tracked_ns: 0, - perf_start, - total_tracked_perf: PerfSample::default(), - }) - }); - } -} - -#[inline] -pub(crate) fn stack_exit() { - let Some(sink) = SINK.get() else { return }; - let perf_end = perf::read(); - CALL_STACK.with(|s| { - let mut stack = s.borrow_mut(); - let Some(frame) = stack.pop() else { return }; - let tracked_ns = frame.start.elapsed().as_nanos() as u64; - let untracked_ns = tracked_ns.saturating_sub(frame.total_tracked_ns); - - // Counter delta over the call; zero when either snapshot is absent. 
- let tracked_perf = match (perf_end, frame.perf_start) { - (Some(end), Some(start)) => end.delta(&start), - _ => PerfSample::default(), - }; - let untracked_perf = tracked_perf.delta(&frame.total_tracked_perf); - - let path: Vec<&'static str> = stack.iter().map(|f| f.name).chain([frame.name]).collect(); - if let Some(parent) = stack.last_mut() { - parent.total_tracked_ns += tracked_ns; - parent.total_tracked_perf = parent.total_tracked_perf.add(&tracked_perf); - } - - let mut map = sink.lock().unwrap(); - let entry = map.entry(path).or_default(); - entry.tracked_ns.push(tracked_ns); - entry.total_untracked_ns += untracked_ns; - entry.tracked_perf = entry.tracked_perf.add(&tracked_perf); - entry.total_untracked_perf = entry.total_untracked_perf.add(&untracked_perf); - }); -} diff --git a/crates/metrics/src/flamegraph_timer/drainer.rs b/crates/metrics/src/flamegraph_timer/drainer.rs new file mode 100644 index 00000000..8afdb77e --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/drainer.rs @@ -0,0 +1,159 @@ +//! The live ring drainer: discovers a run's per-thread shmem rings, drains each +//! into a retained heap buffer, and folds the lot into a call tree. The two +//! consumers (the in-process harness and the surfer) drive the same engine +//! and differ only in the [`FrameResolver`] they poll with. Each ring is read +//! from slot 0, so a surfer attaching mid-run reports loss for the +//! pre-attach prefix the producer has already overwritten. +//! +//! Frame names are resolved as marks are drained, not at `fold` time: a +//! surfer reads them from the live producer's binary, which may be gone by +//! the time anyone folds the retained marks into stats. 
+ +use std::collections::{HashMap, hash_map::Entry}; + +use flux::communication::{ + ReadError, + queue::{ConsumerBare, Queue}, +}; +use rustc_hash::FxHashMap; + +use crate::{ + Schema, + flamegraph_timer::{ + aggregator::Aggregator, + mark::{Frame, Mark}, + queue_dir::{QueueDir, RingEntry}, + report::{FlamegraphMeta, TimingStats}, + symbols::FrameResolver, + }, + perf::PerfSample, +}; + +pub(super) struct EventsDrainer { + dir: QueueDir, + threads: HashMap, + meta: FlamegraphMeta, +} + +impl EventsDrainer { + pub(super) fn new(dir: QueueDir, schema: Schema) -> Self { + let meta = FlamegraphMeta { names: FxHashMap::default(), schema }; + Self { dir, threads: HashMap::new(), meta } + } + + pub(super) fn poll(&mut self, resolver: &impl FrameResolver) { + for thread in self.dir.event_threads() { + if let Entry::Vacant(slot) = self.threads.entry(thread) { + if let Some(thread) = ThreadDrainer::open(&self.dir, slot.key()) { + slot.insert(thread); + } + } + } + for thread in self.threads.values_mut() { + thread.poll(&mut self.meta.names, resolver); + } + } + + pub(super) fn fold(&self) -> TimingStats { + let mut aggregator = Aggregator::default(); + let mut lost = false; + for thread in self.threads.values() { + lost |= thread.lost(); + thread.fold_into(&mut aggregator); + } + lost |= aggregator.desynced(); + TimingStats::from_timings(aggregator.into_paths(), self.meta.clone(), lost) + } +} + +/// One producing thread's marks and (with `perf`) counter samples, read in +/// lockstep so the aggregator can pair them by index. +struct ThreadDrainer { + marks: QueueDrainer, + /// `None` when the producer was built without `perf`: the marks ring exists + /// but the perf ring never does, so the thread folds timing-only. The + /// `perf` feature gates only the producer's counter reads, never this + /// drain. + perf: Option>, + /// Marks in `marks.out[..resolved]` have already had their frame names + /// resolved, so each `poll` only resolves the freshly drained tail. 
+ resolved: usize, +} + +impl ThreadDrainer { + fn open(dir: &QueueDir, token: &str) -> Option { + Some(Self { + marks: QueueDrainer::::open(dir, token)?, + perf: QueueDrainer::::open(dir, token), + resolved: 0, + }) + } + + fn poll(&mut self, names: &mut FxHashMap, resolver: &impl FrameResolver) { + self.marks.poll(); + if let Some(perf) = &mut self.perf { + perf.poll(); + } + // The frame name lives in the producer's binary; `len` says how many + // bytes to read. Resolve each id once, while the producer is still + // alive to read it from. + for mark in &self.marks.out[self.resolved..] { + if let Frame::Open { id, len } = mark.frame { + names.entry(id).or_insert_with(|| { + resolver.resolve(id, len).unwrap_or_else(|| format!("unknown_{id}")) + }); + } + } + self.resolved = self.marks.out.len(); + } + + fn fold_into(&self, aggregator: &mut Aggregator) { + let counters = self.perf.as_ref().map_or(&[][..], |p| &p.out); + aggregator.fold_thread(&self.marks.out, counters); + } + + /// Whether either ring lost marks, leaving the fold incomplete. + fn lost(&self) -> bool { + self.marks.lost || self.perf.as_ref().is_some_and(|p| p.lost) + } +} + +/// A live consumer that drains one ring into a growing heap `Vec`, polled +/// repeatedly across a run. +struct QueueDrainer { + consumer: ConsumerBare, + out: Vec, + /// Set once a `SpedPast` showed the producer overwrote slots we hadn't read + /// — marks were lost, so the fold built from `out` is incomplete. A + /// surfer attaching mid-run latches this on its first poll: the + /// pre-attach prefix is genuinely gone. + lost: bool, +} + +impl QueueDrainer { + fn open(dir: &QueueDir, token: &str) -> Option { + let queue = Queue::::try_open_shared(dir.path::(token)).ok()?; + // Each ring is its own queue with its own flux group table and the surfer + // is its only consumer, so the static per-type prefix works as the group + // label — no per-token string to allocate and leak. 
+ let mut consumer = ConsumerBare::new(queue, T::PREFIX); + + // Collaborative so the cursor starts at the beginning of the ring. + consumer.try_init_collaborative(); + Some(Self { consumer, out: Vec::with_capacity(1024), lost: false }) + } + + fn poll(&mut self) { + let mut scratch = T::EMPTY; + loop { + match self.consumer.try_consume(&mut scratch) { + Ok(()) => self.out.push(scratch), + Err(ReadError::Empty) => break, + Err(ReadError::SpedPast) => { + self.consumer.recover_after_error(); + self.lost = true; + } + } + } + } +} diff --git a/crates/metrics/src/flamegraph_timer/mark.rs b/crates/metrics/src/flamegraph_timer/mark.rs new file mode 100644 index 00000000..4b2d4f7a --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/mark.rs @@ -0,0 +1,25 @@ +#[repr(C)] +#[derive(Clone, Copy)] +pub(crate) enum Frame { + // id - pointer to function name in .rodata + // len - length of the function name in bytes + Open { id: u64, len: u32 }, + Close { id: u64 }, +} + +impl Frame { + pub(crate) fn open(name: &'static str) -> Self { + Frame::Open { id: name.as_ptr() as u64, len: name.len() as u32 } + } + + pub(crate) fn close(name: &'static str) -> Self { + Frame::Close { id: name.as_ptr() as u64 } + } +} + +#[repr(C)] +#[derive(Clone, Copy)] +pub(crate) struct Mark { + pub(crate) frame: Frame, + pub(crate) ts: u64, +} diff --git a/crates/metrics/src/flamegraph_timer/mod.rs b/crates/metrics/src/flamegraph_timer/mod.rs index d9acc177..61d69fb8 100644 --- a/crates/metrics/src/flamegraph_timer/mod.rs +++ b/crates/metrics/src/flamegraph_timer/mod.rs @@ -1,8 +1,20 @@ -//! Capture #[timed] call trees in-process and render them as a call tree or -//! JSON. +//! Capture #[timed] call trees and render them as a call tree or JSON. Marks +//! are produced into per-thread shmem rings; the call tree is folded either +//! in-process (the perf harness) or by a surfer reading the rings of a +//! running silver ([`FlamegraphReader`]). 
-pub mod collect; +mod aggregator; +mod call_tree; +mod drainer; +mod mark; +mod names; +mod producer; +mod queue_dir; +mod reader; pub mod report; +mod symbols; -pub use collect::enable; -pub(crate) use collect::{is_enabled, stack_enter, stack_exit}; +pub(crate) use mark::Frame; +pub(crate) use producer::record; +pub use queue_dir::enable_surfer; +pub use reader::{FlamegraphReader, LocalReader}; diff --git a/crates/metrics/src/flamegraph_timer/names.rs b/crates/metrics/src/flamegraph_timer/names.rs new file mode 100644 index 00000000..aca8482e --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/names.rs @@ -0,0 +1,70 @@ +//! Format a resolved `#[timed]` frame name for display and identity: strip the +//! module path to a leaf, and unwrap the `__TimedTy<Type>` method marker into a +//! `fn<Type>` label. Shared by the stats model (leaf matching) and the +//! call-tree renderer. + +use std::borrow::Cow; + +const MARK: &str = "::__TimedTy<"; + +/// A plain `#[timed]` free function is `module::path::fn`, displayed as the +/// trailing `fn`. A `#[timed]` method embeds its receiver as a +/// `…::fn::__TimedTy<Type>` marker (so each monomorphization is a +/// distinct frame a string-keyed sink could otherwise not tell apart), +/// unwrapped here into `fn<Type>`. Plain frames stay borrowed — the threshold +/// gauges match on these and must be unaffected. +pub(super) fn leaf_name(qualified: &str) -> Cow<'_, str> { + let Some(at) = qualified.find(MARK) else { + return Cow::Borrowed(leaf(qualified)); + }; + let func = leaf(&qualified[..at]); + let rest = &qualified[at + MARK.len()..]; + let ty = rest.strip_suffix('>').unwrap_or(rest); + Cow::Owned(format!("{func}<{}>", short_type_name(ty))) +} + +fn leaf(path: &str) -> &str { + path.rsplit("::").next().unwrap_or(path) +} + +/// Shorten a fully-qualified `type_name` for display: keep only the leaf of +/// each `::`-path and drop lifetime arguments, so +/// `crate::col::ColumnGroup<'_, crate::col::Balances>` renders as +/// `ColumnGroup<Balances>`. 
(Same job as `disqualified::ShortName` / `tynm`, +/// inlined to keep this low-level crate dependency-free.) +fn short_type_name(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + let mut seg = 0; // start of the current path segment within `out` + let bytes = s.as_bytes(); + let mut i = 0; + while i < s.len() { + match bytes[i] { + b'\'' => { + i += 1; + while i < s.len() && (bytes[i].is_ascii_alphanumeric() || bytes[i] == b'_') { + i += 1; + } + if s[i..].starts_with(", ") { + i += 2; + } else if i < s.len() && bytes[i] == b',' { + i += 1; + } + } + b':' if s[i..].starts_with("::") => { + out.truncate(seg); // everything since `seg` was a module prefix + i += 2; + } + b'<' | b'>' | b',' | b' ' => { + out.push(bytes[i] as char); + i += 1; + seg = out.len(); + } + _ => { + let ch = s[i..].chars().next().unwrap(); + out.push(ch); + i += ch.len_utf8(); + } + } + } + out +} diff --git a/crates/metrics/src/flamegraph_timer/producer.rs b/crates/metrics/src/flamegraph_timer/producer.rs new file mode 100644 index 00000000..24248543 --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/producer.rs @@ -0,0 +1,58 @@ +use std::sync::OnceLock; + +use flux::{communication::queue::Producer, timing::Instant}; + +use crate::flamegraph_timer::{ + mark::{Frame, Mark}, + queue_dir::QueueDir, +}; +#[cfg(feature = "perf")] +use crate::perf::{PerfSample, read}; + +thread_local! { + static PRODUCERS: OnceLock = const { OnceLock::new() }; +} + +struct Producers { + marks: Producer, + #[cfg(feature = "perf")] + perf: Producer, +} + +impl Producers { + fn for_current_thread() -> Self { + let dir = QueueDir::open(); + let token = thread_token(); + Producers { + marks: Producer::from(dir.ring::(&token)), + #[cfg(feature = "perf")] + perf: Producer::from(dir.ring::(&token)), + } + } + + fn push(&self, frame: Frame) { + // `Producer` is `Copy`; the thread-local stores it behind a shared `&`, + // so produce through a local copy of the cheap handle. 
+ let ts = Instant::now().0; + let mut marks = self.marks; + marks.produce(&Mark { frame, ts }); + #[cfg(feature = "perf")] + { + let mut perf = self.perf; + perf.produce(&read().unwrap_or_default()); + } + } +} + +fn thread_token() -> String { + let thread = std::thread::current(); + match thread.name() { + Some(name) => name.to_owned(), + None => format!("{:?}", thread.id()), + } +} + +#[inline] +pub(crate) fn record(frame: Frame) { + PRODUCERS.with(|cell| cell.get_or_init(Producers::for_current_thread).push(frame)); +} diff --git a/crates/metrics/src/flamegraph_timer/queue_dir.rs b/crates/metrics/src/flamegraph_timer/queue_dir.rs new file mode 100644 index 00000000..cdb4e08d --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/queue_dir.rs @@ -0,0 +1,137 @@ +use std::path::PathBuf; + +use flux::{ + communication::{ + cleanup::{cleanup_flink, is_pid_alive}, + queue::{Queue, QueueType}, + }, + utils::directories::{local_share_dir, shmem_dir_queues_with_base}, +}; + +use crate::{ + Schema, + flamegraph_timer::mark::{Frame, Mark}, + perf::{MAX_EVENTS, PerfSample}, +}; + +/// Each ring element type carries its own ring name and empty value, so the +/// two can't be mismatched at a call site. +pub(super) trait RingEntry: Copy { + const PREFIX: &'static str; + const EMPTY: Self; +} + +impl RingEntry for Mark { + const PREFIX: &'static str = "events"; + const EMPTY: Self = Mark { frame: Frame::Close { id: 0 }, ts: 0 }; +} + +impl RingEntry for PerfSample { + const PREFIX: &'static str = "perf-events"; + const EMPTY: Self = PerfSample { vals: [0; MAX_EVENTS] }; +} + +/// A background reader drains each ring continuously, so a ring only buffers +/// the marks produced between polls, not the whole run — hence small. +const RING_CAPACITY: usize = 1 << 14; + +/// The shmem dir holding this run's per-thread timing rings. 
+pub(super) struct QueueDir(PathBuf); + +impl QueueDir { + pub(super) fn open() -> Self { + Self::open_app(crate::TIMING.app()) + } + + pub(super) fn open_app(app: &str) -> Self { + let dir = shmem_dir_queues_with_base(local_share_dir(), app); + let _ = std::fs::create_dir_all(&dir); + Self(dir) + } + + /// Publish our pid so a surfer can attach to this run. + pub(super) fn write_pid(&self) { + let _ = std::fs::write(self.0.join("pid"), std::process::id().to_string()); + } + + /// Publish the perf event names this run measures so a surfer labels its + /// positional samples by our vocabulary, not its own `SILVER_PERF_EVENTS`. + /// Removed when the run has no perf, so a surfer can't fold a prior perf + /// run's stale names against samples that no longer exist. + pub(super) fn publish_perf_schema(&self) { + let path = self.0.join("perf_schema"); + #[cfg(feature = "perf")] + { + let names: Vec<&str> = Schema::local().iter().map(|e| e.label.as_str()).collect(); + let _ = std::fs::write(path, names.join(",")); + } + #[cfg(not(feature = "perf"))] + let _ = std::fs::remove_file(path); + } + + /// The vocabulary this run published, if it enabled perf. + pub(super) fn perf_schema(&self) -> Option { + std::fs::read_to_string(self.0.join("perf_schema")).ok().map(|s| Schema::parse(&s)) + } + + /// The published pid, but only if its process is still alive: the pid file + /// and stale `events-*` rings outlive a dead run, so a consumer must not + /// fold them as live data. 
+ pub(super) fn live_pid(&self) -> Option { + let pid: u32 = std::fs::read_to_string(self.0.join("pid")).ok()?.trim().parse().ok()?; + is_pid_alive(pid).then_some(pid) + } + + pub(super) fn path(&self, token: &str) -> PathBuf { + self.0.join(format!("{}-{token}", T::PREFIX)) + } + + fn entries(&self) -> impl Iterator { + std::fs::read_dir(&self.0).into_iter().flatten().flatten() + } + + /// Unlink a prior run's rings before producers create theirs, so the reader + /// doesn't fold a vanished thread's stale ring as live data. + pub(super) fn clear_stale(&self) { + for entry in self.entries() { + let name = entry.file_name(); + let name = name.to_string_lossy(); + let is_ring = name.starts_with(Mark::PREFIX) || name.starts_with(PerfSample::PREFIX); + if is_ring { + let _ = cleanup_flink(&entry.path()); + } + } + } + + pub(super) fn ring(&self, token: &str) -> Queue { + let path = self.path::(token); + // Discard any leftover backing under this stable name first: a crashed + // run's ring or another `silver` process must never be shared, or two + // producers would write the same ring and corrupt each other. + let _ = cleanup_flink(&path); + Queue::create_or_open_shared(path, RING_CAPACITY, QueueType::SPMC) + } + + /// The `` of every marks ring in the dir; perf rings are excluded. + pub(super) fn event_threads(&self) -> Vec { + self.entries() + .filter_map(|e| { + e.file_name() + .to_string_lossy() + .strip_prefix(Mark::PREFIX) + .and_then(|rest| rest.strip_prefix('-')) + .map(str::to_owned) + }) + .collect() + } +} + +/// Enable `#[timed]` production and publish this run so a surfer can attach. +/// Call once at startup. 
+pub fn enable_surfer() { + crate::TIMING.set_enabled(); + let dir = QueueDir::open(); + dir.clear_stale(); + dir.publish_perf_schema(); + dir.write_pid(); +} diff --git a/crates/metrics/src/flamegraph_timer/reader.rs b/crates/metrics/src/flamegraph_timer/reader.rs new file mode 100644 index 00000000..fd945728 --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/reader.rs @@ -0,0 +1,190 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + thread::{self, JoinHandle}, + time::Duration, +}; + +use crate::{ + Schema, + flamegraph_timer::{ + drainer::EventsDrainer, + queue_dir::{QueueDir, enable_surfer}, + report::TimingStats, + symbols::{InProcessSymbolsResolver, RemoteSymbolsResolver}, + }, +}; + +/// Reads a *running* silver's `#[timed]` marks from its shmem rings and folds +/// them into a call tree on demand. The producer is a different process, so +/// names are read from its on-disk binary rather than dereferenced. +/// +/// Polling is cumulative — every mark since attach is retained — so the fold +/// covers the whole run, and memory grows with the marks produced. +pub struct FlamegraphReader { + drainer: EventsDrainer, + resolver: RemoteSymbolsResolver, + pid: u32, +} + +impl FlamegraphReader { + /// The pid of the *live* producer under `app`, or `None` if none has + /// produced yet or the last one exited. Its change across calls signals a + /// restart, so a consumer can re-[`attach`](Self::attach). + pub fn published_pid(app: &str) -> Option { + QueueDir::open_app(app).live_pid() + } + + /// Attaches to the live silver registered under `app`, or `None` if none is + /// running. 
+ pub fn attach(app: &str) -> Option { + let dir = QueueDir::open_app(app); + let pid = dir.live_pid()?; + let schema = dir.perf_schema().unwrap_or_else(Schema::empty); + Some(Self { + drainer: EventsDrainer::new(dir, schema), + resolver: RemoteSymbolsResolver::new(pid), + pid, + }) + } + + pub fn pid(&self) -> u32 { + self.pid + } + + pub fn poll(&mut self) { + self.drainer.poll(&self.resolver); + } + + pub fn stats(&self) -> TimingStats { + self.drainer.fold() + } +} + +/// The perf harness's in-process reader: enables `#[timed]` production and +/// drains every thread's ring on a background thread until +/// [`collect`](Self::collect) stops it and folds the whole run. +pub struct LocalReader { + stop: Arc, + handle: JoinHandle, +} + +impl LocalReader { + pub fn start() -> Self { + enable_surfer(); + let stop = Arc::new(AtomicBool::new(false)); + let handle = { + let stop = stop.clone(); + thread::Builder::new() + .name("flamegraph-reader".to_owned()) + .spawn(move || Self::run(stop)) + .expect("spawn flamegraph reader") + }; + Self { stop, handle } + } + + /// Stop the background reader and fold the whole run into stats. + pub fn collect(self) -> TimingStats { + self.stop.store(true, Ordering::Release); + let drainer = self + .handle + .join() + .unwrap_or_else(|_| EventsDrainer::new(QueueDir::open(), Schema::local().clone())); + drainer.fold() + } + + fn run(stop: Arc) -> EventsDrainer { + // Boots with the producer, so it reads the whole run from slot 0 and any + // loss is a genuine overrun, not a pre-attach gap. + let mut drainer = EventsDrainer::new(QueueDir::open(), Schema::local().clone()); + loop { + drainer.poll(&InProcessSymbolsResolver); + // Poll once more after stop is observed: producers have finished by + // then, so this pass flushes their tails. 
+ if stop.load(Ordering::Acquire) { + break; + } + thread::sleep(Duration::from_millis(1)); + } + drainer + } +} + +#[cfg(test)] +mod tests { + use std::thread; + + use super::*; + use crate::flamegraph_timer::{Frame, record}; + + #[test] + fn drain_discovers_every_thread_ring() { + let _guard = crate::test_shmem::ShmemGuard::new(); + let reader = LocalReader::start(); + + let spawn = |tag: &'static str, reps: usize| { + thread::Builder::new() + .name(format!("drainer-{tag}")) + .spawn(move || { + let (outer, inner) = ("outer", "inner"); + for _ in 0..reps { + record(Frame::open(outer)); + record(Frame::open(inner)); + record(Frame::close(inner)); + record(Frame::close(outer)); + } + }) + .unwrap() + }; + + spawn("a", 3).join().unwrap(); + spawn("b", 5).join().unwrap(); + + let stats = reader.collect(); + assert!(!stats.missed_events(), "test rings are larger than the few marks produced"); + assert_eq!(stats.aggregate_leaf("outer").1, 8, "both threads' outer frames"); + assert_eq!(stats.aggregate_leaf("inner").1, 8, "both threads' inner frames"); + } + + /// The surfer path: produce marks, then fold them through + /// [`FlamegraphReader`], which resolves names from the producer's on-disk + /// binary (here our own process) rather than by dereferencing the id. 
+ #[test] + fn flamegraph_reader_resolves_names_cross_process() { + let _guard = crate::test_shmem::ShmemGuard::new(); + enable_surfer(); + + thread::Builder::new() + .name("surfer-producer".to_owned()) + .spawn(|| { + let (alpha, beta) = ("alpha", "beta"); + for _ in 0..4 { + record(Frame::open(alpha)); + record(Frame::open(beta)); + record(Frame::close(beta)); + record(Frame::close(alpha)); + } + }) + .unwrap() + .join() + .unwrap(); + + let mut reader = FlamegraphReader::attach(crate::TIMING.app()).expect("pid published"); + reader.poll(); + let stats = reader.stats(); + + assert!(!stats.missed_events()); + assert_eq!(stats.aggregate_leaf("alpha").1, 4, "name resolved from on-disk binary"); + assert_eq!(stats.aggregate_leaf("beta").1, 4); + + // The producer is gone; re-polling the now-static ring must not invent + // marks, so the cumulative fold stays put (no perpetual churn at idle). + reader.poll(); + reader.poll(); + let again = reader.stats(); + assert_eq!(again.aggregate_leaf("alpha").1, 4, "re-poll of a static ring changed the fold"); + assert_eq!(again.aggregate_leaf("beta").1, 4); + } +} diff --git a/crates/metrics/src/flamegraph_timer/report.rs b/crates/metrics/src/flamegraph_timer/report.rs index ee9d03b6..00f532e3 100644 --- a/crates/metrics/src/flamegraph_timer/report.rs +++ b/crates/metrics/src/flamegraph_timer/report.rs @@ -1,88 +1,129 @@ -//! Summarise drained #[timed] call paths into a deterministic call tree; each -//! path is the measured timed-frame chain, so containment holds by -//! construction. - -use std::{borrow::Cow, collections::HashMap}; +//! Per-path stats over drained `#[timed]` call paths. Each path is a measured +//! timed-frame chain, so a child's time is contained in its parent's. 
use flux::timing::Nanos; +use rustc_hash::FxHashMap; + +use crate::{ + Schema, + flamegraph_timer::{aggregator::CallStackSamples, call_tree, names::leaf_name}, + perf::PerfSample, +}; -use crate::{flamegraph_timer::collect::drain, perf::PerfSample, slot}; +pub(super) struct PathStat { + pub(super) path: Vec, + pub(super) metrics: PathMetrics, +} #[derive(serde::Serialize)] -struct PathStat { - #[serde(serialize_with = "join_path")] - path: Vec, - count: u64, +pub(super) struct PathMetrics { + pub(super) count: u64, tracked_avg_ns: Nanos, tracked_p50_ns: Nanos, tracked_p99_ns: Nanos, tracked_max_ns: Nanos, - tracked_sum_ns: Nanos, - total_untracked_ns: Nanos, - /// Summed counter deltas over all calls on this path, and the same - /// excluding timed children (mirroring `total_untracked_ns`). All-zero - /// unless built with the `perf` feature (and `perf_event_open` permitted). - tracked_perf: PerfSample, - untracked_perf: PerfSample, + pub(super) tracked_sum_ns: Nanos, + pub(super) total_untracked_ns: Nanos, + /// Summed counter deltas over all calls on this path; `untracked_perf` + /// excludes timed children, like `total_untracked_ns`. All-zero unless the + /// `perf` feature is built and `perf_event_open` is permitted. 
+ pub(super) tracked_perf: PerfSample, + pub(super) untracked_perf: PerfSample, +} + +impl PathMetrics { + fn from_samples(samples: CallStackSamples) -> Self { + let mut times = samples.tracked_ns; + times.sort_unstable(); + let count = times.len() as u64; + let sum: u128 = times.iter().map(|&x| x as u128).sum(); + Self { + count, + tracked_avg_ns: if count > 0 { + Nanos((sum / count as u128) as u64) + } else { + Nanos::ZERO + }, + tracked_p50_ns: Nanos(percentile(×, 0.50)), + tracked_p99_ns: Nanos(percentile(×, 0.99)), + tracked_max_ns: Nanos(times.last().copied().unwrap_or(0)), + tracked_sum_ns: Nanos(u64::try_from(sum).unwrap_or(u64::MAX)), + total_untracked_ns: Nanos(samples.total_untracked_ns), + tracked_perf: samples.tracked_perf, + untracked_perf: samples.total_untracked_perf, + } + } } -fn join_path(path: &[String], s: S) -> Result { - s.serialize_str(&path.join(";")) +#[derive(serde::Serialize)] +struct PathStatJson<'a> { + path: String, + #[serde(flatten)] + metrics: &'a PathMetrics, } -/// Summarised `#[timed]` call paths for one run. Build with -/// [`TimingStats::collect`], then render to a call tree or JSON. -pub struct TimingStats(Vec); +/// A fold's render/match labels: frame-id → resolved name, and the perf event +/// vocabulary the counter samples are positional in. +#[derive(Clone)] +pub(crate) struct FlamegraphMeta { + pub names: FxHashMap, + pub schema: Schema, +} + +/// Per-path `#[timed]` stats for one run. Frame ids stay opaque in `paths`; +/// `meta` resolves them only at the render/match boundary, so the threshold +/// gauges and the call tree share one model. 
+pub struct TimingStats { + paths: Vec, + meta: FlamegraphMeta, + lost: bool, +} impl TimingStats { - pub fn collect() -> Self { - let mut paths: Vec<_> = drain() + pub(crate) fn from_timings( + paths: FxHashMap, CallStackSamples>, + meta: FlamegraphMeta, + lost: bool, + ) -> Self { + let mut paths: Vec = paths .into_iter() - .map(|timing| { - let mut samples = timing.samples.tracked_ns; - samples.sort_unstable(); - let count = samples.len() as u64; - let sum: u128 = samples.iter().map(|&x| x as u128).sum(); - PathStat { - path: timing.call_stack.iter().map(|name| name.to_string()).collect(), - count, - tracked_avg_ns: if count > 0 { - Nanos((sum / count as u128) as u64) - } else { - Nanos::ZERO - }, - tracked_p50_ns: Nanos(percentile(&samples, 0.50)), - tracked_p99_ns: Nanos(percentile(&samples, 0.99)), - tracked_max_ns: Nanos(samples.last().copied().unwrap_or(0)), - tracked_sum_ns: Nanos(u64::try_from(sum).unwrap_or(u64::MAX)), - total_untracked_ns: Nanos(timing.samples.total_untracked_ns), - tracked_perf: timing.samples.tracked_perf, - untracked_perf: timing.samples.total_untracked_perf, - } - }) + .map(|(path, samples)| PathStat { path, metrics: PathMetrics::from_samples(samples) }) .collect(); - paths.sort_by(|a, b| a.path.cmp(&b.path)); - Self(paths) + // Ids are ASLR'd pointers; sort on resolved names so JSON output is + // deterministic across runs. + paths.sort_by(|a, b| { + a.path.iter().map(|id| &meta.names[id]).cmp(b.path.iter().map(|id| &meta.names[id])) + }); + Self { paths, meta, lost } + } + + /// The mark stream was unreliable — a ring wrapped before the reader + /// drained it (marks lost), or a close popped a non-matching open (stream + /// desync, e.g. a producer crash/restart). Either way these stats are + /// incomplete and any gate built on them is invalid. 
+ pub fn missed_events(&self) -> bool { + self.lost + } + + fn matching_paths<'a>(&'a self, leaf: &'a str) -> impl Iterator { + self.paths.iter().filter(move |s| { + s.path.last().is_some_and(|id| leaf_name(&self.meta.names[id]).as_ref() == leaf) + }) } /// Sum tracked time + call count across every path whose leaf is /// `leaf` — for normalising a cross-cutting function (e.g. /// `hash_validators`) against a workload unit regardless of call site. pub fn aggregate_leaf(&self, leaf: &str) -> (Nanos, u64) { - self.0 - .iter() - .filter(|s| s.path.last().is_some_and(|n| leaf_name(n).as_ref() == leaf)) - .fold((Nanos::ZERO, 0), |(sum, cnt), s| (sum + s.tracked_sum_ns, cnt + s.count)) + self.matching_paths(leaf).fold((Nanos::ZERO, 0), |(sum, cnt), s| { + (sum + s.metrics.tracked_sum_ns, cnt + s.metrics.count) + }) } /// Slowest single tracked sample across every path whose leaf is `leaf` /// — the worst-case call, regardless of call site. `None` if no such path. pub fn aggregate_leaf_max(&self, leaf: &str) -> Option { - self.0 - .iter() - .filter(|s| s.path.last().is_some_and(|n| leaf_name(n).as_ref() == leaf)) - .map(|s| s.tracked_max_ns) - .max() + self.matching_paths(leaf).map(|s| s.metrics.tracked_max_ns).max() } /// Median (p50) tracked sample for `leaf`. Percentiles aren't combinable @@ -90,115 +131,27 @@ impl TimingStats { /// for a single-call-site leaf (e.g. the top-level `apply_block`). `None` /// if no such path. pub fn aggregate_leaf_p50(&self, leaf: &str) -> Option { - self.0 - .iter() - .filter(|s| s.path.last().is_some_and(|n| leaf_name(n).as_ref() == leaf)) - .map(|s| s.tracked_p50_ns) - .max() + self.matching_paths(leaf).map(|s| s.metrics.tracked_p50_ns).max() } - /// Indented call tree. A parent's untracked time is emitted as a - /// synthetic `untracked` sibling so children sum to the parent's - /// tracked time; low-coverage tail folded under `COVERAGE_PCT`. 
pub fn call_tree(&self) -> String { - let mut root = Node::default(); - for s in &self.0 { - let mut node = &mut root; - for seg in &s.path { - node = node.children.entry(seg.clone()).or_default(); - } - node.count = s.count; - node.total_untracked_ns = s.total_untracked_ns; - node.tracked_sum_ns = s.tracked_sum_ns; - node.perf = s.tracked_perf; - node.untracked_perf = s.untracked_perf; - } - - let mut lines = Vec::new(); - root.render_children(0, &mut lines); - render_aligned(&lines) + call_tree::render(&self.paths, &self.meta) } - /// Deterministic JSON `{label, paths}` — see `PathStat` for the per-path - /// schema. + /// Deterministic JSON `{label, paths}` — see `PathMetrics` for the per-path + /// schema; the path is its frame names joined by `;`. pub fn to_json(&self, label: &str) -> String { - serde_json::json!({ "label": label, "paths": &self.0 }).to_string() - } -} - -/// Display/identity leaf for a frame path segment. -/// -/// A plain `#[timed]` free-function frame is `module::path::fn` → the trailing -/// `::fn`. A `#[timed]` method frame embeds its receiver as a -/// `…::fn::__TimedTy` marker (so each monomorphization is a -/// distinct frame — the type a string-keyed sink can't otherwise tell apart); -/// here we unwrap it into a `fn` label. Plain frames stay borrowed — the -/// threshold gauges match on these and must be unaffected. -fn leaf_name(qualified: &str) -> Cow<'_, str> { - const MARK: &str = "::__TimedTy<"; - if let Some(at) = qualified.find(MARK) { - let func = qualified[..at].rsplit("::").next().unwrap_or(&qualified[..at]); - // The marker wraps exactly the receiver type; drop its closing `>`. 
- let ty = &qualified[at + MARK.len()..]; - let ty = ty.strip_suffix('>').unwrap_or(ty); - return Cow::Owned(format!("{func}<{}>", strip_module_paths(&strip_lifetimes(ty)))); - } - Cow::Borrowed(qualified.rsplit("::").next().unwrap_or(qualified)) -} - -/// Minimal "short type name": `type_name` is fully qualified -/// (`crate::col::ColumnGroup`); keep only the last `::` -/// segment of each path, preserving generic punctuation — yielding -/// `ColumnGroup`. (Same job as `disqualified::ShortName` / `tynm`, -/// inlined to keep this low-level crate dependency-free.) -/// -/// `rsplit("::")` alone won't do — `::` also appears inside generic args — so -/// we split on the generic punctuation first, then take each path's leaf. -fn strip_module_paths(s: &str) -> String { - let mut out = String::with_capacity(s.len()); - for chunk in s.split_inclusive(['<', '>', ',', ' ']) { - // `split_inclusive` keeps the delimiter as the chunk's last char, so a - // chunk is one path plus a trailing delimiter, e.g. `crate::mod::T<` - // (the final chunk may have none). The delimiters are ASCII, so the - // split below always lands on a char boundary. - let (path, delim) = match chunk.as_bytes().last() { - Some(b'<' | b'>' | b',' | b' ') => { - (&chunk[..chunk.len() - 1], &chunk[chunk.len() - 1..]) - } - _ => (chunk, ""), - }; - out.push_str(path.rsplit("::").next().unwrap_or(path)); - out.push_str(delim); + let paths: Vec<_> = self + .paths + .iter() + .map(|s| PathStatJson { path: self.path_name(&s.path), metrics: &s.metrics }) + .collect(); + serde_json::json!({ "label": label, "paths": paths }).to_string() } - out -} -/// Drop lifetime arguments from a `type_name` string so a receiver renders -/// `ColumnWriteView` rather than `ColumnWriteView<'_, Current>`. A -/// lifetime is `'` + ident; a following `, ` separator (the lifetime was a -/// leading generic arg) is dropped with it. 
-fn strip_lifetimes(s: &str) -> String { - let mut out = String::with_capacity(s.len()); - let bytes = s.as_bytes(); - let mut i = 0; - while i < s.len() { - if bytes[i] == b'\'' { - i += 1; - while i < s.len() && (bytes[i].is_ascii_alphanumeric() || bytes[i] == b'_') { - i += 1; - } - if s[i..].starts_with(", ") { - i += 2; - } else if i < s.len() && bytes[i] == b',' { - i += 1; - } - continue; - } - let ch = s[i..].chars().next().unwrap(); - out.push(ch); - i += ch.len_utf8(); + fn path_name(&self, path: &[u64]) -> String { + path.iter().map(|id| self.meta.names[id].as_str()).collect::>().join(";") } - out } fn percentile(sorted: &[u64], q: f64) -> u64 { @@ -209,201 +162,10 @@ fn percentile(sorted: &[u64], q: f64) -> u64 { sorted[idx] } -#[derive(Default)] -struct Node { - count: u64, - total_untracked_ns: Nanos, - tracked_sum_ns: Nanos, - perf: PerfSample, - untracked_perf: PerfSample, - children: HashMap, -} - -/// Children accounting for less than this fraction of a node's time are -/// folded into a single `...` row, keeping the tree focused on the -/// dominant costs. -const COVERAGE_PCT: u64 = 99; - -/// One rendered row, held as parts so [`render_aligned`] can line up the avg -/// (ns) column past the widest label and the counter column past the widest -/// prefix — both vary with label length, so neither can use a fixed column. -struct Line { - name: String, - avg: String, - suffix: String, - /// Counter text; `None` for synthetic rows and when `perf` is off. - counters: Option, -} - -fn width(s: &str) -> usize { - s.chars().count() -} - -/// Assemble rows into the tree: avg right-aligned just past the widest label, -/// then the counter column just past the widest prefix that carries counters. 
-fn render_aligned(lines: &[Line]) -> String { - let name_w = lines.iter().map(|l| width(&l.name)).max().unwrap_or(0); - let prefix = |l: &Line| { - format!("{name:10}{suffix}", name = l.name, avg = l.avg, suffix = l.suffix) - }; - let counter_w = lines - .iter() - .filter(|l| l.counters.is_some()) - .map(|l| width(&prefix(l))) - .max() - .map_or(0, |w| w + 3); - let mut out = String::new(); - for l in lines { - let p = prefix(l); - out.push_str(&p); - if let Some(c) = &l.counters { - for _ in 0..counter_w.saturating_sub(width(&p)) { - out.push(' '); - } - out.push_str(c); - } - out.push('\n'); - } - out -} - -impl Node { - fn render_children(&self, depth: usize, out: &mut Vec) { - if self.children.is_empty() { - return; - } - let mut rows: Vec = self - .children - .iter() - .map(|(name, c)| Row { - label: leaf_name(name), - sum_ns: c.tracked_sum_ns, - count: c.count, - perf: c.perf, - child: Some(c), - }) - .collect(); - if self.count > 0 { - rows.push(Row { - label: Cow::Borrowed("untracked"), - sum_ns: self.total_untracked_ns, - count: self.count, - perf: self.untracked_perf, - child: None, - }); - } - rows.sort_by(|a, b| b.sum_ns.cmp(&a.sum_ns)); - - let total: Nanos = rows.iter().map(|r| r.sum_ns).sum(); - let threshold = Nanos((total.0 as u128 * COVERAGE_PCT as u128 / 100) as u64); - - let mut covered = Nanos::ZERO; - let mut cut = rows.len(); - for (i, r) in rows.iter().enumerate() { - if covered >= threshold { - cut = i; - break; - } - covered += r.sum_ns; - } - // Folding a single row saves no space — only fold 2+. 
- if rows.len() - cut < 2 { - cut = rows.len(); - } - - let indent = depth * 2; - for r in &rows[..cut] { - let avg = r.sum_ns / r.count.max(1); - let count = Some(CallCount { total: r.count, per_parent: self.count }); - out.push(make_line(indent, r.label.as_ref(), avg, count, &r.perf, r.count)); - if let Some(child) = r.child { - child.render_children(depth + 1, out); - } - } - if cut < rows.len() { - let rem_sum: Nanos = rows[cut..].iter().map(|r| r.sum_ns).sum(); - let rem_avg = rem_sum / self.count.max(1); - let label = format!("... ({} more)", rows.len() - cut); - out.push(make_line(indent, &label, rem_avg, None, &PerfSample::default(), 0)); - } - } -} - -struct Row<'a> { - label: Cow<'a, str>, - sum_ns: Nanos, - count: u64, - /// Summed counter deltas for this frame; all-zero for the synthetic - /// `untracked`/fold rows and when the `perf` feature is off. - perf: PerfSample, - /// `None` for the synthetic `untracked` row (no subtree to recurse into). - child: Option<&'a Node>, -} - -/// `per_parent == 0` ⇒ no parent context (root row). -struct CallCount { - total: u64, - per_parent: u64, -} - -fn make_line( - indent: usize, - label: &str, - avg: Nanos, - count: Option, - perf: &PerfSample, - calls: u64, -) -> Line { - let name = format!("{blank:indent$}{label}", blank = ""); - let avg = avg.to_string(); - let suffix = match count { - None => String::new(), - // No parent (root) or parent ran once → avg == total; show plain count. - Some(c) if c.per_parent <= 1 => format!(" ×{}", c.total), - Some(c) => { - // avg calls per parent invocation; integer when divisible. 
- let per = if c.total % c.per_parent == 0 { - (c.total / c.per_parent).to_string() - } else { - format!("{:.1}", c.total as f64 / c.per_parent as f64) - }; - format!(" ×{per} ({} total)", c.total) - } - }; - let counters = counter_text(perf, calls); - Line { name, avg, suffix, counters } -} - -/// Per-call counter columns from the runtime [`schema`](crate::schema): each -/// non-IPC-input event as `N label/call`, then IPC derived from the -/// `instructions`/`cpu-cycles` slots if both were measured. `None` when no -/// counters were collected (the `perf` feature is off or this is a synthetic -/// row). -fn counter_text(perf: &PerfSample, calls: u64) -> Option { - if calls == 0 || perf.vals.iter().all(|&v| v == 0) { - return None; - } - let schema = crate::schema(); - let is_ipc_input = |label: &str| matches!(label, "instructions" | "cpu-cycles" | "cycles"); - let mut parts: Vec = schema - .iter() - .enumerate() - .filter(|(_, e)| !is_ipc_input(&e.label)) - .map(|(i, e)| format!("{:>9} {}/call", perf.vals[i] / calls, e.label)) - .collect(); - if let (Some(i), Some(c)) = - (slot("instructions"), slot("cpu-cycles").or_else(|| slot("cycles"))) && - perf.vals[c] > 0 - { - parts.push(format!("ipc {:.2}", perf.vals[i] as f64 / perf.vals[c] as f64)); - } - (!parts.is_empty()).then(|| parts.join(" ")) -} - #[cfg(test)] mod tests { use super::*; - use crate::{flamegraph_timer::collect::enable, timed}; + use crate::{flamegraph_timer::LocalReader, test_shmem::ShmemGuard, timed}; #[timed] fn leaf_work(spin: u64) -> u64 { @@ -421,28 +183,30 @@ mod tests { #[test] fn records_call_paths_with_self_time() { - enable(); + let _guard = ShmemGuard::new(); + let reader = LocalReader::start(); // 3 parent invocations × 2 leaf calls each = 6 leaf total. 
for _ in 0..3 { std::hint::black_box(parent_work()); } - let stats = TimingStats::collect(); + let stats = reader.collect(); + let names = &stats.meta.names; let parent = stats - .0 + .paths .iter() - .find(|s| s.path.len() == 1 && s.path[0].ends_with("parent_work")) + .find(|s| s.path.len() == 1 && names[&s.path[0]].ends_with("parent_work")) .expect("parent path recorded"); let child = stats - .0 + .paths .iter() - .find(|s| s.path.len() == 2 && s.path[1].ends_with("leaf_work")) + .find(|s| s.path.len() == 2 && names[&s.path[1]].ends_with("leaf_work")) .expect("nested leaf path recorded"); - assert_eq!(parent.count, 3); - assert_eq!(child.count, 6); + assert_eq!(parent.metrics.count, 3); + assert_eq!(child.metrics.count, 6); // Parent's untracked time excludes the children it called. - assert!(parent.total_untracked_ns < parent.tracked_sum_ns); + assert!(parent.metrics.total_untracked_ns < parent.metrics.tracked_sum_ns); let json = stats.to_json("test"); assert!(json.contains("\"label\":\"test\"")); diff --git a/crates/metrics/src/flamegraph_timer/symbols.rs b/crates/metrics/src/flamegraph_timer/symbols.rs new file mode 100644 index 00000000..07a5fe0e --- /dev/null +++ b/crates/metrics/src/flamegraph_timer/symbols.rs @@ -0,0 +1,113 @@ +//! Turns a frame id back into its `#[timed]` name. An id is only an address in +//! the *producing* process, so resolution is process-relative: +//! [`InProcessSymbolsResolver`] dereferences it directly, +//! [`RemoteSymbolsResolver`] reads it out of another process's on-disk binary. + +use std::{cell::RefCell, collections::HashMap, fs, os::unix::fs::FileExt, path::PathBuf}; + +/// `len` is the name's exact byte length (from the `Frame::Open` fat pointer), +/// so resolution reads a fixed span and never scans for a terminator. 
+pub(crate) trait FrameResolver { + fn resolve(&self, id: u64, len: u32) -> Option; +} + +pub(crate) struct InProcessSymbolsResolver; + +impl FrameResolver for InProcessSymbolsResolver { + fn resolve(&self, id: u64, len: u32) -> Option { + // SAFETY: in-process ids are `&'static str` pointers from `#[timed]` + // names; the string lives in `.rodata` for the whole run. + let bytes = unsafe { std::slice::from_raw_parts(id as *const u8, len as usize) }; + std::str::from_utf8(bytes).ok().map(str::to_owned) + } +} + +/// Resolves marks from another process. The id is an address over there, but +/// `#[timed]` names live in file-backed `.rodata`, so the bytes are in its +/// on-disk binary. `/proc//maps` gives the segment→file translation +/// (covering PIE/ASLR), and reading the file needs only `PTRACE_MODE_READ`, +/// which yama leaves open to same-uid readers — unlike the `PTRACE_MODE_ATTACH` +/// that `process_vm_readv` would require. +/// +/// Names are cached so a resolved frame outlives the producer: once its `/proc` +/// entry is gone, re-resolving would otherwise blank every name. +pub struct RemoteSymbolsResolver { + segments: Vec, + cache: RefCell>, +} + +/// `path` is `/proc//exe` for the main binary so a rebuilt-then-deleted +/// file (overlapping restart) still reads the live inode. +struct Segment { + start: u64, + end: u64, + offset: u64, + path: PathBuf, +} + +impl RemoteSymbolsResolver { + pub fn new(pid: u32) -> Self { + Self { segments: parse_maps(pid), cache: RefCell::new(HashMap::new()) } + } + + fn read(&self, id: u64, len: u32) -> Option { + let seg = self.segments.iter().find(|s| (s.start..s.end).contains(&id))?; + let mut buf = vec![0u8; len as usize]; + fs::File::open(&seg.path) + .ok()? 
+ .read_exact_at(&mut buf, id - seg.start + seg.offset) + .ok()?; + String::from_utf8(buf).ok() + } +} + +impl FrameResolver for RemoteSymbolsResolver { + fn resolve(&self, id: u64, len: u32) -> Option { + if let Some(name) = self.cache.borrow().get(&id) { + return Some(name.clone()); + } + let name = self.read(id, len)?; + self.cache.borrow_mut().insert(id, name.clone()); + Some(name) + } +} + +fn parse_maps(pid: u32) -> Vec { + let exe = format!("/proc/{pid}/exe"); + let exe_target = fs::read_link(&exe).ok().map(|p| p.to_string_lossy().into_owned()); + let exe_target = exe_target.as_deref().map(strip_deleted); + + let Ok(maps) = fs::read_to_string(format!("/proc/{pid}/maps")) else { + return Vec::new(); + }; + maps.lines() + .filter_map(|line| { + let toks: Vec<&str> = line.split_whitespace().collect(); + // `range perms offset dev inode path` — anonymous maps have no path. + if toks.len() < 6 || !toks[1].starts_with('r') { + return None; + } + let path = strip_deleted(&toks[5..].join(" ")).to_owned(); + if !path.starts_with('/') { + return None; + } + let (start, end) = parse_range(toks[0])?; + let offset = u64::from_str_radix(toks[2], 16).ok()?; + let read_path = if Some(path.as_str()) == exe_target { + PathBuf::from(&exe) + } else { + PathBuf::from(path) + }; + Some(Segment { start, end, offset, path: read_path }) + }) + .collect() +} + +fn parse_range(range: &str) -> Option<(u64, u64)> { + let (start, end) = range.split_once('-')?; + Some((u64::from_str_radix(start, 16).ok()?, u64::from_str_radix(end, 16).ok()?)) +} + +fn strip_deleted(path: &str) -> &str { + path.strip_suffix(" (deleted)").unwrap_or(path) +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 3da8187b..d3a2f091 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -13,102 +13,23 @@ extern crate self as silver_metrics; use std::{ - cell::RefCell, fs::OpenOptions, io, os::fd::AsRawFd, path::Path, - sync::{ - OnceLock, - atomic::{AtomicPtr, AtomicU64, 
Ordering}, - }, + sync::atomic::{AtomicPtr, AtomicU64, Ordering}, }; -use flux::{Timer, timing::Instant}; pub use silver_common_macros::timed; pub mod flamegraph_timer; mod perf; -pub use perf::{EventSpec, MAX_EVENTS, PerfSample, schema, slot}; - -/// App name used as the parent directory for per-function `Timer` -/// shmem queues. Falls back to `"silver"` if `init_app` is not called. -static APP_NAME: OnceLock = OnceLock::new(); - -/// Publish the app name used by `#[timed]`-created `Timer`s. Must be -/// called at process startup, before any `#[timed]` function fires. -/// Repeat calls are no-ops (first set wins). -pub fn init_app(app_name: &str) { - let _ = APP_NAME.set(app_name.to_owned()); -} - -/// Internal: construct a flux `Timer` under the app namespace published -/// by `init_app`. Used by `TimerGuard` on first hit. -#[doc(hidden)] -pub fn new_timer(name: &str) -> Timer { - Timer::new(APP_NAME.get().map(String::as_str).unwrap_or("silver"), name) -} - -::std::thread_local! { - static TIMERS: RefCell> = RefCell::new(std::collections::HashMap::new()); -} - -/// Drop-based timer scope used by the `#[timed]` macro expansion. -/// Records processing time on every exit path — normal return, `?`, -/// early `return`, panic-unwind. -/// -/// Built with the `perf` feature it also carries a hardware-counter -/// dimension: in harness (call-tree) mode the counters are captured inside -/// [`flamegraph_timer`]; in live mode this guard reads them itself and streams -/// a [`PerfSample`] onto the `perf-{name}` queue for surfer. -#[doc(hidden)] -pub struct TimerGuard { - name: &'static str, - start: Instant, - /// Counter snapshot at entry for live-mode streaming. `None` in harness - /// mode (the call-tree sink reads counters itself), when this call is not - /// sampled, or when the `perf` feature/`perf_event_open` is unavailable. 
- perf_start: Option, -} - -impl TimerGuard { - #[inline] - pub fn new(name: &'static str) -> Self { - Self::new_sampled(name, true) - } - - /// As [`new`](Self::new) but `sample = false` skips this call's counter - /// read/stream (timing is still recorded). Used by `#[timed(sample = N)]`. - #[inline] - pub fn new_sampled(name: &'static str, sample: bool) -> Self { - let start = Instant::now(); - flamegraph_timer::stack_enter(name, start); - // Live mode only: in harness mode the sink captures counters itself. - let perf_start = (sample && !flamegraph_timer::is_enabled()).then(perf::read).flatten(); - Self { name, start, perf_start } - } -} - -impl Drop for TimerGuard { - fn drop(&mut self) { - // Bench/test mode replaces flux emission: record the call tree and - // skip the shmem write entirely. - if flamegraph_timer::is_enabled() { - flamegraph_timer::stack_exit(); - return; - } - TIMERS.with(|cell| { - let mut map = cell.borrow_mut(); - let timer = map.entry(self.name).or_insert_with(|| new_timer(self.name)); - timer.set_start(self.start); - timer.record_processing(); - }); - // Stream the call's counter delta for surfer (no-op without `perf`). - if let (Some(start), Some(end)) = (self.perf_start, perf::read()) { - perf::emit(self.name, &end.delta(&start)); - } - } -} +mod timing; +pub(crate) use perf::Schema; +pub(crate) use timing::TIMING; +#[cfg(test)] +pub(crate) use timing::test_shmem; +pub use timing::{TimerGuard, init_app}; /// Open / create the counters file, ftruncate to `bytes`, mmap shared, /// and return the base pointer. 
Counter files land in flux's standard @@ -346,52 +267,4 @@ mod tests { std::fs::remove_dir_all(&tmp).ok(); } - - use crate::timed; - - #[timed] - fn timed_default_name(x: u64) -> u64 { - x * 2 - } - - #[timed("custom_label")] - fn timed_custom_name(x: u64) -> Result { - if x == 0 { Err("zero") } else { Ok(x + 1) } - } - - #[test] - fn timed_macro_expands_and_runs() { - // Sets the app namespace so the per-fn shmem queues land somewhere - // predictable for the test run. - super::init_app("silver_test"); - - assert_eq!(timed_default_name(7), 14); - assert_eq!(timed_custom_name(0), Err("zero")); - assert_eq!(timed_custom_name(41), Ok(42)); - } - - #[timed(sample = 4)] - fn timed_sampled(x: u64) -> u64 { - x + 1 - } - - #[timed("timed_sampled_label", sample = 1000)] - fn timed_sampled_named(x: u64) -> Result { - if x == 0 { Err("zero") } else { Ok(x * 2) } - } - - /// The hardware-counter dimension is inert without the `perf` feature / - /// perf access; either way the wrap must be transparent, including the - /// sampled variants' skip path. - #[test] - fn timed_sampled_macro_expands_and_runs() { - super::init_app("silver_test"); - - // Cross the sampling boundary a few times. - for i in 0..10 { - assert_eq!(timed_sampled(i), i + 1); - } - assert_eq!(timed_sampled_named(0), Err("zero")); - assert_eq!(timed_sampled_named(21), Ok(42)); - } } diff --git a/crates/metrics/src/perf/events.rs b/crates/metrics/src/perf/events.rs index 22c1240f..2270c67b 100644 --- a/crates/metrics/src/perf/events.rs +++ b/crates/metrics/src/perf/events.rs @@ -1,7 +1,7 @@ //! Event vocabulary: turn a portable name into the perf_event_open //! `(type, config)` for the running CPU. **Edit [`METRICS`] to add a counter.** //! -//! All of this runs once, at [`schema`] init — never on the hot path. Names +//! All of this runs once, at [`Schema`] init — never on the hot path. Names //! resolve like `perf -e`: the curated table, then raw `rNNNN`, then a //! 
`/sys/bus/event_source/devices/cpu/events/` event. @@ -12,6 +12,7 @@ use super::sample::MAX_EVENTS; /// A resolved hardware event: the perf_event_open ABI numbers plus a label. /// `type_`/`config` are the same ids perf derives from an event name, so a /// postprocessor can relabel or merge across runs from them alone. +#[derive(Clone)] pub struct EventSpec { pub type_: u32, pub config: u64, @@ -174,30 +175,72 @@ fn resolve(name: &str) -> Option { sysfs(name) } -/// The slots read this run, resolved once from `SILVER_PERF_EVENTS` (or -/// [`DEFAULT_EVENTS`]). Unknown names are skipped with a warning; the list is -/// capped at [`MAX_EVENTS`]. -pub fn schema() -> &'static [EventSpec] { - static SCHEMA: OnceLock> = OnceLock::new(); - SCHEMA.get_or_init(|| { - let spec = - std::env::var("SILVER_PERF_EVENTS").unwrap_or_else(|_| DEFAULT_EVENTS.to_owned()); - spec.split(',') - .map(str::trim) - .filter(|s| !s.is_empty()) - .filter_map(|n| { - resolve(n).or_else(|| { - eprintln!("perf: unknown event '{n}', skipping"); - None +/// The perf event vocabulary a run measures: an ordered slot list that counter +/// samples are positional in. +#[derive(Clone)] +pub struct Schema(Vec); + +impl Schema { + /// Resolve a comma-separated spec: unknown names are skipped with a + /// warning, and the list is capped at [`MAX_EVENTS`]. + pub fn parse(spec: &str) -> Self { + Self( + spec.split(',') + .map(str::trim) + .filter(|s| !s.is_empty()) + .filter_map(|n| { + resolve(n).or_else(|| { + eprintln!("perf: unknown event '{n}', skipping"); + None + }) }) - }) - .take(MAX_EVENTS) - .collect() - }) + .take(MAX_EVENTS) + .collect(), + ) + } + + pub const fn empty() -> Self { + Self(Vec::new()) + } + + /// Slot index of the event labelled `label`. + pub fn slot(&self, label: &str) -> Option { + self.0.iter().position(|e| e.label == label) + } + + /// Value of `label` in a positional sample's slots, or 0 if the run didn't + /// measure it. 
+ pub fn value(&self, vals: &[u64], label: &str) -> u64 { + self.slot(label).map_or(0, |i| vals[i]) + } + + /// CPU cycles under either spelling perf uses for the event. + pub fn cycles(&self, vals: &[u64]) -> u64 { + self.value(vals, "cpu-cycles").max(self.value(vals, "cycles")) + } + + /// Instructions per cycle, or 0 when cycles weren't measured. + pub fn ipc(&self, vals: &[u64]) -> f64 { + let cycles = self.cycles(vals); + if cycles == 0 { 0.0 } else { self.value(vals, "instructions") as f64 / cycles as f64 } + } + + /// This process's own counter slots, resolved once from + /// `SILVER_PERF_EVENTS` (or [`DEFAULT_EVENTS`]) — the producer's + /// vocabulary. + pub fn local() -> &'static Schema { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA.get_or_init(|| { + let spec = + std::env::var("SILVER_PERF_EVENTS").unwrap_or_else(|_| DEFAULT_EVENTS.to_owned()); + Schema::parse(&spec) + }) + } } -/// Slot index of the event labelled `label`, for postprocessing that derives -/// ratios (e.g. IPC from `instructions`/`cpu-cycles`). -pub fn slot(label: &str) -> Option { - schema().iter().position(|e| e.label == label) +impl std::ops::Deref for Schema { + type Target = [EventSpec]; + fn deref(&self) -> &Self::Target { + &self.0 + } } diff --git a/crates/metrics/src/perf/mod.rs b/crates/metrics/src/perf/mod.rs index dc577544..c81b8e0c 100644 --- a/crates/metrics/src/perf/mod.rs +++ b/crates/metrics/src/perf/mod.rs @@ -1,27 +1,28 @@ //! Hardware performance counters for `#[timed]` (distinct from the shmem app //! counters in `declare_counters!`). The hot path is deliberately dumb: read -//! the opened counters into a fixed [`PerfSample`] and either fold the raw -//! deltas into the call tree (harness mode) or stream them onto the -//! `perf-{name}` queue (live mode). All labelling and derived ratios happen in -//! postprocessing. +//! the opened counters into a fixed [`PerfSample`] that rides alongside each +//! 
flamegraph mark; all labelling and derived ratios happen in postprocessing. //! //! Layers (read top-down): //! - [`events`] — name → perf_event_open `(type, config)` for this CPU; the //! single place to add a metric. Pure logic, no I/O on the hot path. //! - [`sample`] — [`PerfSample`], the raw value record. -//! - [`source`] — per-thread rdpmc [`read`]/[`emit`], feature-gated. +//! - [`source`] — per-thread rdpmc [`read`], feature-gated. //! - `raw` — the rdpmc primitive (only compiled with the `perf` feature). //! -//! Collection is gated behind the `perf` feature; without it (or when -//! `perf_event_paranoid` blocks the events) [`read`] returns `None` and -//! [`emit`] is a no-op, so `#[timed]` degrades to timing only. +//! Collection is gated behind the `perf` feature; without it `#[timed]` reads +//! no counters, and even with it [`read`] returns `None` when +//! `perf_event_paranoid` blocks the events — so `#[timed]` degrades to timing +//! only. mod events; #[cfg(feature = "perf")] mod raw; mod sample; +#[cfg(feature = "perf")] mod source; -pub use events::{EventSpec, schema, slot}; +pub use events::Schema; pub use sample::{MAX_EVENTS, PerfSample}; -pub(crate) use source::{emit, read}; +#[cfg(feature = "perf")] +pub(crate) use source::read; diff --git a/crates/metrics/src/perf/source.rs b/crates/metrics/src/perf/source.rs index c0258cd3..498b9289 100644 --- a/crates/metrics/src/perf/source.rs +++ b/crates/metrics/src/perf/source.rs @@ -1,24 +1,15 @@ -//! Live counter source: open the [`schema`](super::schema)'s events per thread -//! via rdpmc, then [`read`] a snapshot or [`emit`] one onto the `perf-{name}` -//! queue for surfer. Feature-gated — without `perf`, `read` is `None` and -//! `emit` a no-op, so `#[timed]` degrades to timing only. +//! Counter source: open the [`Schema::local`](super::Schema::local) events +//! per thread via rdpmc, then [`read`] a snapshot to ride alongside each +//! flamegraph mark. Only +//! 
compiled with the `perf` feature; without it `#[timed]` never calls in here +//! and degrades to timing only. -pub(crate) use imp::{emit, read}; +pub(crate) use imp::read; -#[cfg(feature = "perf")] mod imp { - use std::{cell::RefCell, collections::HashMap}; + use crate::perf::{MAX_EVENTS, PerfSample, Schema, raw::HwCounter}; - use flux::{ - communication::queue::{Producer, Queue, QueueType}, - utils::directories::{local_share_dir, shmem_dir_queues_with_base}, - }; - - use crate::perf::{MAX_EVENTS, PerfSample, raw::HwCounter, schema}; - - const QUEUE_SIZE: usize = 2usize.pow(13); - - /// Per-thread counters, one slot per [`schema`] entry (`None` where the + /// Per-thread counters, one slot per [`Schema`] entry (`None` where the /// event couldn't be opened — over budget or unsupported). struct Counters { opened: Vec>, @@ -27,7 +18,7 @@ mod imp { impl Counters { fn open() -> Option { let opened: Vec<_> = - schema().iter().map(|e| HwCounter::event(e.type_, e.config)).collect(); + Schema::local().iter().map(|e| HwCounter::event(e.type_, e.config)).collect(); opened.iter().any(Option::is_some).then_some(Self { opened }) } @@ -47,8 +38,6 @@ mod imp { /// Perf pid=0 events bind to the opening thread. None when /// perf_event_open is unavailable. 
static COUNTERS: Option = Counters::open(); - static PRODUCERS: RefCell>> = - RefCell::new(HashMap::new()); } /// Current counter snapshot for the calling thread, or `None` when @@ -57,40 +46,4 @@ mod imp { pub(crate) fn read() -> Option { COUNTERS.with(|c| c.as_ref().map(Counters::read)) } - - fn new_producer(name: &str) -> Producer { - let app = crate::APP_NAME.get().map(String::as_str).unwrap_or("silver"); - let dir = shmem_dir_queues_with_base(local_share_dir(), app); - let _ = std::fs::create_dir_all(&dir); - let queue: Queue = Queue::create_or_open_shared( - dir.join(format!("perf-{name}")), - QUEUE_SIZE, - QueueType::MPMC, - ); - Producer::from(queue) - } - - /// Stream one call's `sample` onto this thread's `perf-{name}` queue for - /// surfer (live mode). - #[inline] - pub(crate) fn emit(name: &'static str, sample: &PerfSample) { - PRODUCERS.with(|cell| { - let mut map = cell.borrow_mut(); - let producer = map.entry(name).or_insert_with(|| new_producer(name)); - producer.produce(sample); - }); - } -} - -#[cfg(not(feature = "perf"))] -mod imp { - use crate::perf::PerfSample; - - #[inline] - pub(crate) fn read() -> Option { - None - } - - #[inline] - pub(crate) fn emit(_name: &'static str, _sample: &PerfSample) {} } diff --git a/crates/metrics/src/timing.rs b/crates/metrics/src/timing.rs new file mode 100644 index 00000000..d68d4817 --- /dev/null +++ b/crates/metrics/src/timing.rs @@ -0,0 +1,136 @@ +//! Runtime on/off switch and the drop guard behind `#[timed]`. Off (the +//! default) allocates nothing — one atomic load per call, so tests running +//! `#[timed]` prod code touch no shmem. Enabled, the guard records a frame +//! open/close into the cross-process flamegraph rings in +//! [`flamegraph_timer`](crate::flamegraph_timer); hardware counters (with the +//! `perf` feature) ride alongside each mark there. 
+ +use std::sync::{ + OnceLock, + atomic::{AtomicBool, Ordering}, +}; + +use crate::flamegraph_timer::{self, Frame}; + +/// Process-global `#[timed]` config: whether marks are produced, and the app +/// name used as the parent dir for the shmem rings (`"silver"` until +/// `init_app`). +pub(crate) struct Timing { + enabled: AtomicBool, + app: OnceLock, +} + +pub(crate) static TIMING: Timing = Timing { enabled: AtomicBool::new(false), app: OnceLock::new() }; + +impl Timing { + pub(crate) fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::Acquire) + } + + pub(crate) fn app(&self) -> &str { + self.app.get().map(String::as_str).unwrap_or("silver") + } + + pub(crate) fn set_enabled(&self) { + self.enabled.store(true, Ordering::Release); + } +} + +/// Publish the app name used by `#[timed]` shmem. Must be called at process +/// startup, before any `#[timed]` function fires. First set wins. +pub fn init_app(app_name: &str) { + let _ = TIMING.app.set(app_name.to_owned()); +} + +/// Drop-based timer scope used by the `#[timed]` macro expansion. Records a +/// frame open on construction and a close on every exit path — normal return, +/// `?`, early `return`, panic-unwind. +#[doc(hidden)] +pub struct TimerGuard { + /// `Some(name)` iff the open was recorded — captured at construction so a + /// mid-call enable can't make `drop` emit a close with no matching open and + /// unbalance the flamegraph fold. The close reuses `name`, so its id pairs + /// with the open's by construction. 
+ close: Option<&'static str>, +} + +impl TimerGuard { + #[inline] + pub fn new(name: &'static str) -> Self { + let close = TIMING.is_enabled().then_some(name); + if let Some(name) = close { + flamegraph_timer::record(Frame::open(name)); + } + Self { close } + } +} + +impl Drop for TimerGuard { + fn drop(&mut self) { + if let Some(name) = self.close { + flamegraph_timer::record(Frame::close(name)); + } + } +} + +/// Isolates each test process's collection shmem under a per-pid app and +/// unlinks it once the last guarded test in the process exits — normal return +/// or unwinding panic. Without this, every test that produces marks leaks its +/// rings into `/dev/shm` (a crashed swarm filled it and took down other +/// Chromium apps); with tiny `cfg(test)` rings the leak is also bounded. +#[cfg(test)] +pub(crate) mod test_shmem { + use std::sync::{Mutex, MutexGuard}; + + use flux::{communication::cleanup::cleanup_shmem, utils::directories::local_share_dir}; + + /// Timing tests drive the process-global mode + reader and share one shmem + /// dir, so they must run one at a time. nextest isolates per process; + /// `cargo test` runs them as threads, so serialize here. 
+ static SERIAL: Mutex<()> = Mutex::new(()); + + pub(crate) struct ShmemGuard { + _serial: MutexGuard<'static, ()>, + } + + impl ShmemGuard { + pub(crate) fn new() -> Self { + let serial = SERIAL.lock().unwrap_or_else(|p| p.into_inner()); + super::init_app(&format!("silver_test_{}", std::process::id())); + ShmemGuard { _serial: serial } + } + } + + impl Drop for ShmemGuard { + fn drop(&mut self) { + if let Some(app) = super::TIMING.app.get() { + cleanup_shmem(&local_share_dir().join(app)); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::test_shmem::ShmemGuard; + use crate::timed; + + #[timed] + fn timed_default_name(x: u64) -> u64 { + x * 2 + } + + #[timed("custom_label")] + fn timed_custom_name(x: u64) -> Result { + if x == 0 { Err("zero") } else { Ok(x + 1) } + } + + #[test] + fn timed_macro_expands_and_runs() { + let _guard = ShmemGuard::new(); + + assert_eq!(timed_default_name(7), 14); + assert_eq!(timed_custom_name(0), Err("zero")); + assert_eq!(timed_custom_name(41), Ok(42)); + } +} diff --git a/crates/surfer/src/app.rs b/crates/surfer/src/app.rs index 27cd6020..92434394 100644 --- a/crates/surfer/src/app.rs +++ b/crates/surfer/src/app.rs @@ -4,9 +4,8 @@ use ratatui::widgets::TableState; use crate::{ discovery::DiscoveredSources, - sources::{ - counters::CounterSet, perf::PerfSet, tilemetrics::TileMetricsSet, timings::TimingSet, - }, + flamegraph::Flamegraph, + sources::{counters::CounterSet, tilemetrics::TileMetricsSet, timings::TimingSet}, }; #[derive(Clone, Copy, PartialEq, Eq)] @@ -15,9 +14,12 @@ pub enum Pane { TCaches, Timings, Tiles, - Perf, + Flamegraph, } +pub const PANES: [Pane; 5] = + [Pane::Counters, Pane::TCaches, Pane::Timings, Pane::Tiles, Pane::Flamegraph]; + impl Pane { pub fn label(self) -> &'static str { match self { @@ -25,7 +27,7 @@ impl Pane { Pane::TCaches => "TCaches", Pane::Timings => "Timings", Pane::Tiles => "Tiles", - Pane::Perf => "Perf", + Pane::Flamegraph => "Flamegraph", } } @@ -34,8 +36,8 @@ impl Pane { Pane::Counters 
=> Pane::TCaches, Pane::TCaches => Pane::Timings, Pane::Timings => Pane::Tiles, - Pane::Tiles => Pane::Perf, - Pane::Perf => Pane::Counters, + Pane::Tiles => Pane::Flamegraph, + Pane::Flamegraph => Pane::Counters, } } } @@ -52,8 +54,6 @@ pub struct App { pub timings_selection: usize, pub tilemetrics: Vec, pub tiles_selection: usize, - pub perf: Vec, - pub perf_selection: usize, /// When true, the active pane renders only the plot for the /// selected row, full-area. Toggled by Enter; Esc exits. pub drilled_in: bool, @@ -70,7 +70,7 @@ pub struct App { pub tcaches_table_state: TableState, pub timings_table_state: TableState, pub tiles_table_state: TableState, - pub perf_table_state: TableState, + pub flamegraph: Flamegraph, pub quit: bool, } @@ -85,7 +85,7 @@ impl App { tcaches: Vec, timings: Vec, tilemetrics: Vec, - perf: Vec, + flamegraph: Flamegraph, ) -> Self { Self { pane: Pane::Counters, @@ -97,15 +97,13 @@ impl App { timings_selection: 0, tilemetrics, tiles_selection: 0, - perf, - perf_selection: 0, drilled_in: false, split_pct: SPLIT_DEFAULT, counters_table_state: TableState::default(), tcaches_table_state: TableState::default(), timings_table_state: TableState::default(), tiles_table_state: TableState::default(), - perf_table_state: TableState::default(), + flamegraph, quit: false, } } @@ -205,23 +203,6 @@ impl App { self.tiles_selection = idx; } } - - // Perf. 
- let sel_name = self.perf.get(self.perf_selection).map(|p| p.name.clone()); - let existing: HashSet = self.perf.iter().map(|p| p.name.clone()).collect(); - for f in &sources.perf { - if !existing.contains(&f.name) { - if let Ok(p) = PerfSet::open(f) { - self.perf.push(p); - } - } - } - self.perf.sort_by(|a, b| a.name.cmp(&b.name)); - if let Some(n) = sel_name { - if let Some(idx) = self.perf.iter().position(|p| p.name == n) { - self.perf_selection = idx; - } - } } pub fn sample(&mut self) { @@ -232,14 +213,12 @@ impl App { c.sample(); } for t in &mut self.timings { - t.drain(); + t.latency.drain(); } for t in &mut self.tilemetrics { t.drain(); } - for p in &mut self.perf { - p.drain(); - } + self.flamegraph.sample(); } pub fn roll_bucket(&mut self) { @@ -253,14 +232,12 @@ impl App { c.roll_bucket(); } for t in &mut self.timings { - t.roll_bucket(); + t.latency.roll_bucket(); } for t in &mut self.tilemetrics { t.roll_bucket(); } - for p in &mut self.perf { - p.roll_bucket(); - } + self.flamegraph.roll_bucket(); } /// Scroll the selection within the active pane. `dir = +1` @@ -272,17 +249,8 @@ impl App { Pane::TCaches => self.move_tcache_selection(dir), Pane::Timings => self.move_timing_selection(dir), Pane::Tiles => self.move_tile_selection(dir), - Pane::Perf => self.move_perf_selection(dir), - } - } - - fn move_perf_selection(&mut self, dir: i32) { - if self.perf.is_empty() { - return; + Pane::Flamegraph => self.flamegraph.scroll_by(dir), } - let n = self.perf.len() as i32; - let new = (self.perf_selection as i32 + dir).rem_euclid(n); - self.perf_selection = new as usize; } fn move_tcache_selection(&mut self, dir: i32) { diff --git a/crates/surfer/src/discovery.rs b/crates/surfer/src/discovery.rs index 182e6645..4c12f7a9 100644 --- a/crates/surfer/src/discovery.rs +++ b/crates/surfer/src/discovery.rs @@ -5,19 +5,17 @@ //! //! Files of interest: //! - `counters-{name}` — shmem-mapped `[AtomicU64; N]` (read-only). -//! 
- `timing-{name}` / `latency-{name}` — flux MPMC `TimingMessage` queues. +//! - `latency-{name}` — flux MPMC `TimingMessage` queue (tcache consumer +//! latency). //! - `tilemetrics-{name}` — flux SPMC `TileSample` queue. -//! - `perf-{name}` — flux MPMC `PerfSample` queue (`#[timed]` functions built -//! with the `perf` feature). -use std::{collections::HashMap, fs, io, path::PathBuf}; +use std::{fs, io, path::PathBuf}; pub struct DiscoveredSources { pub counters: Vec, pub tcaches: Vec, pub timings: Vec, pub tilemetrics: Vec, - pub perf: Vec, } pub struct CounterFile { @@ -30,12 +28,9 @@ pub struct CounterFile { } pub struct TimingFile { - /// The `{name}` shared by `timing-{name}` and `latency-{name}`. + /// The `{name}` suffix from `latency-{name}`. pub name: String, - /// Path to `timing-{name}` if the file exists. - pub timing_path: Option, - /// Path to `latency-{name}` if the file exists. - pub latency_path: Option, + pub path: PathBuf, } pub struct TileMetricsFile { @@ -44,21 +39,11 @@ pub struct TileMetricsFile { pub path: PathBuf, } -pub struct PerfFile { - /// The `{name}` suffix from `perf-{name}` — the decorated function's - /// `module_path::fn` (or its `#[timed("...")]` override). - pub name: String, - pub path: PathBuf, -} - pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result { let mut counters = Vec::new(); let mut tcaches = Vec::new(); - // timing-{name} and latency-{name} are flux Timer file pairs. - // Group by name so a single TimingFile carries both paths. 
- let mut timing_map: HashMap = HashMap::new(); + let mut timings = Vec::new(); let mut tilemetrics = Vec::new(); - let mut perf = Vec::new(); let dir = flux::utils::directories::shmem_dir_queues_with_base(base_dir, app_name); if let Ok(entries) = fs::read_dir(&dir) { @@ -73,35 +58,17 @@ pub fn discover(base_dir: &std::path::Path, app_name: &str) -> io::Result = timing_map.into_values().collect(); counters.sort_by(|a, b| a.name.cmp(&b.name)); tcaches.sort_by(|a, b| a.name.cmp(&b.name)); timings.sort_by(|a, b| a.name.cmp(&b.name)); tilemetrics.sort_by(|a, b| a.name.cmp(&b.name)); - perf.sort_by(|a, b| a.name.cmp(&b.name)); - Ok(DiscoveredSources { counters, tcaches, timings, tilemetrics, perf }) + Ok(DiscoveredSources { counters, tcaches, timings, tilemetrics }) } diff --git a/crates/surfer/src/flamegraph.rs b/crates/surfer/src/flamegraph.rs new file mode 100644 index 00000000..9d5d5c22 --- /dev/null +++ b/crates/surfer/src/flamegraph.rs @@ -0,0 +1,94 @@ +use silver_metrics::flamegraph_timer::FlamegraphReader; + +pub struct Flamegraph { + reader: Option, + tree: String, + missed: bool, + scroll: u16, + /// When set, stop polling/folding so the tree holds still for reading — a + /// live producer otherwise keeps growing the cumulative tree every tick. 
+ paused: bool, +} + +impl Flamegraph { + pub fn attach(app_name: &str) -> Self { + Self { + reader: FlamegraphReader::attach(app_name), + tree: String::new(), + missed: false, + scroll: 0, + paused: false, + } + } + + pub fn sample(&mut self) { + if self.paused { + return; + } + if let Some(reader) = &mut self.reader { + reader.poll(); + } + } + + pub fn roll_bucket(&mut self) { + if self.paused { + return; + } + if let Some(reader) = &self.reader { + let stats = reader.stats(); + self.missed = stats.missed_events(); + self.tree = stats.call_tree(); + } + } + + pub fn scroll_by(&mut self, dir: i32) { + self.scroll = self.scroll.saturating_add_signed(dir as i16); + } + + pub fn toggle_pause(&mut self) { + self.paused = !self.paused; + } + + /// Drop the accumulator and start a fresh cumulative window from now — + /// sheds one-time boot frames so steady-state stands out. + pub fn clear(&mut self, app_name: &str) { + self.reattach(app_name); + self.paused = false; + } + + /// Reattach when a fresh pid is published — surfer started before the + /// producer, or the producer restarted with a new pid and new rings. + pub fn reattach_if_restarted(&mut self, app_name: &str) { + if let Some(pid) = FlamegraphReader::published_pid(app_name) { + if self.reader.as_ref().map(FlamegraphReader::pid) != Some(pid) { + self.reattach(app_name); + } + } + } + + fn reattach(&mut self, app_name: &str) { + self.reader = FlamegraphReader::attach(app_name); + self.tree.clear(); + self.scroll = 0; + } + + pub fn is_attached(&self) -> bool { + self.reader.is_some() + } + + pub fn tree(&self) -> &str { + &self.tree + } + + pub fn missed(&self) -> bool { + self.missed + } + + pub fn paused(&self) -> bool { + self.paused + } + + pub fn scroll(&self) -> u16 { + self.scroll + } +} diff --git a/crates/surfer/src/main.rs b/crates/surfer/src/main.rs index 0c4d027c..b47c221f 100644 --- a/crates/surfer/src/main.rs +++ b/crates/surfer/src/main.rs @@ -1,7 +1,8 @@ //! Terminal metrics viewer for silver. 
Reads `counters-*`, -//! `timing-*`, `tilemetrics-*`, `perf-*` files from flux's shmem +//! `latency-*`, `tilemetrics-*` files from flux's shmem //! queues directory — `{base_dir}/{app_name}/shmem/queues/` — and -//! renders them in a ratatui-based TUI. +//! renders them in a ratatui-based TUI. `#[timed]` perf counters are +//! folded into the flamegraph pane, not surfaced as a separate source. //! //! Usage: `surfer [BASE_DIR] [APP_NAME]`. //! Defaults: `BASE_DIR = flux::utils::directories::local_share_dir()` @@ -23,15 +24,15 @@ use ratatui::{Terminal, backend::CrosstermBackend}; mod app; mod discovery; +mod flamegraph; mod render; mod schema; mod sources; use crate::{ app::App, - sources::{ - counters::CounterSet, perf::PerfSet, tilemetrics::TileMetricsSet, timings::TimingSet, - }, + flamegraph::Flamegraph, + sources::{counters::CounterSet, tilemetrics::TileMetricsSet, timings::TimingSet}, }; const TICK: Duration = Duration::from_millis(100); @@ -89,7 +90,7 @@ fn main() -> io::Result<()> { }) .collect(); for t in &mut timing_sets { - t.drain(); + t.latency.drain(); } let mut tile_sets: Vec = sources @@ -107,21 +108,8 @@ fn main() -> io::Result<()> { t.drain(); } - let mut perf_sets: Vec = sources - .perf - .iter() - .filter_map(|f| match PerfSet::open(f) { - Ok(p) => Some(p), - Err(e) => { - eprintln!("surfer: skipping {}: {e}", f.path.display()); - None - } - }) - .collect(); - for p in &mut perf_sets { - p.drain(); - } - let mut app = App::new(counter_sets, tcache_sets, timing_sets, tile_sets, perf_sets); + let flamegraph = Flamegraph::attach(&app_name); + let mut app = App::new(counter_sets, tcache_sets, timing_sets, tile_sets, flamegraph); enable_raw_mode()?; let mut stdout = io::stdout(); @@ -154,7 +142,7 @@ fn run( if event::poll(timeout)? { if let Event::Key(key) = event::read()? 
{ if key.kind == KeyEventKind::Press { - handle_key(app, key.code); + handle_key(app, key.code, app_name); } } } @@ -170,6 +158,7 @@ fn run( if let Ok(s) = discovery::discover(base_dir, app_name) { app.merge_new_sources(s); } + app.flamegraph.reattach_if_restarted(app_name); last_discover = Instant::now(); } if app.quit { @@ -178,7 +167,7 @@ fn run( } } -fn handle_key(app: &mut App, code: KeyCode) { +fn handle_key(app: &mut App, code: KeyCode, app_name: &str) { match code { KeyCode::Char('q') => app.quit = true, KeyCode::Esc | KeyCode::Backspace if app.drilled_in => app.drilled_in = false, @@ -191,6 +180,8 @@ fn handle_key(app: &mut App, code: KeyCode) { KeyCode::Up => app.move_selection(-1), KeyCode::Char('[') => app.adjust_split(-1), KeyCode::Char(']') => app.adjust_split(1), + KeyCode::Char('p') => app.flamegraph.toggle_pause(), + KeyCode::Char('c') => app.flamegraph.clear(app_name), _ => {} } } diff --git a/crates/surfer/src/render/flamegraph_pane.rs b/crates/surfer/src/render/flamegraph_pane.rs new file mode 100644 index 00000000..0262bd54 --- /dev/null +++ b/crates/surfer/src/render/flamegraph_pane.rs @@ -0,0 +1,55 @@ +use ratatui::{ + Frame, + layout::Rect, + style::{Color, Modifier, Style}, + text::{Line, Span}, + widgets::{Block, Borders, Paragraph}, +}; + +use crate::app::App; + +pub fn draw(f: &mut Frame, area: Rect, app: &App) { + let fg = &app.flamegraph; + let block = block(fg.missed(), fg.paused()); + + if !fg.is_attached() { + f.render_widget( + Paragraph::new( + "waiting for a running silver — no pid published in this app's shmem dir yet", + ) + .style(Style::default().fg(Color::DarkGray)) + .block(block), + area, + ); + return; + } + + if fg.tree().is_empty() { + f.render_widget( + Paragraph::new("attached — folding marks…") + .style(Style::default().fg(Color::DarkGray)) + .block(block), + area, + ); + return; + } + + f.render_widget(Paragraph::new(fg.tree()).block(block).scroll((fg.scroll(), 0)), area); +} + +fn block(missed: bool, paused: bool) 
-> Block<'static> { + let title = if missed { + Line::from(Span::styled( + " Flamegraph (cumulative) — EVENTS LOST: producer outran the reader ", + Style::default().fg(Color::Red).add_modifier(Modifier::BOLD), + )) + } else if paused { + Line::from(Span::styled( + " Flamegraph (cumulative) — PAUSED (p resume · c clear) ", + Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD), + )) + } else { + Line::from(" Flamegraph (cumulative) — p pause · c clear ") + }; + Block::default().borders(Borders::ALL).title(title) +} diff --git a/crates/surfer/src/render/mod.rs b/crates/surfer/src/render/mod.rs index 805926a7..32e69f00 100644 --- a/crates/surfer/src/render/mod.rs +++ b/crates/surfer/src/render/mod.rs @@ -1,6 +1,6 @@ pub mod counters_pane; +pub mod flamegraph_pane; pub mod fmt; -pub mod perf_pane; pub mod tcaches_pane; pub mod tiles_pane; pub mod timings_pane; @@ -13,7 +13,7 @@ use ratatui::{ widgets::Paragraph, }; -use crate::app::{App, Pane}; +use crate::app::{App, PANES, Pane}; pub fn draw(f: &mut Frame, app: &mut App) { let area = f.area(); @@ -27,13 +27,13 @@ pub fn draw(f: &mut Frame, app: &mut App) { Pane::TCaches => tcaches_pane::draw(f, chunks[1], app), Pane::Timings => timings_pane::draw(f, chunks[1], app), Pane::Tiles => tiles_pane::draw(f, chunks[1], app), - Pane::Perf => perf_pane::draw(f, chunks[1], app), + Pane::Flamegraph => flamegraph_pane::draw(f, chunks[1], app), } draw_footer(f, chunks[2], app); } fn draw_header(f: &mut Frame, area: Rect, app: &App) { - let spans: Vec = [Pane::Counters, Pane::TCaches, Pane::Timings, Pane::Tiles, Pane::Perf] + let spans: Vec = PANES .iter() .flat_map(|&p| { let style = if p == app.pane { diff --git a/crates/surfer/src/render/perf_pane.rs b/crates/surfer/src/render/perf_pane.rs deleted file mode 100644 index dd8eef21..00000000 --- a/crates/surfer/src/render/perf_pane.rs +++ /dev/null @@ -1,143 +0,0 @@ -use ratatui::{ - Frame, - layout::{Constraint, Direction, Layout, Rect}, - style::{Color, Modifier, 
Style}, - symbols, - text::{Line, Span}, - widgets::{Axis, Block, Borders, Cell, Chart, Dataset, GraphType, Paragraph, Row, Table}, -}; - -use crate::{app::App, render::fmt::fmt_span_ago}; - -pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { - if app.perf.is_empty() { - let block = Block::default().borders(Borders::ALL).title(" perf "); - let inner = block.inner(area); - f.render_widget(block, area); - f.render_widget( - Paragraph::new("no perf queues discovered (build with --features perf)") - .style(Style::default().fg(Color::DarkGray)), - inner, - ); - return; - } - - if app.drilled_in { - draw_chart(f, area, app); - return; - } - let rows = Layout::default() - .direction(Direction::Vertical) - .constraints([ - Constraint::Percentage(app.split_pct), - Constraint::Percentage(100 - app.split_pct), - ]) - .split(area); - - draw_table(f, rows[0], app); - draw_chart(f, rows[1], app); -} - -fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { - let block = Block::default().borders(Borders::ALL).title(" perf — 30 s avg per call "); - let header = Row::new(vec![ - Cell::from("function"), - Cell::from("calls/s"), - Cell::from("instr"), - Cell::from("cycles"), - Cell::from("ipc"), - Cell::from("br/1k"), - Cell::from("llc/call"), - ]) - .style(Style::default().add_modifier(Modifier::BOLD).fg(Color::White)) - .height(1); - - let rows: Vec = app - .perf - .iter() - .enumerate() - .map(|(i, p)| { - let row_style = if i == app.perf_selection { - Style::default().bg(Color::DarkGray).fg(Color::White).add_modifier(Modifier::BOLD) - } else { - Style::default() - }; - Row::new(vec![ - Cell::from(Span::styled(p.name.clone(), row_style)), - Cell::from(format!("{:>9}", p.call_rate())), - Cell::from(format!("{:>12}", p.instr_avg())), - Cell::from(format!("{:>12}", p.cycles_avg())), - Cell::from(format!("{:>6.2}", p.ipc())), - Cell::from(format!("{:>7.2}", p.branch_per_kinstr())), - Cell::from(format!("{:>9}", p.cache_miss_avg())), - ]) - .height(1) - }) - .collect(); - - let 
widths = [ - Constraint::Percentage(42), - Constraint::Length(10), - Constraint::Length(14), - Constraint::Length(14), - Constraint::Length(8), - Constraint::Length(9), - Constraint::Length(10), - ]; - let table = Table::new(rows, widths).header(header).block(block); - app.perf_table_state.select(Some(app.perf_selection)); - f.render_stateful_widget(table, area, &mut app.perf_table_state); -} - -/// IPC over the retained bucket ring for the selected function. -fn draw_chart(f: &mut Frame, area: Rect, app: &mut App) { - let Some(p) = app.perf.get(app.perf_selection) else { - f.render_widget(Block::default().borders(Borders::ALL).title(" ipc "), area); - return; - }; - let n = p.history.len(); - let title = format!(" {} — ipc ", p.name); - let block = Block::default().borders(Borders::ALL).title(title); - if n == 0 || !p.has_data() { - let inner = block.inner(area); - f.render_widget(block, area); - f.render_widget( - Paragraph::new("no samples yet").style(Style::default().fg(Color::DarkGray)), - inner, - ); - return; - } - - // One bucket per second, oldest at x=0, newest at right. 
- let data: Vec<(f64, f64)> = - p.history.iter().enumerate().map(|(i, b)| (i as f64, b.ipc())).collect(); - let y_max = data.iter().map(|(_, y)| *y).fold(0.0_f64, f64::max).max(1.0); - let x_max = n.saturating_sub(1).max(1) as f64; - - let dataset = Dataset::default() - .marker(symbols::Marker::Braille) - .style(Style::default().fg(Color::Cyan)) - .graph_type(GraphType::Line) - .data(&data); - - let x_labels = vec![ - Line::from(format!("-{}", fmt_span_ago(n))), - Line::from(format!("-{}", fmt_span_ago(n / 2))), - Line::from("now"), - ]; - let chart = Chart::new(vec![dataset]) - .block(block) - .x_axis( - Axis::default() - .bounds([0.0, x_max]) - .labels(x_labels) - .style(Style::default().fg(Color::DarkGray)), - ) - .y_axis( - Axis::default() - .bounds([0.0, y_max * 1.1]) - .labels(vec![Line::from("0"), Line::from(format!("{y_max:.1}"))]) - .style(Style::default().fg(Color::DarkGray)), - ); - f.render_widget(chart, area); -} diff --git a/crates/surfer/src/render/timings_pane.rs b/crates/surfer/src/render/timings_pane.rs index 1197a6e0..340cb13d 100644 --- a/crates/surfer/src/render/timings_pane.rs +++ b/crates/surfer/src/render/timings_pane.rs @@ -7,10 +7,7 @@ use ratatui::{ widgets::{Axis, Block, Borders, Cell, Chart, Dataset, GraphType, Paragraph, Row, Table}, }; -use crate::{ - app::App, - sources::timings::{TimingChannel, TimingSet}, -}; +use crate::{app::App, sources::timings::TimingChannel}; pub fn draw(f: &mut Frame, area: Rect, app: &mut App) { if app.timings.is_empty() { @@ -45,7 +42,6 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { let block = Block::default().borders(Borders::ALL).title(" timings "); let header = Row::new(vec![ Cell::from("timer"), - Cell::from("kind"), Cell::from("last"), Cell::from("p50 (last bucket)"), Cell::from("p99 (last bucket)"), @@ -59,10 +55,7 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { .iter() .enumerate() .map(|(i, t)| { - let kind = kind_marker(t); - let primary = t.primary(); - let last_bucket 
= primary.and_then(|c| c.last_bucket()).unwrap_or_default(); - let last_ns = primary.map(|c| c.last_ns).unwrap_or(0); + let last_bucket = t.latency.last_bucket().unwrap_or_default(); let style = if i == app.timings_selection { Style::default().bg(Color::DarkGray).fg(Color::White).add_modifier(Modifier::BOLD) } else { @@ -70,8 +63,7 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { }; Row::new(vec![ Cell::from(t.name.clone()), - Cell::from(kind), - Cell::from(format_ns(last_ns)), + Cell::from(format_ns(t.latency.last_ns)), Cell::from(format_ns(last_bucket.p50_ns)), Cell::from(format_ns(last_bucket.p99_ns)), Cell::from(format!("{:>10}", last_bucket.count)), @@ -83,7 +75,6 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { let widths = [ Constraint::Percentage(35), - Constraint::Length(6), Constraint::Length(12), Constraint::Length(18), Constraint::Length(18), @@ -94,86 +85,17 @@ fn draw_table(f: &mut Frame, area: Rect, app: &mut App) { f.render_stateful_widget(table, area, &mut app.timings_table_state); } -fn kind_marker(t: &TimingSet) -> &'static str { - let has_t = t.timing.as_ref().is_some_and(TimingChannel::has_data); - let has_l = t.latency.as_ref().is_some_and(TimingChannel::has_data); - match (has_t, has_l) { - (true, true) => "T+L", - (true, false) => "T", - (false, true) => "L", - (false, false) => "·", - } -} - fn draw_charts(f: &mut Frame, area: Rect, app: &App) { let Some(timer) = app.timings.get(app.timings_selection) else { f.render_widget(Block::default().borders(Borders::ALL).title(" history "), area); return; }; - - let has_t = timer.timing.as_ref().is_some_and(TimingChannel::has_data); - let has_l = timer.latency.as_ref().is_some_and(TimingChannel::has_data); - match (has_t, has_l) { - (true, true) => { - let rows = Layout::default() - .direction(Direction::Vertical) - .constraints([Constraint::Percentage(50), Constraint::Percentage(50)]) - .split(area); - draw_channel_chart( - f, - rows[0], - &timer.name, - "processing", - 
Color::Cyan, - timer.timing.as_ref().unwrap(), - ); - draw_channel_chart( - f, - rows[1], - &timer.name, - "latency", - Color::Magenta, - timer.latency.as_ref().unwrap(), - ); - } - (true, false) => draw_channel_chart( - f, - area, - &timer.name, - "processing", - Color::Cyan, - timer.timing.as_ref().unwrap(), - ), - (false, true) => draw_channel_chart( - f, - area, - &timer.name, - "latency", - Color::Magenta, - timer.latency.as_ref().unwrap(), - ), - (false, false) => { - let block = Block::default().borders(Borders::ALL).title(format!(" {} ", timer.name)); - let inner = block.inner(area); - f.render_widget(block, area); - f.render_widget( - Paragraph::new("no events drained yet").style(Style::default().fg(Color::DarkGray)), - inner, - ); - } - } + draw_channel_chart(f, area, &timer.name, &timer.latency); } -fn draw_channel_chart( - f: &mut Frame, - area: Rect, - name: &str, - kind: &str, - p50_color: Color, - ch: &TimingChannel, -) { +fn draw_channel_chart(f: &mut Frame, area: Rect, name: &str, ch: &TimingChannel) { let title = format!( - " {name} — {kind} p50 / p99 over {}s buckets ", + " {name} — latency p50 / p99 over {}s buckets ", crate::sources::counters::BUCKET_SECS ); let block = Block::default().borders(Borders::ALL).title(title); @@ -203,7 +125,7 @@ fn draw_channel_chart( Dataset::default() .name("p50 µs") .marker(symbols::Marker::Braille) - .style(Style::default().fg(p50_color)) + .style(Style::default().fg(Color::Magenta)) .graph_type(GraphType::Line) .data(&p50), Dataset::default() diff --git a/crates/surfer/src/sources/mod.rs b/crates/surfer/src/sources/mod.rs index 19be3fa5..814ffe03 100644 --- a/crates/surfer/src/sources/mod.rs +++ b/crates/surfer/src/sources/mod.rs @@ -1,4 +1,3 @@ pub mod counters; -pub mod perf; pub mod tilemetrics; pub mod timings; diff --git a/crates/surfer/src/sources/perf.rs b/crates/surfer/src/sources/perf.rs deleted file mode 100644 index 5f884205..00000000 --- a/crates/surfer/src/sources/perf.rs +++ /dev/null @@ -1,200 
+0,0 @@ -//! Consumer for `perf-{name}` MPMC queues emitted by `#[timed]` functions -//! built with the `perf` feature. Each `PerfSample` is one call: raw counter -//! values, positional by `silver_metrics::schema()` slot. -//! -//! Samples are summed into 1 s wall-clock buckets (mirroring -//! counters/timings); table stats average over the last -//! `TABLE_WINDOW_BUCKETS` so they track current behaviour, while the full -//! ring feeds the drill-down IPC chart. Counters are looked up by event name -//! (`instructions`, `cpu-cycles`, …); a name absent from the active schema -//! reads zero. - -use std::collections::VecDeque; - -use flux::communication::queue::{ConsumerBare, Queue}; -use silver_metrics::{MAX_EVENTS, PerfSample, slot}; - -use crate::{discovery::PerfFile, sources::counters::BUCKET_HISTORY_LEN}; - -/// Table stats average over the last 30 s of 1 s buckets. -const TABLE_WINDOW_BUCKETS: usize = 30; - -/// Call count and per-slot counter sums over one wall-clock bucket. -#[derive(Clone, Copy, Debug, Default)] -pub struct PerfBucket { - pub count: u64, - pub vals: [u64; MAX_EVENTS], -} - -impl PerfBucket { - /// Summed value of the event labelled `label`, or 0 if it isn't in the - /// active schema. - fn val(&self, label: &str) -> u64 { - slot(label).map_or(0, |i| self.vals[i]) - } - - /// CPU cycles under either spelling perf uses. - fn cycles(&self) -> u64 { - self.val("cpu-cycles").max(self.val("cycles")) - } - - #[inline] - pub fn ipc(&self) -> f64 { - let cycles = self.cycles(); - if cycles == 0 { 0.0 } else { self.val("instructions") as f64 / cycles as f64 } - } -} - -pub struct PerfSet { - pub name: String, - consumer: ConsumerBare, - /// In-progress bucket accumulated across drains since the last roll. - cur: PerfBucket, - /// Per-bucket sums over the retained window (240 × 1 s = 4 min). 
- pub history: VecDeque, - pub total_count: u64, -} - -impl PerfSet { - pub fn open(file: &PerfFile) -> Result { - let queue: Queue = Queue::try_open_shared(&file.path) - .map_err(|e| format!("open_shared({:?}): {e:?}", file.path))?; - let label: &'static str = Box::leak(format!("surfer-perf-{}", file.name).into_boxed_str()); - let consumer = ConsumerBare::::new(queue, label); - Ok(Self { - name: file.name.clone(), - consumer, - cur: PerfBucket::default(), - history: VecDeque::with_capacity(BUCKET_HISTORY_LEN), - total_count: 0, - }) - } - - /// Drain everything currently available into the in-progress bucket. - pub fn drain(&mut self) { - let mut sample = PerfSample::default(); - while self.consumer.try_consume(&mut sample).is_ok() { - self.cur.count += 1; - for i in 0..MAX_EVENTS { - self.cur.vals[i] += sample.vals[i]; - } - self.total_count += 1; - } - } - - /// Snapshot the in-progress bucket into the ring and reset. Driven by - /// the wall-clock bucket tick, same as counters/timings. - pub fn roll_bucket(&mut self) { - if self.history.len() == BUCKET_HISTORY_LEN { - self.history.pop_front(); - } - self.history.push_back(self.cur); - self.cur = PerfBucket::default(); - } - - /// Last `TABLE_WINDOW_BUCKETS` buckets — the table stats window. - fn recent(&self) -> impl Iterator + '_ { - self.history.iter().rev().take(TABLE_WINDOW_BUCKETS) - } - - fn recent_sums(&self) -> PerfBucket { - let mut acc = PerfBucket::default(); - for b in self.recent() { - acc.count += b.count; - for i in 0..MAX_EVENTS { - acc.vals[i] += b.vals[i]; - } - } - acc - } - - /// Mean calls/s over the table window (buckets are 1 s). - pub fn call_rate(&self) -> u64 { - let n = self.history.len().min(TABLE_WINDOW_BUCKETS) as u64; - if n == 0 { 0 } else { self.recent_sums().count / n } - } - - /// Mean instructions retired per call over the table window. 
- pub fn instr_avg(&self) -> u64 { - let s = self.recent_sums(); - if s.count == 0 { 0 } else { s.val("instructions") / s.count } - } - - /// Mean CPU cycles per call over the table window. - pub fn cycles_avg(&self) -> u64 { - let s = self.recent_sums(); - if s.count == 0 { 0 } else { s.cycles() / s.count } - } - - /// Cycle-weighted instructions per cycle over the table window. - pub fn ipc(&self) -> f64 { - self.recent_sums().ipc() - } - - /// Branch misses per 1k instructions over the table window. - pub fn branch_per_kinstr(&self) -> f64 { - let s = self.recent_sums(); - let instr = s.val("instructions"); - if instr == 0 { 0.0 } else { s.val("branch-misses") as f64 * 1000.0 / instr as f64 } - } - - /// Mean LLC misses per call over the table window. - pub fn cache_miss_avg(&self) -> u64 { - let s = self.recent_sums(); - if s.count == 0 { 0 } else { s.val("cache-misses") / s.count } - } - - pub fn has_data(&self) -> bool { - self.total_count > 0 - } -} - -#[cfg(test)] -mod tests { - use flux::communication::queue::{Producer, Queue, QueueType}; - - use super::*; - use crate::discovery::PerfFile; - - /// Build a sample by metric name (order-independent), so it tracks - /// whatever the default schema resolves to. 
- fn sample(instr: u64, cycles: u64, branch: u64, cache: u64) -> PerfSample { - let mut s = PerfSample::default(); - s.vals[slot("instructions").unwrap()] = instr; - s.vals[slot("cpu-cycles").unwrap()] = cycles; - s.vals[slot("branch-misses").unwrap()] = branch; - s.vals[slot("cache-misses").unwrap()] = cache; - s - } - - #[test] - fn aggregates_buckets() { - let tmp = std::env::temp_dir().join(format!("surfer_perf_{}", std::process::id())); - std::fs::remove_dir_all(&tmp).ok(); - std::fs::create_dir_all(&tmp).unwrap(); - let path = tmp.join("perf-test_fn"); - let queue: Queue = Queue::create_or_open_shared(&path, 4096, QueueType::MPMC); - let mut producer = Producer::from(queue); - let file = PerfFile { name: "test_fn".into(), path: path.clone() }; - let mut set = PerfSet::open(&file).unwrap(); - // Prime the cursor at head before producing (mirrors main.rs). - set.drain(); - - producer.produce(&sample(3000, 1000, 5, 12)); - producer.produce(&sample(1000, 1000, 3, 4)); - set.drain(); - set.roll_bucket(); - - assert_eq!(set.total_count, 2); - assert_eq!(set.call_rate(), 2); - assert_eq!(set.instr_avg(), 2000); - assert_eq!(set.cycles_avg(), 1000); - assert!((set.ipc() - 2.0).abs() < 1e-9, "ipc = {}", set.ipc()); - // 8 misses / 4000 instr = 2 per 1k. - assert!((set.branch_per_kinstr() - 2.0).abs() < 1e-9); - assert_eq!(set.cache_miss_avg(), 8); - assert!(set.has_data()); - - std::fs::remove_dir_all(&tmp).ok(); - } -} diff --git a/crates/surfer/src/sources/timings.rs b/crates/surfer/src/sources/timings.rs index 28a068d2..a9983cb6 100644 --- a/crates/surfer/src/sources/timings.rs +++ b/crates/surfer/src/sources/timings.rs @@ -1,12 +1,10 @@ -//! Consumer for flux `timing-{name}` / `latency-{name}` shmem queue -//! pairs. Each `TimingSet` covers one flux `Timer` instance — flux -//! always creates both queue files, but a given Timer may only emit -//! to one side (e.g. `#[timed]` writes only processing, tcache -//! consumer timers write only latency). 
Channels with no observed -//! events render as "no data" instead of empty graphs. +//! Consumer for flux `latency-{name}` shmem queues — the latency side +//! of a flux `Timer`, emitted by tcache consumers (reserve→first-read +//! gap). The processing side is no longer produced: `#[timed]` records +//! into the flamegraph rings now, not flux Timer queues. //! -//! On `roll_bucket` each channel's running histogram is snapshotted -//! into its 240-deep ring and reset. +//! On `roll_bucket` the running histogram is snapshotted into its +//! 240-deep ring and reset. use std::{collections::VecDeque, path::Path}; @@ -26,7 +24,7 @@ pub struct TimingBucket { pub p99_ns: u64, } -/// One side of a flux `Timer` — either processing or latency. +/// A tcache consumer's latency stream — reserve→first-read gap. pub struct TimingChannel { consumer: ConsumerBare, hist: Histogram, @@ -92,62 +90,17 @@ impl TimingChannel { }) } } - - /// `true` once at least one event has been drained — used by the - /// pane to decide whether to render a chart for this channel. 
- pub fn has_data(&self) -> bool { - self.total_count > 0 - } } pub struct TimingSet { pub name: String, - pub timing: Option, - pub latency: Option, + pub latency: TimingChannel, } impl TimingSet { pub fn open(file: &TimingFile) -> Result { - let timing = if let Some(path) = &file.timing_path { - let label: &'static str = Box::leak(format!("surfer-t-{}", file.name).into_boxed_str()); - TimingChannel::open(path, label).ok() - } else { - None - }; - let latency = if let Some(path) = &file.latency_path { - let label: &'static str = Box::leak(format!("surfer-l-{}", file.name).into_boxed_str()); - TimingChannel::open(path, label).ok() - } else { - None - }; - if timing.is_none() && latency.is_none() { - return Err(format!("no openable queue for {}", file.name)); - } - Ok(Self { name: file.name.clone(), timing, latency }) - } - - pub fn drain(&mut self) { - if let Some(c) = &mut self.timing { - c.drain(); - } - if let Some(c) = &mut self.latency { - c.drain(); - } - } - - pub fn roll_bucket(&mut self) { - if let Some(c) = &mut self.timing { - c.roll_bucket(); - } - if let Some(c) = &mut self.latency { - c.roll_bucket(); - } - } - - /// Channel preferred for the table row's summary stats: latency - /// when present (more interesting for spine/tcache timers), else - /// timing. - pub fn primary(&self) -> Option<&TimingChannel> { - self.latency.as_ref().filter(|c| c.has_data()).or(self.timing.as_ref()) + let label: &'static str = Box::leak(format!("surfer-l-{}", file.name).into_boxed_str()); + let latency = TimingChannel::open(&file.path, label)?; + Ok(Self { name: file.name.clone(), latency }) } } diff --git a/justfile b/justfile index f1bbb4d4..7330a408 100644 --- a/justfile +++ b/justfile @@ -43,6 +43,14 @@ nextest: perf-local events="instructions,cycles,l1d-misses,l2-misses,l3-misses": SILVER_PERF_EVENTS="{{events}}" cargo test --release -p silver_e2e --features perf-counters --test sync_pm_bs_perf -- --ignored --nocapture +# Run surfer (the metrics TUI). 
It folds the watched silver's `#[timed]` perf +# counters into the flamegraph whenever that silver published them (the producer +# opts into counters via its own `perf` feature); otherwise it shows timing +# only. Counter labels come from the silver's published schema, so no event list +# is needed here. Extra args pass through as `[BASE_DIR] [APP_NAME]`. +surfer *args='': + cargo run --release -p silver_surfer -- {{args}} + # Refresh crates/e2e/data/perf from mainnet (~13 min for the default 128 # blocks at lodestar's ~1 req / 6 s cap). Commit the result via git-lfs. # Pass `--continue` (and/or `--blocks N`) to resume after a network blip.