Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ fn main() -> Result<(), Box<dyn Error>> {
let _tracing = initialise_tracing_log("silver", 10, None, false);
tracing::debug!("start");

// `#[timed]` is inert until a process opts in.
silver_common::flamegraph_timer::enable_surfer();

let config = load_config()?;

tracing::info!("loaded config with fork digest: {}", hex::encode(config.fork_digest()));
Expand Down
100 changes: 17 additions & 83 deletions crates/common_macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,20 @@
//! Proc-macro support for the silver crates. Exposes `#[timed]`, which wraps a
//! function body in a thread-local flux `Timer` so processing time is emitted
//! to a per-function shmem queue (or folded into an in-process call tree under
//! the perf harness). Built with the `perf` feature, the same guard also
//! records hardware counters (instructions, cycles, branch/cache misses) via
//! rdpmc. Storage and Drop-side recording live in `silver_metrics`; this crate
//! is only the attribute-macro glue.
//! function body in a drop guard that records a frame open/close into the
//! cross-process flamegraph rings (folded in-process under the perf harness, or
//! by a surfer reading a running silver). Built with the `perf` feature, the
//! same marks carry hardware counters (instructions, cycles, branch/cache
//! misses) via rdpmc. Storage and Drop-side recording live in `silver_metrics`;
//! this crate is only the attribute-macro glue.

use proc_macro::TokenStream;
use quote::quote;
use syn::{
Ident, ItemFn, LitInt, LitStr, Token,
parse::{Parse, ParseStream},
parse_macro_input,
};
use syn::{ItemFn, LitStr, parse_macro_input};

struct TimedArgs {
name: Option<LitStr>,
sample: u64,
}

impl Parse for TimedArgs {
fn parse(input: ParseStream) -> syn::Result<Self> {
let mut name = None;
let mut sample = 1u64;
if input.peek(LitStr) {
name = Some(input.parse()?);
if input.peek(Token![,]) {
input.parse::<Token![,]>()?;
}
}
if !input.is_empty() {
let ident: Ident = input.parse()?;
if ident != "sample" {
return Err(syn::Error::new(ident.span(), "expected `sample = N`"));
}
input.parse::<Token![=]>()?;
let lit: LitInt = input.parse()?;
sample = lit.base10_parse()?;
if sample == 0 {
return Err(syn::Error::new(lit.span(), "sample must be >= 1"));
}
}
Ok(Self { name, sample })
}
}

/// Wrap a function body in a per-function thread-local flux `Timer`.
/// Wrap a function body in a `#[timed]` frame guard.
///
/// Default timer name is `concat!(module_path!(), "::", fn_name)`
/// resolved at the call site — so the queue file becomes
/// `timing-{crate}::{module}::{fn_name}` and stays unambiguous across
/// crates without manual labelling.
/// Default frame name is `concat!(module_path!(), "::", fn_name)` resolved at
/// the call site, so a frame reads `{crate}::{module}::{fn_name}` and stays
/// unambiguous across crates without manual labelling.
///
/// On a method, the default name auto-qualifies by the monomorphized `Self`:
/// `type_name` bakes the concrete type in per instantiation, so generic code
Expand All @@ -66,26 +30,17 @@ impl Parse for TimedArgs {
/// Pass a string literal to override: `#[timed("custom_name")]` uses that name
/// verbatim (no module prefix, no type qualification).
///
/// `sample = N` throttles the hardware-counter dimension (built with the `perf`
/// feature) to one in N calls on hot paths — timing is still recorded every
/// call. Combinable: `#[timed("custom_name", sample = 1000)]`. Without the
/// `perf` feature `sample` only affects which calls would have been counted, so
/// it is a no-op there.
///
/// Records processing time on every exit path — normal return, `?`,
/// early `return`, panic-unwind — via a Drop guard.
/// Records the frame's close on every exit path — normal return, `?`, early
/// `return`, panic-unwind — via a Drop guard.
#[proc_macro_attribute]
pub fn timed(attr: TokenStream, item: TokenStream) -> TokenStream {
let input = parse_macro_input!(item as ItemFn);
let args = parse_macro_input!(attr as TimedArgs);
let name = if attr.is_empty() { None } else { Some(parse_macro_input!(attr as LitStr)) };
let func_name_str = input.sig.ident.to_string();

let is_method = matches!(input.sig.inputs.first(), Some(syn::FnArg::Receiver(_)));
let timer_name_expr = match &args.name {
Some(lit) => {
let s = lit.value();
quote! { #s }
}
let timer_name_expr = match &name {
Some(lit) => quote! { #lit },
None if is_method => quote! {{
struct __TimedTy<T: ?Sized>(::core::marker::PhantomData<T>);
::core::any::type_name::<__TimedTy<Self>>()
Expand All @@ -95,30 +50,9 @@ pub fn timed(attr: TokenStream, item: TokenStream) -> TokenStream {

let ItemFn { attrs, vis, sig, block } = input;

let guard = if args.sample <= 1 {
quote! {
let __timed_guard = ::silver_metrics::TimerGuard::new(#timer_name_expr);
}
} else {
let sample = args.sample;
quote! {
::std::thread_local! {
static __TIMED_SKIP: ::core::cell::Cell<u64> =
const { ::core::cell::Cell::new(0) };
}
let __timed_sample = __TIMED_SKIP.with(|c| {
let n = c.get();
c.set(n.wrapping_add(1));
n % #sample == 0
});
let __timed_guard =
::silver_metrics::TimerGuard::new_sampled(#timer_name_expr, __timed_sample);
}
};

let expanded = quote! {
#(#attrs)* #vis #sig {
#guard
let __timed_guard = ::silver_metrics::TimerGuard::new(#timer_name_expr);
#block
}
};
Expand Down
8 changes: 4 additions & 4 deletions crates/e2e/src/perf/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::time::{Duration, Instant};

use silver_common::{
flamegraph_timer::{collect::enable, report::TimingStats},
flamegraph_timer::{LocalReader, report::TimingStats},
ssz_view::StatusView,
};

Expand Down Expand Up @@ -34,9 +34,9 @@ pub fn replay(fixtures: &Fixtures) -> ReplayOutcome {

let da_events: Vec<_> = blocks.iter().filter_map(|b| data_columns_available(b)).collect();

// Enable before harness construction — `new_heap` already runs `#[timed]` STF
// Start before harness construction — `new_heap` already runs `#[timed]` STF
// code.
enable();
let recorder = LocalReader::start();

let mut harness = PmBsHarness::new(&fixtures.state_ssz, n_blocks, blocks.len());
let anchor_finalized_epoch = harness.fork_choice_finalized_epoch();
Expand Down Expand Up @@ -102,7 +102,7 @@ pub fn replay(fixtures: &Fixtures) -> ReplayOutcome {
);

ReplayOutcome {
stats: TimingStats::collect(),
stats: recorder.collect(),
validator_count: harness.head_validator_count(),
wall_elapsed,
final_slot,
Expand Down
5 changes: 5 additions & 0 deletions crates/e2e/src/perf/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ impl PerfReport {
}

pub fn check_thresholds(&self) -> Result<(), String> {
if self.outcome.stats.missed_events() {
return Err("perf gate failed: a timing ring lost marks and the measurement is \
invalid; raise RING_CAPACITY in flamegraph_timer::queue_dir"
.into());
}
let failures: Vec<String> = self.gauges().iter().filter_map(Gauge::failure).collect();
if failures.is_empty() {
return Ok(());
Expand Down
1 change: 1 addition & 0 deletions crates/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ version.workspace = true
silver_common_macros.workspace = true
flux.workspace = true
libc.workspace = true
rustc-hash.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing = { workspace = true, optional = true }
Expand Down
103 changes: 103 additions & 0 deletions crates/metrics/src/flamegraph_timer/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use flux::timing::Duration;
use rustc_hash::FxHashMap;

use crate::{
flamegraph_timer::mark::{Frame, Mark},
perf::PerfSample,
};

#[derive(Default)]
pub(crate) struct CallStackSamples {
pub(crate) tracked_ns: Vec<u64>,
pub(crate) total_untracked_ns: u64,
/// Summed entry→exit counter deltas across calls; all-zero when the
/// `perf` feature is off. `total_untracked_perf` excludes children, like
/// `total_untracked_ns`.
pub(crate) tracked_perf: PerfSample,
pub(crate) total_untracked_perf: PerfSample,
}

struct OpenFrame {
id: u64,
ts: u64,
perf: PerfSample,
total_tracked_ns: u64,
total_tracked_perf: PerfSample,
}

/// Folds mark streams into call-path timings keyed by portable frame id,
/// independent of how the marks were transported (in-process buffers or a
/// cross-process shmem ring). Frame ids stay opaque: resolving them to names
/// needs the producer's binary and belongs to the reader that owns it.
///
/// Each `fold_thread` call folds a mark stream from a fresh stack, so a
/// cumulative tree comes from re-folding the whole retained stream, not from
/// folding disjoint windows.
#[derive(Default)]
pub(crate) struct Aggregator {
paths: FxHashMap<Vec<u64>, CallStackSamples>,
/// A close popped an open with a different id — the stream desynced, so the
/// folded stats are unreliable (surfaced as `missed_events`). See
/// [`Self::fold_thread`].
desynced: bool,
}

impl Aggregator {
pub(crate) fn fold_thread(&mut self, marks: &[Mark], counters: &[PerfSample]) {
// `counters[i]` is the snapshot pushed right after `marks[i]` (same
// producer thread, lockstep order), so the two are index-aligned over
// their common prefix. A live drain (surfer reading while the producer
// runs) can snapshot the rings a push apart, leaving a 1-ish tail
// mismatch — benign: the extra counter is ignored and a mark missing
// its counter falls back to zero below. Perf off ⇒ `counters` empty.
let mut stack: Vec<OpenFrame> = Vec::new();
for (i, mark) in marks.iter().enumerate() {
let sample = counters.get(i).copied().unwrap_or_default();
let closing_id = match mark.frame {
Frame::Open { id, .. } => {
stack.push(OpenFrame {
id,
ts: mark.ts,
perf: sample,
total_tracked_ns: 0,
total_tracked_perf: PerfSample::default(),
});
continue;
}
Frame::Close { id } => id,
};

let Some(frame) = stack.pop() else { continue };
// Drop guards close in reverse open order, so a close must pop its
// own open. A mismatch means the stream desynced (a producer
// crash/restart, or a ring overwrite reordering marks); we still
// attribute to the popped frame but flag the fold as unreliable.
debug_assert_eq!(closing_id, frame.id, "timed close popped a non-matching open");
self.desynced |= closing_id != frame.id;
let tracked_ns = Duration(mark.ts.saturating_sub(frame.ts)).as_nanos() as u64;
let untracked_ns = tracked_ns.saturating_sub(frame.total_tracked_ns);
let tracked_perf = sample.delta(&frame.perf);
let untracked_perf = tracked_perf.delta(&frame.total_tracked_perf);

let path: Vec<u64> = stack.iter().map(|f| f.id).chain([frame.id]).collect();
if let Some(parent) = stack.last_mut() {
parent.total_tracked_ns += tracked_ns;
parent.total_tracked_perf = parent.total_tracked_perf.add(&tracked_perf);
}

let entry = self.paths.entry(path).or_default();
entry.tracked_ns.push(tracked_ns);
entry.total_untracked_ns += untracked_ns;
entry.tracked_perf = entry.tracked_perf.add(&tracked_perf);
entry.total_untracked_perf = entry.total_untracked_perf.add(&untracked_perf);
}
}

pub(crate) fn desynced(&self) -> bool {
self.desynced
}

pub(crate) fn into_paths(self) -> FxHashMap<Vec<u64>, CallStackSamples> {
self.paths
}
}
Loading
Loading