Skip to content

Commit af15b08

Browse files
committed
Event Reporter with interface for multiple clients
Signed-off-by: aakugan <aakashganapathy2@gmail.com>
1 parent 746c7ad commit af15b08

13 files changed

Lines changed: 1086 additions & 0 deletions

File tree

CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,5 +496,6 @@ extensions/upstreams/tcp @ggreenway @mattklein123
496496
/contrib/peak_ewma/load_balancing_policies/ @rroblak @UNOWNED
497497
/contrib/kae/ @Misakokoro @UNOWNED
498498
/contrib/istio @kyessenov @wbpcode @keithmattix @krinkinmu @zirain
499+
/contrib/reverse_tunnel_reporter @agrawroh @aakugan @basundhara-c
499500

500501
/compat/openssl/ @tedjpoole @envoyproxy/envoy-openssl-sync

contrib/contrib_build_config.bzl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,10 @@ CONTRIB_EXTENSIONS = {
118118
#
119119

120120
"envoy.upstreams.http.tcp.golang": "//contrib/golang/upstreams/http/tcp/source:config",
121+
122+
#
123+
# Reverse tunnel reporters
124+
#
125+
126+
"envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service": "//contrib/reverse_tunnel_reporter/source:config",
121127
}

contrib/extensions_metadata.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,3 +205,11 @@ envoy.load_balancing_policies.peak_ewma:
205205
status: alpha
206206
type_urls:
207207
- envoy.extensions.load_balancing_policies.peak_ewma.v3alpha.PeakEwma
208+
envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service:
209+
categories:
210+
- envoy.bootstrap
211+
security_posture: requires_trusted_downstream_and_upstream
212+
status: alpha
213+
type_urls:
214+
- envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters.EventReporterConfig
215+
- envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load(
2+
"//bazel:envoy_build_system.bzl",
3+
"envoy_cc_contrib_extension",
4+
"envoy_cc_library",
5+
"envoy_contrib_package",
6+
)
7+
8+
licenses(["notice"]) # Apache 2
9+
10+
envoy_contrib_package()
11+
12+
envoy_cc_library(
13+
name = "reverse_tunnel_event_types",
14+
hdrs = [
15+
"reverse_tunnel_event_types.h",
16+
],
17+
deps = [
18+
"//envoy/common:pure_lib",
19+
"//envoy/common:time_interface",
20+
"//envoy/config:typed_config_interface",
21+
"//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib",
22+
"//envoy/server:factory_context_interface",
23+
"//source/common/common:fmt_lib",
24+
"//source/common/config:utility_lib",
25+
"//source/common/protobuf",
26+
"//source/common/protobuf:message_validator_lib",
27+
],
28+
)
29+
30+
envoy_cc_contrib_extension(
31+
name = "config",
32+
deps = [
33+
"//contrib/reverse_tunnel_reporter/source/reporters:reporters_lib",
34+
],
35+
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
load(
2+
"//bazel:envoy_build_system.bzl",
3+
"envoy_cc_library",
4+
"envoy_contrib_package",
5+
)
6+
7+
licenses(["notice"]) # Apache 2
8+
9+
envoy_contrib_package()
10+
11+
envoy_cc_library(
12+
name = "reporters_lib",
13+
deps = [
14+
"//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib",
15+
],
16+
)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
load(
2+
"//bazel:envoy_build_system.bzl",
3+
"envoy_cc_library",
4+
"envoy_contrib_package",
5+
)
6+
7+
licenses(["notice"]) # Apache 2
8+
9+
envoy_contrib_package()
10+
11+
envoy_cc_library(
12+
name = "event_reporter_lib",
13+
srcs = [
14+
"factory.cc",
15+
"reporter.cc",
16+
],
17+
hdrs = [
18+
"factory.h",
19+
"reporter.h",
20+
],
21+
deps = [
22+
"//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types",
23+
"//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib",
24+
"//envoy/registry",
25+
"//source/common/common:logger_lib",
26+
"//source/common/config:utility_lib",
27+
"//source/common/protobuf:utility_lib",
28+
"@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto",
29+
],
30+
)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h"
2+
3+
#include "envoy/registry/registry.h"
4+
5+
#include "source/common/config/utility.h"
6+
#include "source/common/protobuf/utility.h"
7+
8+
namespace Envoy {
9+
namespace Extensions {
10+
namespace Bootstrap {
11+
namespace ReverseConnection {
12+
13+
ReverseTunnelReporterPtr
14+
EventReporterFactory::createReporter(Server::Configuration::ServerFactoryContext& context,
15+
ProtobufTypes::MessagePtr config) {
16+
const auto& reporter_config = MessageUtil::downcastAndValidate<const ConfigProto&>(
17+
*config, context.messageValidationVisitor());
18+
19+
std::vector<ReverseTunnelReporterClientPtr> clients;
20+
clients.reserve(reporter_config.clients().size());
21+
for (const auto& client_config : reporter_config.clients()) {
22+
clients.push_back(createClient(context, client_config));
23+
}
24+
return std::make_unique<EventReporter>(context, reporter_config, std::move(clients));
25+
}
26+
27+
std::string EventReporterFactory::name() const {
28+
return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_"
29+
"reporter";
30+
}
31+
32+
ProtobufTypes::MessagePtr EventReporterFactory::createEmptyConfigProto() {
33+
return std::make_unique<ConfigProto>();
34+
}
35+
36+
ReverseTunnelReporterClientPtr
37+
EventReporterFactory::createClient(Server::Configuration::ServerFactoryContext& context,
38+
const ClientConfigProto& client_config) {
39+
auto* factory =
40+
Config::Utility::getFactoryByName<ReverseTunnelReporterClientFactory>(client_config.name());
41+
if (!factory) {
42+
throw EnvoyException(
43+
fmt::format("Unknown Reporter Client Factory: '{}'. "
44+
"Make sure it is registered as a ReverseTunnelReporterClientFactory.",
45+
client_config.name()));
46+
}
47+
48+
auto typed_config = Config::Utility::translateAnyToFactoryConfig(
49+
client_config.typed_config(), context.messageValidationVisitor(), *factory);
50+
return factory->createClient(context, *typed_config);
51+
}
52+
53+
REGISTER_FACTORY(EventReporterFactory, ReverseTunnelReporterFactory);
54+
55+
} // namespace ReverseConnection
56+
} // namespace Bootstrap
57+
} // namespace Extensions
58+
} // namespace Envoy
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#pragma once
2+
3+
#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h"
4+
5+
#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h"
6+
#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.validate.h"
7+
#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h"
8+
#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h"
9+
10+
namespace Envoy {
11+
namespace Extensions {
12+
namespace Bootstrap {
13+
namespace ReverseConnection {
14+
15+
/// Factory that builds an EventReporter from its proto config, dynamically
16+
/// resolving each child ReverseTunnelReporterClient by name.
17+
class EventReporterFactory : public ReverseTunnelReporterFactory {
18+
public:
19+
ReverseTunnelReporterPtr createReporter(Server::Configuration::ServerFactoryContext& context,
20+
ProtobufTypes::MessagePtr config) override;
21+
std::string name() const override;
22+
ProtobufTypes::MessagePtr createEmptyConfigProto() override;
23+
24+
private:
25+
using ConfigProto =
26+
envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig;
27+
using ClientConfigProto = envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::
28+
ReverseConnectionReporterClient;
29+
30+
ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context,
31+
const ClientConfigProto& client_config);
32+
};
33+
34+
} // namespace ReverseConnection
35+
} // namespace Bootstrap
36+
} // namespace Extensions
37+
} // namespace Envoy
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h"
2+
3+
#include "source/common/protobuf/utility.h"
4+
5+
namespace Envoy {
6+
namespace Extensions {
7+
namespace Bootstrap {
8+
namespace ReverseConnection {
9+
10+
EventReporter::EventReporter(Server::Configuration::ServerFactoryContext& context,
11+
const ConfigProto& config,
12+
std::vector<ReverseTunnelReporterClientPtr>&& clients)
13+
: context_{context}, clients_{std::move(clients)},
14+
stats_(generateStats(
15+
PROTOBUF_GET_STRING_OR_DEFAULT(config, stat_prefix, "reverse_tunnel_reporter"),
16+
context.scope())) {
17+
ENVOY_LOG(info, "Constructed with {} clients", clients_.size());
18+
}
19+
20+
void EventReporter::onServerInitialized() {
21+
ENVOY_LOG(info, "Initialized");
22+
for (auto& client : clients_) {
23+
client->onServerInitialized(this);
24+
}
25+
}
26+
27+
void EventReporter::reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id,
28+
absl::string_view tenant_id) {
29+
auto ptr = std::make_shared<ReverseTunnelEvent::Connected>(
30+
ReverseTunnelEvent::Connected{std::string(node_id), std::string(cluster_id),
31+
std::string(tenant_id), Envoy::SystemTime::clock::now()});
32+
33+
context_.mainThreadDispatcher().post(
34+
[this, ptr = std::move(ptr)]() mutable { this->addConnection(std::move(ptr)); });
35+
}
36+
37+
void EventReporter::reportDisconnectionEvent(absl::string_view node_id, absl::string_view) {
38+
std::string name = ReverseTunnelEvent::getName(node_id);
39+
auto ptr = std::make_shared<ReverseTunnelEvent::Disconnected>(name);
40+
41+
context_.mainThreadDispatcher().post(
42+
[this, ptr = std::move(ptr)]() mutable { this->removeConnection(std::move(ptr)); });
43+
}
44+
45+
// This is only served on the main thread so no locks needed.
46+
ReverseTunnelEvent::ConnectionsList EventReporter::getAllConnections() {
47+
ASSERT(context_.mainThreadDispatcher().isThreadSafe());
48+
stats_.reverse_tunnel_full_pulls_total_.inc();
49+
50+
ReverseTunnelEvent::ConnectionsList all_connections;
51+
all_connections.reserve(connections_.size());
52+
53+
for (auto& [key, val] : connections_) {
54+
all_connections.push_back(val.connection);
55+
}
56+
return all_connections;
57+
}
58+
59+
EventReporterStats EventReporter::generateStats(const std::string& prefix, Stats::Scope& scope) {
60+
return EventReporterStats{ALL_EVENT_REPORTER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
61+
POOL_GAUGE_PREFIX(scope, prefix))};
62+
}
63+
64+
void EventReporter::notifyClients(ReverseTunnelEvent::TunnelUpdates&& updates) {
65+
ASSERT(clients_.size() > 0, "Need atleast one client. Enforced via the protos.");
66+
67+
for (size_t i = 0; i < clients_.size() - 1; i++) {
68+
clients_[i]->receiveEvents(updates);
69+
}
70+
71+
clients_.back()->receiveEvents(std::move(updates));
72+
}
73+
74+
void EventReporter::addConnection(std::shared_ptr<ReverseTunnelEvent::Connected>&& connection) {
75+
ASSERT(context_.mainThreadDispatcher().isThreadSafe());
76+
77+
ENVOY_LOG(info, "Accepted a new connection. Node: {}, Cluster: {}, Tenant: {}",
78+
connection->node_id, connection->cluster_id, connection->tenant_id);
79+
80+
std::string name = ReverseTunnelEvent::getName(connection->node_id);
81+
auto [it, inserted] = connections_.try_emplace(std::move(name), std::move(connection), 1);
82+
83+
if (inserted) {
84+
stats_.reverse_tunnel_unique_active_.inc();
85+
notifyClients(ReverseTunnelEvent::TunnelUpdates{{it->second.connection}, {}});
86+
} else {
87+
// Multiple reverse tunnels can share the same name (same node).
88+
// We ref-count them and only notify clients of removal when the last one disconnects.
89+
it->second.count++;
90+
}
91+
92+
stats_.reverse_tunnel_established_total_.inc();
93+
stats_.reverse_tunnel_active_.inc();
94+
}
95+
96+
void EventReporter::removeConnection(
97+
std::shared_ptr<ReverseTunnelEvent::Disconnected>&& disconnection) {
98+
ASSERT(context_.mainThreadDispatcher().isThreadSafe());
99+
100+
const auto& name = disconnection->name;
101+
auto it = connections_.find(name);
102+
103+
ENVOY_LOG(info, "Removed connection. Name: {}", name);
104+
105+
if (it == connections_.end()) {
106+
ENVOY_LOG(warn, "Tried to remove a connection which doesnt exist");
107+
return;
108+
}
109+
110+
// Only notify removal on the last ref — see addConnection for the ref-count rationale.
111+
if (it->second.count == 1) {
112+
connections_.erase(it);
113+
stats_.reverse_tunnel_unique_active_.dec();
114+
notifyClients(ReverseTunnelEvent::TunnelUpdates{{}, {disconnection}});
115+
} else {
116+
it->second.count--;
117+
}
118+
119+
stats_.reverse_tunnel_closed_total_.inc();
120+
stats_.reverse_tunnel_active_.dec();
121+
}
122+
123+
} // namespace ReverseConnection
124+
} // namespace Bootstrap
125+
} // namespace Extensions
126+
} // namespace Envoy

0 commit comments

Comments
 (0)