Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,795 changes: 1,237 additions & 558 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ default-run = "vector"
autobenches = false # our benchmarks are not runnable on their own either way
# Minimum supported rust version
# See docs/DEVELOPING.md for policy
rust-version = "1.81"
rust-version = "1.85"

[[bin]]
name = "vector"
Expand Down Expand Up @@ -110,6 +110,7 @@ members = [
"lib/observo/lv3",
"lib/observo/stcp",
"lib/observo/wef",
"lib/observo/eventhub",
"lib/observo/ssa",
"lib/observo/obvrl",
"lib/observo/gcs",
Expand Down Expand Up @@ -175,7 +176,7 @@ prost-build = { version = "0.12", default-features = false }
prost-reflect = { version = "0.14", features = ["serde"], default-features = false }
prost-types = { version = "0.12", default-features = false }
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }
serde_json = { version = "=1.0.133", default-features = false, features = ["raw_value", "std"] }
serde_json = { version = "=1.0.149", default-features = false, features = ["raw_value", "std"] }
serde = { version = "1.0.215", default-features = false, features = ["alloc", "derive", "rc"] }
serde_with = { version = "3.12.0", default-features = false, features = ["macros", "std"] }
snafu = { version = "0.7.5", default-features = false, features = ["futures", "std"] }
Expand Down Expand Up @@ -234,6 +235,7 @@ ssa = { path = "lib/observo/ssa", optional = true }
chkpts = { path = "lib/observo/chkpts", optional = true }
stcp = { path = "lib/observo/stcp", optional = true }
wef = { path = "lib/observo/wef", optional = true }
eventhub = { path = "lib/observo/eventhub", optional = true }
dnsmsg-parser = { path = "lib/dnsmsg-parser", optional = true }
dnstap-parser = { path = "lib/dnstap-parser", optional = true }
fakedata = { path = "lib/fakedata", optional = true }
Expand Down Expand Up @@ -410,7 +412,7 @@ postgres-openssl = { version = "0.5.0", default-features = false, features = ["r
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
rand.workspace = true
rand_distr.workspace = true
rdkafka = { version = "0.37.0", default-features = false, features = ["curl-static", "tokio", "libz-static", "ssl", "zstd"], optional = true }
rdkafka = { version = "0.39.0", default-features = false, features = ["curl-static", "tokio", "libz-static", "ssl", "zstd"], optional = true }
redis = { version = "0.24.0", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true }
regex.workspace = true
roaring = { version = "0.10.7", default-features = false, features = ["std"], optional = true }
Expand Down Expand Up @@ -708,6 +710,7 @@ sources-utils-net-udp = ["listenfd", "vector-lib/sources-utils-net-udp"]
sources-utils-net-unix = []
sources-websocket = ["dep:tokio-tungstenite"]
sources-wef = ["dep:wef"]
sources-eventhub = ["dep:eventhub"]

sources-vector = ["dep:prost", "dep:tonic", "dep:jsonwebtoken", "protobuf-build"]

Expand Down Expand Up @@ -1004,6 +1007,7 @@ observo = [
"observo-chkpts",
"sources-stcp",
"sources-wef",
"sources-eventhub",
"observo-vrl",
"sources-gcs",
"sinks-azs",
Expand Down
43 changes: 43 additions & 0 deletions lib/observo/eventhub/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[package]
name = "eventhub"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
# Azure SDK (latest generation — requires Rust 1.85+)
azure_messaging_eventhubs = "0.12"
azure_messaging_eventhubs_checkpointstore_blob = "0.6"
azure_core = "0.33"
azure_identity = "0.33"
azure_storage_blob = "0.10"

# Time (used by EventProcessor builder)
time = "0.3"

# Async runtime
tokio.workspace = true
futures.workspace = true

# Utilities
anyhow.workspace = true
serde.workspace = true
chrono.workspace = true
tracing.workspace = true
smallvec.workspace = true
toml.workspace = true

# Vector libraries
vector-lib.workspace = true
vector-config.workspace = true
vector-config-macros.workspace = true
vector-config-common.workspace = true
vector-core = { path = "../../vector-core" }
lookup = { package = "vector-lookup", path = "../../vector-lookup" }
inventory.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }

[features]
default = []
1 change: 1 addition & 0 deletions lib/observo/eventhub/src/eventhub
8 changes: 8 additions & 0 deletions lib/observo/eventhub/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
This file is NOT part of the open-source components licensed under the Mozilla Public License, v. 2.0 (MPL-2.0).
Proprietary and Confidential – © 2026 Observo Inc.
Unauthorized copying, modification, distribution, or disclosure of this file, via any medium, is strictly prohibited.
This file is distributed separately and is not subject to the terms of the MPL-2.0.
**/
mod eventhub;
pub use eventhub::*;
2 changes: 1 addition & 1 deletion lib/observo/private
Submodule private updated from f6ab25 to 3433b4
93 changes: 93 additions & 0 deletions src/sources/eventhub/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
This file is NOT part of the open-source components licensed under the Mozilla Public License, v. 2.0 (MPL-2.0).
Proprietary and Confidential – © 2026 Observo Inc.
Unauthorized copying, modification, distribution, or disclosure of this file, via any medium, is strictly prohibited.
This file is distributed separately and is not subject to the terms of the MPL-2.0.
**/

use std::collections::BTreeSet;
use futures::FutureExt;

use vector_lib::{
config::{DataType, LogNamespace, SourceOutput},
schema::Definition,
source::Source,
Result,
};
use vector_config::{configurable_component, impl_generate_config_from_default};

use crate::config::{Resource, SourceConfig, SourceContext};
use eventhub::EventHubSourceConfig;

use tracing::error;

/// Configuration for the `eventhub` source.
#[configurable_component(source("eventhub"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct Config {
/// Core Event Hub source configuration
#[configurable(derived)]
#[serde(flatten)]
pub eventhub_config: EventHubSourceConfig,

/// Log namespace configuration
#[serde(default)]
pub log_namespace: Option<bool>,
}

impl Default for Config {
fn default() -> Self {
Self {
eventhub_config: EventHubSourceConfig::default(),
log_namespace: None,
}
}
}

impl_generate_config_from_default!(Config);

#[async_trait::async_trait]
#[typetag::serde(name = "eventhub")]
impl SourceConfig for Config {
async fn build(&self, cx: SourceContext) -> Result<Source> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll be good to have a source-compliance-assertion test if test-container for this exists.

let log_namespace = cx.log_namespace(self.log_namespace);

let src = eventhub::run_eventhub_source(
self.eventhub_config.clone(),
cx.out,
cx.shutdown.map(|_token| ()),
log_namespace,
)
.map(|r| match r {
Ok(_) => Ok(()),
Err(e) => {
error!("Event Hub source terminated: {}", e);
Err(())
}
});

Ok(Box::pin(src))
}

fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let lns_set = BTreeSet::from([log_namespace]);

let schema_definition =
Definition::default_for_namespace(&lns_set).with_standard_vector_source_metadata();

vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn resources(&self) -> Vec<Resource> {
vec![]
}

fn can_acknowledge(&self) -> bool {
false
}
}
2 changes: 2 additions & 0 deletions src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub mod syslog;
pub mod vector;
#[cfg(all(feature = "sources-wef"))]
pub mod wef;
#[cfg(all(feature = "sources-eventhub"))]
pub mod eventhub;
#[cfg(feature = "sources-websocket")]
pub mod websocket;

Expand Down