Skip to content

Commit d5a137c

Browse files
committed
gw: implement proxy protocol with server-side control
Add PROXY protocol support to the gateway with two server-side config options instead of client-controlled SNI suffixes: - inbound_pp_enabled: read PP headers from upstream load balancers - outbound_pp_enabled: send PP headers to backend apps The original PR#361 used a 'p' suffix in the SNI subdomain to toggle outbound PP per-connection. This is a security flaw: a client could connect to a PP-expecting port without sending PP headers, allowing source address spoofing. Both flags are now server-side config only.
1 parent a673ab7 commit d5a137c

10 files changed

Lines changed: 274 additions & 9 deletions

File tree

Cargo.lock

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ yaml-rust2 = "0.10.4"
247247
luks2 = "0.5.0"
248248
scopeguard = "1.2.0"
249249
tar = "0.4"
250+
proxy-protocol = "0.5.0"
250251

251252
[profile.release]
252253
panic = "abort"

gateway/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ hyper-rustls.workspace = true
5454
http-body-util.workspace = true
5555
x509-parser.workspace = true
5656
jemallocator.workspace = true
57+
proxy-protocol.workspace = true
5758
wavekv.workspace = true
5859
tdx-attest.workspace = true
5960
flate2.workspace = true

gateway/gateway.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ workers = 32
5858
external_port = 443
5959
# Maximum concurrent connections per app. 0 means unlimited.
6060
max_connections_per_app = 2000
61+
# Whether to read PROXY protocol from inbound connections (e.g. from Cloudflare).
62+
inbound_pp_enabled = false
63+
# Whether to send PROXY protocol headers to backend apps.
64+
outbound_pp_enabled = false
6165

6266
[core.proxy.timeouts]
6367
# Timeout for establishing a connection to the target app.
@@ -81,6 +85,8 @@ write = "5s"
8185
shutdown = "5s"
8286
# Timeout for total connection duration.
8387
total = "5h"
88+
# Timeout for proxy protocol header.
89+
pp_header = "5s"
8490

8591
[core.recycle]
8692
enabled = true

gateway/src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,14 @@ pub struct ProxyConfig {
117117
pub app_address_ns_compat: bool,
118118
/// Maximum concurrent connections per app. 0 means unlimited.
119119
pub max_connections_per_app: u64,
120+
/// Whether to read PROXY protocol headers from inbound connections
121+
/// (e.g. when behind a PP-aware load balancer like Cloudflare).
122+
#[serde(default)]
123+
pub inbound_pp_enabled: bool,
124+
/// Whether to send PROXY protocol headers on outbound connections to backend apps.
125+
/// This is a server-side setting; it must NOT be controlled by client input (e.g. SNI).
126+
#[serde(default)]
127+
pub outbound_pp_enabled: bool,
120128
}
121129

122130
#[derive(Debug, Clone, Deserialize)]
@@ -142,6 +150,9 @@ pub struct Timeouts {
142150
pub write: Duration,
143151
#[serde(with = "serde_duration")]
144152
pub shutdown: Duration,
153+
/// Timeout for reading the proxy protocol header from inbound connections.
154+
#[serde(with = "serde_duration")]
155+
pub pp_header: Duration,
145156
}
146157

147158
#[derive(Debug, Clone, Deserialize, Serialize)]

gateway/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ mod distributed_certbot;
3232
mod kv;
3333
mod main_service;
3434
mod models;
35+
mod pp;
3536
mod proxy;
3637
mod web_routes;
3738

