Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 212 additions & 113 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ members = [
"testing/testing-utils",
"testing/runner",
"crates/tracing-otlp",
"crates/firehose",
]
default-members = ["bin/reth"]
exclude = ["docs/cli"]
Expand Down Expand Up @@ -368,6 +369,7 @@ reth-execution-types = { path = "crates/evm/execution-types", default-features =
reth-exex = { path = "crates/exex/exex" }
reth-exex-test-utils = { path = "crates/exex/test-utils" }
reth-exex-types = { path = "crates/exex/types" }
reth-firehose = { path = "crates/firehose" }
reth-fs-util = { path = "crates/fs-util" }
reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
reth-ipc = { path = "crates/rpc/ipc" }
Expand Down Expand Up @@ -569,6 +571,7 @@ tokio-util = { version = "0.7.4", features = ["codec"] }
async-compression = { version = "0.4", default-features = false }
async-stream = "0.3"
async-trait = "0.1.68"
firehose-tracer = "5.1.0"
futures = "0.3"
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false }
Expand Down Expand Up @@ -700,3 +703,8 @@ vergen-git2 = "9.1.0"

# networking
ipnet = "2.11"

[patch.crates-io]
# rbase64 unconditionally sets #[global_allocator] (MiMalloc), conflicting with reth's jemalloc.
# This vendor patch removes that declaration.
rbase64 = { path = "vendor/rbase64" }
1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ reth-node-ethereum.workspace = true
reth-node-builder.workspace = true
reth-node-metrics.workspace = true
reth-consensus.workspace = true
reth-firehose.workspace = true

# alloy
alloy-primitives.workspace = true
Expand Down
1 change: 1 addition & 0 deletions bin/reth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,5 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use reth_firehose as _;
use tracing as _;
38 changes: 32 additions & 6 deletions bin/reth/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_firehose::FirehoseArgs;
use reth_node_ethereum::EthereumNode;
use tracing::info;

Expand All @@ -25,12 +26,37 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}

if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;

handle.wait_for_node_exit().await
}) {
if let Err(err) =
Cli::<EthereumChainSpecParser, FirehoseArgs>::parse().run(async move |builder, args| {
info!(target: "reth::cli", "Launching node");

// Resolve the node data directory for the Firehose cursor file default path.
let data_dir = builder.config().datadir().data_dir().to_path_buf();

// Build the tracer config from CLI args and init the global tracer.
let cfg = args.to_tracer_config(&data_dir);
let shutdown_handle = reth_firehose::init_tracer(cfg);

let handle = builder
.node(EthereumNode::default())
.on_component_initialized(move |node| {
// Wire the background writer's drain into the node shutdown lifecycle.
if let Some(handle) = shutdown_handle {
node.task_executor.spawn_with_graceful_shutdown_signal(
|shutdown| async move {
let _guard = shutdown.await;
handle.drain();
},
);
}
Ok(())
})
.launch_with_debug_capabilities()
.await?;

handle.wait_for_node_exit().await
})
{
eprintln!("Error: {err:?}");
std::process::exit(1);
}
Expand Down
20 changes: 20 additions & 0 deletions crates/firehose/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "reth-firehose"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Firehose integration for Reth: async emission, shutdown wiring, and cursor writing"

[lints]
workspace = true

[dependencies]
# firehose
firehose-tracer.workspace = true

# misc
clap = { workspace = true, features = ["derive", "env"] }
tracing.workspace = true
112 changes: 112 additions & 0 deletions crates/firehose/src/args.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! CLI arguments for the Firehose integration.
//!
//! These arguments control how the Firehose tracer emits blocks to stdout,
//! where to write the cursor file, and how async emission is configured.

use clap::Args;
use firehose_tracer::EmissionMode;
use std::{path::PathBuf, time::Duration};

/// Firehose emission mode, mirroring [`EmissionMode`] for CLI parsing.
///
/// Unlike [`EmissionMode`], these variants carry no tuning parameters: the channel capacity and
/// live threshold come from their own `--firehose.*` flags and are combined with the selected
/// mode in [`FirehoseArgs::to_tracer_config`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, clap::ValueEnum)]
pub enum EmissionModeArg {
    // NOTE: clap's `ValueEnum` derive uses the variant doc comments below as the per-value help
    // text, so they are user-facing strings and should be edited with that in mind.
    /// Encode and write blocks inline on the calling thread (legacy behaviour).
    Blocking,
    /// Encode and write blocks in a dedicated background thread with backpressure.
    Async,
    /// Switch automatically based on block age (catch-up → async, live → blocking).
    #[default]
    Auto,
}

/// Default for `--firehose.channel-capacity`.
///
/// Shared by the clap attribute and the [`Default`] impl of [`FirehoseArgs`] so the two cannot
/// drift apart.
const DEFAULT_CHANNEL_CAPACITY: usize = 32;

/// Default for `--firehose.live-threshold`, in seconds.
///
/// Shared by the clap attribute and the [`Default`] impl of [`FirehoseArgs`] so the two cannot
/// drift apart.
const DEFAULT_LIVE_THRESHOLD_SECS: u64 = 60;

/// CLI arguments for the Firehose tracer integration.
///
/// Add `#[command(flatten)]` to include these in a `NodeCommand` extension struct.
#[derive(Debug, Clone, Args)]
pub struct FirehoseArgs {
    // NOTE: the field doc comments below are rendered by clap as `--help` text; they are
    // user-facing strings, not just developer documentation.
    /// Controls when and how encoded blocks are written to stdout.
    ///
    /// - `blocking`: encode → base64 → write, all inline on the calling thread (legacy).
    /// - `async`: encode and write in a background thread; backpressure via channel.
    /// - `auto`: use async for blocks older than `--firehose.live-threshold`; use blocking for
    /// blocks within the live window (default).
    #[arg(
        id = "firehose.emission-mode",
        long = "firehose.emission-mode",
        value_name = "MODE",
        default_value = "auto",
        verbatim_doc_comment
    )]
    pub emission_mode: EmissionModeArg,

    /// Channel capacity for the async emission path.
    ///
    /// The background writer thread will block producers once this many encoded
    /// blocks are waiting, providing backpressure. Only relevant for `async` and
    /// `auto` modes.
    #[arg(
        id = "firehose.channel-capacity",
        long = "firehose.channel-capacity",
        value_name = "N",
        default_value_t = DEFAULT_CHANNEL_CAPACITY
    )]
    pub channel_capacity: usize,

    /// Age threshold in seconds used by `auto` emission mode.
    ///
    /// Blocks with a timestamp more than this many seconds behind wall-clock time
    /// are considered historical (catch-up) and will use the async path.
    /// Blocks within this window are considered live and will use the blocking path.
    #[arg(
        id = "firehose.live-threshold",
        long = "firehose.live-threshold",
        value_name = "SECS",
        default_value_t = DEFAULT_LIVE_THRESHOLD_SECS
    )]
    pub live_threshold_secs: u64,

    /// Path to the cursor file that tracks the last block successfully emitted to stdout.
    ///
    /// After each block is written the cursor file is updated atomically so that the
    /// node can detect gaps after an unclean shutdown. Defaults to `<datadir>/firehose.cursor`
    /// when not set.
    #[arg(id = "firehose.cursor-path", long = "firehose.cursor-path", value_name = "PATH")]
    pub cursor_path: Option<PathBuf>,
}

impl Default for FirehoseArgs {
    /// Returns the same values clap produces when no `--firehose.*` flag is passed.
    fn default() -> Self {
        Self {
            // Must stay in sync with `default_value = "auto"` on the `emission_mode` arg.
            emission_mode: EmissionModeArg::Auto,
            channel_capacity: DEFAULT_CHANNEL_CAPACITY,
            live_threshold_secs: DEFAULT_LIVE_THRESHOLD_SECS,
            cursor_path: None,
        }
    }
}

impl FirehoseArgs {
    /// Build the [`firehose_tracer::config::Config`] described by these CLI arguments.
    ///
    /// The cursor file is the one given via `--firehose.cursor-path`; when that flag is absent
    /// it falls back to `firehose.cursor` inside `data_dir`.
    pub fn to_tracer_config(&self, data_dir: &std::path::Path) -> firehose_tracer::config::Config {
        let capacity = self.channel_capacity;

        // Attach the tuning parameters each tracer mode needs to the bare CLI selection.
        let mode = match &self.emission_mode {
            EmissionModeArg::Auto => EmissionMode::Auto {
                channel_capacity: capacity,
                live_threshold: Duration::from_secs(self.live_threshold_secs),
            },
            EmissionModeArg::Async => EmissionMode::Async { channel_capacity: capacity },
            EmissionModeArg::Blocking => EmissionMode::Blocking,
        };

        let cursor_file = match &self.cursor_path {
            Some(explicit) => explicit.clone(),
            None => data_dir.join("firehose.cursor"),
        };

        firehose_tracer::config::Config::new()
            .with_emission_mode(mode)
            .with_cursor_path(cursor_file)
    }
}
57 changes: 57 additions & 0 deletions crates/firehose/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! Reth ↔ Firehose integration.
//!
//! This crate provides:
//!
//! * **`FirehoseArgs`** — clap argument group that exposes `--firehose.*` CLI flags (emission mode,
//! channel capacity, live threshold, cursor path).
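//! * **`init_tracer`** — one-shot initialisation that stores the tracer in a process-wide
//! `OnceLock` and returns an optional [`ShutdownHandle`](firehose_tracer::ShutdownHandle) for the
//! async emission background thread.
//! * **`get_tracer`** — accessor for the process-wide tracer; returns `None` until `init_tracer`
//! has been called.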

#![cfg_attr(not(test), warn(unused_crate_dependencies))]

mod args;
// `EmissionModeArg` is the type of the public `FirehoseArgs::emission_mode` field. Because the
// `args` module is private, it must be re-exported here or downstream crates cannot name it to
// construct or match on the arguments.
pub use args::{EmissionModeArg, FirehoseArgs};

use std::sync::{Mutex, OnceLock};
use tracing::{debug, info};

/// Process-wide Firehose tracer, initialised at most once per process.
///
/// Written by [`init_tracer`] and read through [`get_tracer`]. The tracer is wrapped in a
/// [`Mutex`], so every user of the global has to lock it before touching the tracer.
static GLOBAL_TRACER: OnceLock<Mutex<firehose_tracer::Tracer>> = OnceLock::new();

/// Initialise the global Firehose tracer with the given configuration.
///
/// Builds a [`firehose_tracer::Tracer`] from `config`, stores it in the process-wide slot read
/// by [`get_tracer`], and hands back whatever shutdown handle the tracer exposes.
///
/// Returns a [`ShutdownHandle`](firehose_tracer::ShutdownHandle) when the emission mode has an
/// async background thread. The caller is responsible for calling
/// [`ShutdownHandle::drain`](firehose_tracer::ShutdownHandle::drain) before the process exits so
/// that the background thread can flush all buffered blocks.
///
/// # Panics
///
/// Panics if called more than once in the same process.
pub fn init_tracer(
    config: firehose_tracer::config::Config,
) -> Option<firehose_tracer::ShutdownHandle> {
    // Fail fast on repeated initialisation, before a second tracer is constructed.
    // NOTE(review): `Tracer::new` presumably sets up the async writer and/or touches the cursor
    // file — confirm. Either way there is no reason to build a tracer only to panic afterwards.
    assert!(GLOBAL_TRACER.get().is_none(), "init_tracer called more than once");

    info!(
        target: "reth::firehose",
        cursor_path = ?config.cursor_path,
        "Initialising Firehose tracer"
    );

    let mut tracer = firehose_tracer::Tracer::new(config);
    let shutdown_handle = tracer.shutdown_handle();

    debug!(
        target: "reth::firehose",
        has_async_thread = shutdown_handle.is_some(),
        "Firehose tracer created"
    );

    // The guard above is only an early exit: two threads can both pass it, so `set` remains the
    // authoritative once-only check.
    GLOBAL_TRACER.set(Mutex::new(tracer)).expect("init_tracer called more than once");

    shutdown_handle
}

/// Returns a reference to the global [`Mutex<Tracer>`], or `None` if the
/// tracer has not been initialised via [`init_tracer`].
///
/// The returned reference is `'static`; callers lock the mutex to get access to the tracer
/// itself.
pub fn get_tracer() -> Option<&'static Mutex<firehose_tracer::Tracer>> {
    // Plain read of the `OnceLock`; this never initialises the tracer on its own.
    GLOBAL_TRACER.get()
}
16 changes: 16 additions & 0 deletions vendor/rbase64/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Patched rbase64: removed mimalloc global allocator that conflicts with reth's jemalloc.
# See the workspace [patch.crates-io] section.

[package]
edition = "2021"
name = "rbase64"
version = "2.0.3"
authors = ["Marcel Riera <marcel.riera@outlook.com>"]
description = "A fast multi-threaded base64 encoding library and CLI tool (patched: no global allocator)"
license = "MIT OR Apache-2.0"
repository = "https://github.com/uhmarcel/rbase64"

[dependencies]

[dev-dependencies]
rand = { version = "0.8.5", features = ["small_rng"] }
42 changes: 42 additions & 0 deletions vendor/rbase64/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/// Standard base64 alphabet: position `i` holds the ASCII symbol that encodes the 6-bit value `i`.
pub const ENCODE_MAP: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
/// Inverse of [`ENCODE_MAP`], indexed by input byte; bytes outside the alphabet map to
/// [`INVALID_BYTE`]. Built at compile time.
pub const DECODE_MAP: &[u8; 256] = &construct_decode_map();

pub const ENC_CHUNK_SIZE: usize = 5;
pub const DEC_CHUNK_SIZE: usize = 2;

/// Selects the low six bits of a value.
pub const SIX_BIT_MASK: u128 = 0x3f;
/// Selects the low eight bits of a value.
pub const BYTE_MASK: u64 = 0xff;
/// Marker stored in the decode table for bytes that are not part of the alphabet. `0x40` (64)
/// is the first value that cannot be a valid 6-bit symbol.
pub const INVALID_BYTE: u8 = 0x40;

#[cfg(feature = "parallel")]
pub const PARALLEL_THRESHOLD_BYTES: usize = 2 << 16; // 128 KiB
#[cfg(feature = "parallel")]
pub const PARALLEL_BATCH_SIZE: usize = 256;

/// Builds the byte → 6-bit-value lookup table by inverting [`ENCODE_MAP`].
///
/// Every slot starts out as [`INVALID_BYTE`]; only the 64 alphabet symbols are overwritten
/// with their position in the alphabet.
const fn construct_decode_map() -> [u8; 256] {
    let mut table = [INVALID_BYTE; 256];
    let mut value: u8 = 0;

    // `for` loops are not usable inside a `const fn`, hence the manual counter.
    while (value as usize) < ENCODE_MAP.len() {
        table[ENCODE_MAP[value as usize] as usize] = value;
        value += 1;
    }
    table
}

#[cfg(test)]
mod tests {
    use crate::common::construct_decode_map;
    use crate::ENCODE_MAP;

    /// Every alphabet symbol must decode back to its own position in `ENCODE_MAP`.
    #[test]
    fn should_construct_matching_encode_decode_tables() {
        // The table is a pure function of `ENCODE_MAP`, so build it once for the whole check.
        let decode_table = construct_decode_map();

        for (position, &symbol) in ENCODE_MAP.iter().enumerate() {
            assert_eq!(decode_table[symbol as usize], position as u8);
        }
    }
}
Loading