Skip to content

Commit 04633d0

Browse files
committed
chore(audit): apply workspace lints
1 parent 909e322 commit 04633d0

9 files changed

Lines changed: 300 additions & 77 deletions

File tree

Cargo.lock

Lines changed: 0 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/audit/Cargo.toml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ homepage.workspace = true
77
repository.workspace = true
88
edition.workspace = true
99

10+
[lints]
11+
workspace = true
12+
1013
[dependencies]
1114
bytes.workspace = true
1215
metrics.workspace = true
13-
dotenvy.workspace = true
1416
async-trait.workspace = true
1517
metrics-derive.workspace = true
16-
aws-credential-types.workspace = true
1718
tips-core = { workspace = true, features = ["test-utils"] }
1819
serde = { workspace = true, features = ["std", "derive"] }
1920
tokio = { workspace = true, features = ["full"] }
@@ -23,12 +24,7 @@ anyhow = { workspace = true, features = ["std"] }
2324
serde_json = { workspace = true, features = ["std"] }
2425
rdkafka = { workspace = true, features = ["tokio", "libz"] }
2526
alloy-consensus = { workspace = true, features = ["std"] }
26-
alloy-provider = { workspace = true, features = ["reqwest"] }
27-
clap = { version = "4.5.47", features = ["std", "derive", "env"] }
28-
op-alloy-consensus = { workspace = true, features = ["std", "k256", "serde"] }
2927
alloy-primitives = { workspace = true, features = ["map-foldhash", "serde"] }
30-
tracing-subscriber = { workspace = true, features = ["std", "fmt", "env-filter", "json"] }
31-
aws-config = { workspace = true, features = ["default-https-client", "rt-tokio"] }
3228
aws-sdk-s3 = { workspace = true, features = ["rustls", "default-https-client", "rt-tokio"] }
3329

3430
[dev-dependencies]

crates/audit/src/archiver.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::metrics::Metrics;
22
use crate::reader::EventReader;
33
use crate::storage::EventWriter;
44
use anyhow::Result;
5+
use std::fmt;
56
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
67
use tokio::time::sleep;
78
use tracing::{error, info};
89

10+
/// Archives audit events from Kafka to S3 storage.
911
pub struct KafkaAuditArchiver<R, W>
1012
where
1113
R: EventReader,
@@ -16,11 +18,22 @@ where
1618
metrics: Metrics,
1719
}
1820

21+
impl<R, W> fmt::Debug for KafkaAuditArchiver<R, W>
22+
where
23+
R: EventReader,
24+
W: EventWriter + Clone + Send + 'static,
25+
{
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
f.debug_struct("KafkaAuditArchiver").finish_non_exhaustive()
28+
}
29+
}
30+
1931
impl<R, W> KafkaAuditArchiver<R, W>
2032
where
2133
R: EventReader,
2234
W: EventWriter + Clone + Send + 'static,
2335
{
36+
/// Creates a new archiver with the given reader and writer.
2437
pub fn new(reader: R, writer: W) -> Self {
2538
Self {
2639
reader,
@@ -29,6 +42,7 @@ where
2942
}
3043
}
3144

45+
/// Runs the archiver loop, reading events and writing them to storage.
3246
pub async fn run(&mut self) -> Result<()> {
3347
info!("Starting Kafka bundle archiver");
3448

crates/audit/src/lib.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,47 @@
1-
pub mod archiver;
2-
pub mod metrics;
3-
pub mod publisher;
4-
pub mod reader;
5-
pub mod storage;
6-
pub mod types;
1+
//! Audit library for tracking and archiving bundle and user operation events.
2+
//!
3+
//! This crate provides functionality for publishing events to Kafka,
4+
//! archiving them to S3, and reading event history.
5+
6+
#![doc(issue_tracker_base_url = "https://github.com/base/tips/issues/")]
7+
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
8+
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9+
10+
mod archiver;
11+
pub use archiver::KafkaAuditArchiver;
12+
13+
mod metrics;
14+
pub use metrics::Metrics;
15+
16+
mod publisher;
17+
pub use publisher::{
18+
BundleEventPublisher, KafkaBundleEventPublisher, KafkaUserOpEventPublisher,
19+
LoggingBundleEventPublisher, LoggingUserOpEventPublisher, UserOpEventPublisher,
20+
};
21+
22+
mod reader;
23+
pub use reader::{
24+
Event, EventReader, KafkaAuditLogReader, KafkaUserOpAuditLogReader, UserOpEventReader,
25+
UserOpEventWrapper, assign_topic_partition, create_kafka_consumer,
26+
};
27+
28+
mod storage;
29+
pub use storage::{
30+
BundleEventS3Reader, BundleHistory, BundleHistoryEvent, EventWriter, S3EventReaderWriter,
31+
S3Key, TransactionMetadata, UserOpEventS3Reader, UserOpEventWriter, UserOpHistory,
32+
UserOpHistoryEvent,
33+
};
34+
35+
mod types;
36+
pub use types::{
37+
BundleEvent, BundleId, DropReason, Transaction, TransactionId, UserOpDropReason, UserOpEvent,
38+
UserOpHash,
39+
};
740

841
use tokio::sync::mpsc;
942
use tracing::error;
1043

11-
pub use archiver::*;
12-
pub use publisher::*;
13-
pub use reader::*;
14-
pub use storage::*;
15-
pub use types::*;
16-
44+
/// Connects a bundle event receiver to a publisher, spawning a task to forward events.
1745
pub fn connect_audit_to_publisher<P>(event_rx: mpsc::UnboundedReceiver<BundleEvent>, publisher: P)
1846
where
1947
P: BundleEventPublisher + 'static,
@@ -28,6 +56,7 @@ where
2856
});
2957
}
3058