gateway/src/pp.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// SPDX-FileCopyrightText: © 2024-2025 Phala Network <dstack@phala.network>
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
use std::net::SocketAddr;
6+
7+
use anyhow::{bail, Context, Result};
8+
use proxy_protocol::{version1 as v1, version2 as v2, ProxyHeader};
9+
use tokio::{
10+
io::{AsyncRead, AsyncReadExt},
11+
net::TcpStream,
12+
};
13+
14+
use crate::config::ProxyConfig;
15+
16+
const V1_PROTOCOL_PREFIX: &str = "PROXY";
17+
const V1_PREFIX_LEN: usize = 5;
18+
const V1_MAX_LENGTH: usize = 107;
19+
const V1_TERMINATOR: &[u8] = b"\r\n";
20+
21+
const V2_PROTOCOL_PREFIX: &[u8] = b"\r\n\r\n\0\r\nQUIT\n";
22+
const V2_PREFIX_LEN: usize = 12;
23+
const V2_MINIMUM_LEN: usize = 16;
24+
const V2_LENGTH_INDEX: usize = 14;
25+
const READ_BUFFER_LEN: usize = 512;
26+
const V2_MAX_LENGTH: usize = 2048;
27+
28+
/// Read or synthesize the inbound proxy protocol header.
29+
///
30+
/// When `inbound_pp_enabled` is true, reads a PP header from the stream (e.g. from an upstream
31+
/// load balancer). When false, synthesizes one from the TCP peer address.
32+
pub(crate) async fn get_inbound_pp_header(
33+
inbound: TcpStream,
34+
config: &ProxyConfig,
35+
) -> Result<(TcpStream, ProxyHeader)> {
36+
if config.inbound_pp_enabled {
37+
read_proxy_header(inbound).await
38+
} else {
39+
let header = create_inbound_pp_header(&inbound);
40+
Ok((inbound, header))
41+
}
42+
}
43+
44+
pub struct DisplayAddr<'a>(pub &'a ProxyHeader);
45+
46+
impl std::fmt::Display for DisplayAddr<'_> {
47+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48+
match self.0 {
49+
ProxyHeader::Version2 { addresses, .. } => match addresses {
50+
v2::ProxyAddresses::Ipv4 { source, .. } => write!(f, "{}", source),
51+
v2::ProxyAddresses::Ipv6 { source, .. } => write!(f, "{}", source),
52+
v2::ProxyAddresses::Unix { .. } => write!(f, "<unix>"),
53+
v2::ProxyAddresses::Unspec => write!(f, "<unspec>"),
54+
},
55+
ProxyHeader::Version1 { addresses, .. } => match addresses {
56+
v1::ProxyAddresses::Ipv4 { source, .. } => write!(f, "{}", source),
57+
v1::ProxyAddresses::Ipv6 { source, .. } => write!(f, "{}", source),
58+
v1::ProxyAddresses::Unknown => write!(f, "<unknown>"),
59+
},
60+
_ => write!(f, "<unknown ver>"),
61+
}
62+
}
63+
}
64+
65+
fn create_inbound_pp_header(inbound: &TcpStream) -> ProxyHeader {
66+
let peer_addr = inbound.peer_addr().ok();
67+
let local_addr = inbound.local_addr().ok();
68+
69+
match (peer_addr, local_addr) {
70+
(Some(SocketAddr::V4(source)), Some(SocketAddr::V4(destination))) => {
71+
ProxyHeader::Version2 {
72+
command: v2::ProxyCommand::Proxy,
73+
transport_protocol: v2::ProxyTransportProtocol::Stream,
74+
addresses: v2::ProxyAddresses::Ipv4 {
75+
source,
76+
destination,
77+
},
78+
}
79+
}
80+
(Some(SocketAddr::V6(source)), Some(SocketAddr::V6(destination))) => {
81+
ProxyHeader::Version2 {
82+
command: v2::ProxyCommand::Proxy,
83+
transport_protocol: v2::ProxyTransportProtocol::Stream,
84+
addresses: v2::ProxyAddresses::Ipv6 {
85+
source,
86+
destination,
87+
},
88+
}
89+
}
90+
_ => ProxyHeader::Version2 {
91+
command: v2::ProxyCommand::Proxy,
92+
transport_protocol: v2::ProxyTransportProtocol::Stream,
93+
addresses: v2::ProxyAddresses::Unspec,
94+
},
95+
}
96+
}
97+
98+
async fn read_proxy_header<I>(mut stream: I) -> Result<(I, ProxyHeader)>
99+
where
100+
I: AsyncRead + Unpin,
101+
{
102+
let mut buffer = [0; READ_BUFFER_LEN];
103+
let mut dynamic_buffer = None;
104+
105+
stream.read_exact(&mut buffer[..V1_PREFIX_LEN]).await?;
106+
107+
if &buffer[..V1_PREFIX_LEN] == V1_PROTOCOL_PREFIX.as_bytes() {
108+
read_v1_header(&mut stream, &mut buffer).await?;
109+
} else {
110+
stream
111+
.read_exact(&mut buffer[V1_PREFIX_LEN..V2_MINIMUM_LEN])
112+
.await?;
113+
if &buffer[..V2_PREFIX_LEN] == V2_PROTOCOL_PREFIX {
114+
dynamic_buffer = read_v2_header(&mut stream, &mut buffer).await?;
115+
} else {
116+
bail!("no valid proxy protocol header detected");
117+
}
118+
}
119+
120+
let mut buffer = dynamic_buffer.as_deref().unwrap_or(&buffer[..]);
121+
122+
let header =
123+
proxy_protocol::parse(&mut buffer).context("failed to parse proxy protocol header")?;
124+
Ok((stream, header))
125+
}
126+
127+
async fn read_v2_header<I>(
128+
mut stream: I,
129+
buffer: &mut [u8; READ_BUFFER_LEN],
130+
) -> Result<Option<Vec<u8>>>
131+
where
132+
I: AsyncRead + Unpin,
133+
{
134+
let length =
135+
u16::from_be_bytes([buffer[V2_LENGTH_INDEX], buffer[V2_LENGTH_INDEX + 1]]) as usize;
136+
let full_length = V2_MINIMUM_LEN + length;
137+
138+
if full_length > V2_MAX_LENGTH {
139+
bail!("v2 proxy protocol header is too long");
140+
}
141+
142+
if full_length > READ_BUFFER_LEN {
143+
let mut dynamic_buffer = Vec::with_capacity(full_length);
144+
dynamic_buffer.extend_from_slice(&buffer[..V2_MINIMUM_LEN]);
145+
dynamic_buffer.resize(full_length, 0);
146+
stream
147+
.read_exact(&mut dynamic_buffer[V2_MINIMUM_LEN..full_length])
148+
.await?;
149+
150+
Ok(Some(dynamic_buffer))
151+
} else {
152+
stream
153+
.read_exact(&mut buffer[V2_MINIMUM_LEN..full_length])
154+
.await?;
155+
156+
Ok(None)
157+
}
158+
}
159+
160+
async fn read_v1_header<I>(mut stream: I, buffer: &mut [u8; READ_BUFFER_LEN]) -> Result<()>
161+
where
162+
I: AsyncRead + Unpin,
163+
{
164+
let mut end_found = false;
165+
for i in V1_PREFIX_LEN..V1_MAX_LENGTH {
166+
buffer[i] = stream.read_u8().await?;
167+
168+
if [buffer[i - 1], buffer[i]] == V1_TERMINATOR {
169+
end_found = true;
170+
break;
171+
}
172+
}
173+
if !end_found {
174+
bail!("no valid proxy protocol header detected");
175+
}
176+
177+
Ok(())
178+
}

