Skip to content

Commit e9ef353

Browse files
committed
Event Reporter with interface for multiple clients
Signed-off-by: aakugan <aakashganapathy2@gmail.com>
1 parent da27d8b commit e9ef353

13 files changed

Lines changed: 1085 additions & 0 deletions

File tree

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,5 +494,6 @@ extensions/upstreams/tcp @ggreenway @mattklein123
494494
/contrib/peak_ewma/load_balancing_policies/ @rroblak @UNOWNED
495495
/contrib/kae/ @Misakokoro @UNOWNED
496496
/contrib/istio @kyessenov @wbpcode @keithmattix @krinkinmu @zirain
497+
/contrib/reverse_tunnel_reporter @agrawroh @aakugan
497498

498499
/compat/openssl/ @tedjpoole @envoyproxy/envoy-openssl-sync

contrib/contrib_build_config.bzl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,10 @@ CONTRIB_EXTENSIONS = {
118118
#
119119

120120
"envoy.upstreams.http.tcp.golang": "//contrib/golang/upstreams/http/tcp/source:config",
121+
122+
#
123+
# Reverse tunnel reporters
124+
#
125+
126+
"envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service": "//contrib/reverse_tunnel_reporter/source:config",
121127
}

contrib/extensions_metadata.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,3 +205,11 @@ envoy.load_balancing_policies.peak_ewma:
205205
status: alpha
206206
type_urls:
207207
- envoy.extensions.load_balancing_policies.peak_ewma.v3alpha.PeakEwma
208+
envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service:
209+
categories:
210+
- envoy.bootstrap
211+
security_posture: requires_trusted_downstream_and_upstream
212+
status: alpha
213+
type_urls:
214+
- envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters.EventReporterConfig
215+
- envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load(
2+
"//bazel:envoy_build_system.bzl",
3+
"envoy_cc_contrib_extension",
4+
"envoy_cc_library",
5+
"envoy_contrib_package",
6+
)
7+
8+
licenses(["notice"]) # Apache 2
9+
10+
envoy_contrib_package()
11+
12+
envoy_cc_library(
13+
name = "reverse_tunnel_event_types",
14+
hdrs = [
15+
"reverse_tunnel_event_types.h",
16+
],
17+
deps = [
18+
"//envoy/common:pure_lib",
19+
"//envoy/common:time_interface",
20+
"//envoy/config:typed_config_interface",
21+
"//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib",
22+
"//envoy/server:factory_context_interface",
23+
"//source/common/common:fmt_lib",
24+
"//source/common/config:utility_lib",
25+
"//source/common/protobuf",
26+
"//source/common/protobuf:message_validator_lib",
27+
],
28+
)
29+
30+
envoy_cc_contrib_extension(
31+
name = "config",
32+
deps = [
33+
"//contrib/reverse_tunnel_reporter/source/reporters:reporters_lib",
34+
],
35+
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
load(
2+
"//bazel:envoy_build_system.bzl",
3+
"envoy_cc_library",
4+
"envoy_contrib_package",
5+
)
6+
7+
licenses(["notice"]) # Apache 2
8+
9+
envoy_contrib_package()
10+
11+
envoy_cc_library(
12+
name = "reporters_lib",
13+
deps = [
14+
"//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib",
15+
],
16+
)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load(
2+
"//bazel:envoy_build_system.bzl",
3+
"envoy_cc_library",
4+
"envoy_contrib_package",
5+
)
6+
7+
licenses(["notice"]) # Apache 2
8+
9+
envoy_contrib_package()
10+
11+
envoy_cc_library(
12+
name = "event_reporter_lib",
13+
srcs = [
14+
"factory.cc",
15+
"reporter.cc",
16+
],
17+
hdrs = [
18+
"factory.h",
19+
"reporter.h",
20+
],
21+
deps = [
22+
"//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types",
23+
"//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib",
24+
"//envoy/registry",
25+
"//source/common/common:logger_lib",
26+
"//source/common/config:utility_lib",
27+
"//source/common/protobuf:utility_lib",
28+
"@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto",
29+
],
30+
)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h"
2+
3+
#include "envoy/registry/registry.h"
4+
5+
#include "source/common/config/utility.h"
6+
#include "source/common/protobuf/utility.h"
7+
8+
namespace Envoy {
9+
namespace Extensions {
10+
namespace Bootstrap {
11+
namespace ReverseConnection {
12+
13+
ReverseTunnelReporterPtr
14+
EventReporterFactory::createReporter(Server::Configuration::ServerFactoryContext& context,
15+
ProtobufTypes::MessagePtr config) {
16+
const auto& reporter_config = MessageUtil::downcastAndValidate<const ConfigProto&>(
17+
*config, context.messageValidationVisitor());
18+
19+
std::vector<ReverseTunnelReporterClientPtr> clients;
20+
clients.reserve(reporter_config.clients().size());
21+
for (const auto& client_config : reporter_config.clients()) {
22+
clients.push_back(createClient(context, client_config));
23+
}
24+
return std::make_unique<EventReporter>(context, reporter_config, std::move(clients));
25+
}
26+
27+
std::string EventReporterFactory::name() const {
28+
return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_"
29+
"reporter";
30+
}
31+
32+
ProtobufTypes::MessagePtr EventReporterFactory::createEmptyConfigProto() {
33+
return std::make_unique<ConfigProto>();
34+
}
35+
36+
ReverseTunnelReporterClientPtr
37+
EventReporterFactory::createClient(Server::Configuration::ServerFactoryContext& context,
38+
const ClientConfigProto& client_config) {
39+
auto* factory =
40+
Config::Utility::getFactoryByName<ReverseTunnelReporterClientFactory>(client_config.name());
41+
if (!factory) {
42+
throw EnvoyException(
43+
fmt::format("Unknown Reporter Client Factory: '{}'. "
44+
"Make sure it is registered as a ReverseTunnelReporterClientFactory.",
45+
client_config.name()));
46+
}
47+
48+
auto typed_config = Config::Utility::translateAnyToFactoryConfig(
49+
client_config.typed_config(), context.messageValidationVisitor(), *factory);
50+
return factory->createClient(context, *typed_config);
51+
}
52+
53+
REGISTER_FACTORY(EventReporterFactory, ReverseTunnelReporterFactory);
54+
55+
} // namespace ReverseConnection
56+
} // namespace Bootstrap
57+
} // namespace Extensions
58+
} // namespace Envoy
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#pragma once
2+
3+
#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h"
4+
5+
#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h"
6+
#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.validate.h"
7+
#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h"
8+
#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h"
9+
10+
namespace Envoy {
11+
namespace Extensions {
12+
namespace Bootstrap {
13+
namespace ReverseConnection {
14+
15+
/// Factory that builds an EventReporter from its proto config, dynamically
16+
/// resolving each child ReverseTunnelReporterClient by name.
17+
class EventReporterFactory : public ReverseTunnelReporterFactory {
18+
public:
19+
ReverseTunnelReporterPtr createReporter(Server::Configuration::ServerFactoryContext& context,
20+
ProtobufTypes::MessagePtr config) override;
21+
std::string name() const override;
22+
ProtobufTypes::MessagePtr createEmptyConfigProto() override;
23+
24+
private:
25+
using ConfigProto =
26+
envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig;
27+
using ClientConfigProto = envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::
28+
ReverseConnectionReporterClient;
29+
30+
ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context,
31+
const ClientConfigProto& client_config);
32+
};
33+
34+
} // namespace ReverseConnection
35+
} // namespace Bootstrap
36+
} // namespace Extensions
37+
} // namespace Envoy
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h"
2+
3+
#include "source/common/protobuf/utility.h"
4+
5+
namespace Envoy {
6+
namespace Extensions {
7+
namespace Bootstrap {
8+
namespace ReverseConnection {
9+
10+
EventReporter::EventReporter(Server::Configuration::ServerFactoryContext& context,
11+
const ConfigProto& config,
12+
std::vector<ReverseTunnelReporterClientPtr>&& clients)
13+
: context_{context}, clients_{std::move(clients)},
14+
stats_(generateStats(
15+
PROTOBUF_GET_STRING_OR_DEFAULT(config, stat_prefix, "reverse_tunnel_reporter"),
16+
context.scope())) {
17+
ENVOY_LOG(info, "EventReporter: Constructed with {} clients", clients_.size());
18+
}
19+
20+
void EventReporter::onServerInitialized() {
21+
ENVOY_LOG(info, "EventReporter: Initialized");
22+
for (auto& client : clients_) {
23+
client->onServerInitialized(this);
24+
}
25+
}
26+
27+
void EventReporter::reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id,
28+
absl::string_view tenant_id) {
29+
auto ptr = std::make_shared<ReverseTunnelEvent::Connected>(
30+
ReverseTunnelEvent::Connected{std::string(node_id), std::string(cluster_id),
31+
std::string(tenant_id), Envoy::SystemTime::clock::now()});
32+
33+
context_.mainThreadDispatcher().post(
34+
[this, ptr = std::move(ptr)]() mutable { this->addConnection(std::move(ptr)); });
35+
}
36+
37+
void EventReporter::reportDisconnectionEvent(absl::string_view node_id,
38+
absl::string_view cluster_id) {
39+
std::string name = ReverseTunnelEvent::getName(node_id, cluster_id);
40+
auto ptr = std::make_shared<ReverseTunnelEvent::Disconnected>(
41+
ReverseTunnelEvent::Disconnected{std::move(name)});
42+
43+
context_.mainThreadDispatcher().post(
44+
[this, ptr = std::move(ptr)]() mutable { this->removeConnection(std::move(ptr)); });
45+
}
46+
47+
// This is only served on the main thread so no locks needed.
48+
ReverseTunnelEvent::SharedConnections EventReporter::getAllConnections() {
49+
ASSERT(context_.mainThreadDispatcher().isThreadSafe());
50+
stats_.reverse_tunnel_full_pulls_total_.inc();
51+
52+
ReverseTunnelEvent::SharedConnections all_connections;
53+
all_connections.reserve(connections_.size());
54+
55+
for (auto& [key, val] : connections_) {
56+
all_connections.push_back(val.connection);
57+
}
58+
return all_connections;
59+
}
60+
61+
EventReporterStats EventReporter::generateStats(const std::string& prefix, Stats::Scope& scope) {
62+
return EventReporterStats{ALL_EVENT_REPORTER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
63+
POOL_GAUGE_PREFIX(scope, prefix))};
64+
}
65+
66+
void EventReporter::notifyClients(ReverseTunnelEvent::BatchedEvents&& batch) {
67+
for (auto& client : clients_) {
68+
client->receiveEvents(batch);
69+
}
70+
}
71+
72+
void EventReporter::addConnection(std::shared_ptr<ReverseTunnelEvent::Connected>&& connection) {
73+
ASSERT(context_.mainThreadDispatcher().isThreadSafe());
74+
75+
ENVOY_LOG(info, "EventReporter: Accepted a new connection. Node: {}, Cluster: {}, Tenant: {}",
76+
connection->node_id, connection->cluster_id, connection->tenant_id);
77+
78+
std::string name = ReverseTunnelEvent::getName(connection->node_id, connection->cluster_id);
79+
auto [it, inserted] =
80+
connections_.try_emplace(std::move(name), ConnectionEntry{std::move(connection), 1});
81+
82+
if (inserted) {
83+
stats_.reverse_tunnel_unique_active_.inc();
84+
notifyClients(ReverseTunnelEvent::BatchedEvents{{it->second.connection}, {}});
85+
} else {
86+
// Multiple reverse tunnels can share the same name (same node).
87+
// We ref-count them and only notify clients of removal when the last one disconnects.
88+
it->second.count++;
89+
}
90+
91+
stats_.reverse_tunnel_established_total_.inc();
92+
stats_.reverse_tunnel_active_.inc();
93+
}
94+
95+
void EventReporter::removeConnection(
96+
std::shared_ptr<ReverseTunnelEvent::Disconnected>&& disconnection) {
97+
ASSERT(context_.mainThreadDispatcher().isThreadSafe());
98+
99+
const auto& name = disconnection->name;
100+
auto it = connections_.find(name);
101+
102+
ENVOY_LOG(info, "EventReporter: Removed connection. Name: {}", name);
103+
104+
if (it == connections_.end()) {
105+
ENVOY_LOG(warn, "EventReporter: Tried to remove a connection which doesnt exist");
106+
return;
107+
}
108+
109+
// Only notify removal on the last ref — see addConnection for the ref-count rationale.
110+
if (it->second.count == 1) {
111+
connections_.erase(it);
112+
stats_.reverse_tunnel_unique_active_.dec();
113+
notifyClients(ReverseTunnelEvent::BatchedEvents{{}, {disconnection}});
114+
} else {
115+
it->second.count--;
116+
}
117+
118+
stats_.reverse_tunnel_closed_total_.inc();
119+
stats_.reverse_tunnel_active_.dec();
120+
}
121+
122+
} // namespace ReverseConnection
123+
} // namespace Bootstrap
124+
} // namespace Extensions
125+
} // namespace Envoy

0 commit comments

Comments
 (0)