59+
/// Connects a user operation event receiver to a publisher, spawning a task to forward events.
3160
pub fn connect_userop_audit_to_publisher<P>(
3261
event_rx: mpsc::UnboundedReceiver<UserOpEvent>,
3362
publisher: P,

crates/audit/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,51 @@
11
use metrics::{Counter, Gauge, Histogram};
22
use metrics_derive::Metrics;
33

4+
/// Metrics for audit operations including Kafka reads, S3 writes, and event processing.
45
#[derive(Metrics, Clone)]
56
#[metrics(scope = "tips_audit")]
67
pub struct Metrics {
8+
/// Duration of archive_event operations.
79
#[metric(describe = "Duration of archive_event")]
810
pub archive_event_duration: Histogram,
911

12+
/// Age of event when processed (now - event timestamp).
1013
#[metric(describe = "Age of event when processed (now - event timestamp)")]
1114
pub event_age: Histogram,
1215

16+
/// Duration of Kafka read_event operations.
1317
#[metric(describe = "Duration of Kafka read_event")]
1418
pub kafka_read_duration: Histogram,
1519

20+
/// Duration of Kafka commit operations.
1621
#[metric(describe = "Duration of Kafka commit")]
1722
pub kafka_commit_duration: Histogram,
1823

24+
/// Duration of update_bundle_history operations.
1925
#[metric(describe = "Duration of update_bundle_history")]
2026
pub update_bundle_history_duration: Histogram,
2127

28+
/// Duration of updating all transaction indexes.
2229
#[metric(describe = "Duration of update all transaction indexes")]
2330
pub update_tx_indexes_duration: Histogram,
2431

32+
/// Duration of S3 get_object operations.
2533
#[metric(describe = "Duration of S3 get_object")]
2634
pub s3_get_duration: Histogram,
2735

36+
/// Duration of S3 put_object operations.
2837
#[metric(describe = "Duration of S3 put_object")]
2938
pub s3_put_duration: Histogram,
3039

40+
/// Total events processed.
3141
#[metric(describe = "Total events processed")]
3242
pub events_processed: Counter,
3343

44+
/// Total S3 writes skipped due to deduplication.
3445
#[metric(describe = "Total S3 writes skipped due to dedup")]
3546
pub s3_writes_skipped: Counter,
3647

48+
/// Number of in-flight archive tasks.
3749
#[metric(describe = "Number of in-flight archive tasks")]
3850
pub in_flight_archive_tasks: Gauge,
3951
}

crates/audit/src/publisher.rs

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,36 @@ use crate::types::{BundleEvent, UserOpEvent};
22
use anyhow::Result;
33
use async_trait::async_trait;
44
use rdkafka::producer::{FutureProducer, FutureRecord};
5-
use serde_json;
65
use tracing::{debug, error, info};
76

7+
/// Trait for publishing bundle events.
88
#[async_trait]
99
pub trait BundleEventPublisher: Send + Sync {
10+
/// Publishes a single bundle event.
1011
async fn publish(&self, event: BundleEvent) -> Result<()>;
1112

13+
/// Publishes multiple bundle events.
1214
async fn publish_all(&self, events: Vec<BundleEvent>) -> Result<()>;
1315
}
1416

17+
/// Publishes bundle events to Kafka.
1518
#[derive(Clone)]
1619
pub struct KafkaBundleEventPublisher {
1720
producer: FutureProducer,
1821
topic: String,
1922
}
2023

24+
impl std::fmt::Debug for KafkaBundleEventPublisher {
25+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26+
f.debug_struct("KafkaBundleEventPublisher")
27+
.field("topic", &self.topic)
28+
.finish_non_exhaustive()
29+
}
30+
}
31+
2132
impl KafkaBundleEventPublisher {
22-
pub fn new(producer: FutureProducer, topic: String) -> Self {
33+
/// Creates a new Kafka bundle event publisher.
34+
pub const fn new(producer: FutureProducer, topic: String) -> Self {
2335
Self { producer, topic }
2436
}
2537

@@ -71,11 +83,13 @@ impl BundleEventPublisher for KafkaBundleEventPublisher {
7183
}
7284
}
7385

74-
#[derive(Clone)]
86+
/// Publishes bundle events to logs (for testing/debugging).
87+
#[derive(Clone, Debug)]
7588
pub struct LoggingBundleEventPublisher;
7689

7790
impl LoggingBundleEventPublisher {
78-
pub fn new() -> Self {
91+
/// Creates a new logging bundle event publisher.
92+
pub const fn new() -> Self {
7993
Self
8094
}
8195
}
@@ -105,21 +119,34 @@ impl BundleEventPublisher for LoggingBundleEventPublisher {
105119
}
106120
}
107121

122+
/// Trait for publishing user operation events.
108123
#[async_trait]
109124
pub trait UserOpEventPublisher: Send + Sync {
125+
/// Publishes a single user operation event.
110126
async fn publish(&self, event: UserOpEvent) -> Result<()>;
111127

128+
/// Publishes multiple user operation events.
112129
async fn publish_all(&self, events: Vec<UserOpEvent>) -> Result<()>;
113130
}
114131

132+
/// Publishes user operation events to Kafka.
115133
#[derive(Clone)]
116134
pub struct KafkaUserOpEventPublisher {
117135
producer: FutureProducer,
118136
topic: String,
119137
}
120138

139+
impl std::fmt::Debug for KafkaUserOpEventPublisher {
140+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141+
f.debug_struct("KafkaUserOpEventPublisher")
142+
.field("topic", &self.topic)
143+
.finish_non_exhaustive()
144+
}
145+
}
146+
121147
impl KafkaUserOpEventPublisher {
122-
pub fn new(producer: FutureProducer, topic: String) -> Self {
148+
/// Creates a new Kafka user operation event publisher.
149+
pub const fn new(producer: FutureProducer, topic: String) -> Self {
123150
Self { producer, topic }
124151
}
125152

@@ -171,11 +198,13 @@ impl UserOpEventPublisher for KafkaUserOpEventPublisher {
171198
}
172199
}
173200

174-
#[derive(Clone)]
201+
/// Publishes user operation events to logs (for testing/debugging).
202+
#[derive(Clone, Debug)]
175203
pub struct LoggingUserOpEventPublisher;
176204

177205
impl LoggingUserOpEventPublisher {
178-
pub fn new() -> Self {
206+
/// Creates a new logging user operation event publisher.
207+
pub const fn new() -> Self {
179208
Self
180209
}
181210
}

0 commit comments

Comments
 (0)