gateway/src/proxy.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ use tokio::{
2323
};
2424
use tracing::{debug, error, info, info_span, Instrument};
2525

26-
use crate::{config::ProxyConfig, main_service::Proxy, models::EnteredCounter};
26+
use crate::{
27+
config::ProxyConfig,
28+
main_service::Proxy,
29+
models::EnteredCounter,
30+
pp::{get_inbound_pp_header, DisplayAddr},
31+
};
2732

2833
#[derive(Debug, Clone)]
2934
pub(crate) struct AddressInfo {
@@ -123,8 +128,16 @@ fn parse_dst_info(subdomain: &str) -> Result<DstInfo> {
123128

124129
pub static NUM_CONNECTIONS: AtomicU64 = AtomicU64::new(0);
125130

126-
async fn handle_connection(mut inbound: TcpStream, state: Proxy) -> Result<()> {
131+
async fn handle_connection(inbound: TcpStream, state: Proxy) -> Result<()> {
127132
let timeouts = &state.config.proxy.timeouts;
133+
134+
let pp_fut = get_inbound_pp_header(inbound, &state.config.proxy);
135+
let (mut inbound, pp_header) = timeout(timeouts.pp_header, pp_fut)
136+
.await
137+
.context("proxy protocol header timeout")?
138+
.context("failed to read proxy protocol header")?;
139+
info!("client address: {}", DisplayAddr(&pp_header));
140+
128141
let (sni, buffer) = timeout(timeouts.handshake, take_sni(&mut inbound))
129142
.await
130143
.context("take sni timeout")?
@@ -138,14 +151,15 @@ async fn handle_connection(mut inbound: TcpStream, state: Proxy) -> Result<()> {
138151
let dst = parse_dst_info(subdomain)?;
139152
debug!("dst: {dst:?}");
140153
if dst.is_tls {
141-
tls_passthough::proxy_to_app(state, inbound, buffer, &dst.app_id, dst.port).await
154+
tls_passthough::proxy_to_app(state, inbound, pp_header, buffer, &dst.app_id, dst.port)
155+
.await
142156
} else {
143157
state
144-
.proxy(inbound, buffer, &dst.app_id, dst.port, dst.is_h2)
158+
.proxy(inbound, pp_header, buffer, &dst.app_id, dst.port, dst.is_h2)
145159
.await
146160
}
147161
} else {
148-
tls_passthough::proxy_with_sni(state, inbound, buffer, &sni).await
162+
tls_passthough::proxy_with_sni(state, inbound, pp_header, buffer, &sni).await
149163
}
150164
}
151165

0 commit comments

Comments
 (0)