Skip to content

Commit ba507e6

Browse files
committed
feat(datalayer): add metricsPort parameter to metrics-data-source plugin
Adds MetricsPort to metricsDatasourceParams so the metrics-data-source plugin can scrape a port different from the inference port. When metricsPort is non-zero in EndpointPickerConfig plugin parameters, it overrides the port derived from the endpoint's MetricsHost; zero (default) preserves existing behavior. This restores functionality that was available via --model-server-metrics-port but lost when that flag was removed in #2441 without an equivalent plugin config parameter.
1 parent d6bd165 commit ba507e6

5 files changed

Lines changed: 165 additions & 21 deletions

File tree

pkg/epp/framework/plugins/datalayer/source/http/datasource.go

Lines changed: 26 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -21,27 +21,31 @@ import (
2121
"crypto/tls"
2222
"fmt"
2323
"io"
24+
"net"
2425
"net/url"
2526
"reflect"
27+
"strconv"
2628

2729
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
2830
fwkplugin "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
2931
)
3032

3133
// HTTPDataSource is a data source that receives its data using an HTTP client.
// getEndpoint builds the request URL from scheme, the endpoint's metrics host
// (with its port replaced by metricsPort when that is non-zero), and path.
3234
type HTTPDataSource struct {
33-
typedName fwkplugin.TypedName
34-
scheme string // scheme to use
35-
path string // path to use
35+
typedName fwkplugin.TypedName
36+
scheme string // scheme to use
37+
path string // path to use
38+
metricsPort int // when non-zero, overrides the port in MetricsHost for scraping
3639

3740
client Client // client (e.g. a wrapped http.Client) used to get data
3841
parser func(io.Reader) (any, error)
3942
outputType reflect.Type
4043
}
4144

42-
// NewHTTPDataSource returns a new data source, configured with
43-
// the provided scheme, path and certificate verification parameters.
44-
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pluginType string,
45+
// NewHTTPDataSource returns a new data source configured with the given scheme, path,
46+
// and certificate verification. metricsPort overrides the port in MetricsHost when
47+
// non-zero; pass 0 to use MetricsHost as-is.
48+
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, metricsPort int, pluginType string,
4549
pluginName string, parser func(io.Reader) (any, error), outputType reflect.Type) (*HTTPDataSource, error) {
4650
if scheme != "http" && scheme != "https" {
4751
return nil, fmt.Errorf("unsupported scheme: %s", scheme)
@@ -59,11 +63,12 @@ func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pl
5963
Type: pluginType,
6064
Name: pluginName,
6165
},
62-
scheme: scheme,
63-
path: path,
64-
client: defaultClient,
65-
parser: parser,
66-
outputType: outputType,
66+
scheme: scheme,
67+
path: path,
68+
metricsPort: metricsPort,
69+
client: defaultClient,
70+
parser: parser,
71+
outputType: outputType,
6772
}
6873
return dataSrc, nil
6974
}
@@ -90,9 +95,18 @@ func (dataSrc *HTTPDataSource) Poll(ctx context.Context, ep fwkdl.Endpoint) (any
9095
}
9196

9297
// getEndpoint builds the URL to scrape for ep from the configured scheme and
// path. The host is ep.GetMetricsHost(); when metricsPort is non-zero and that
// host splits cleanly into host and port, the port is replaced with metricsPort.
func (dataSrc *HTTPDataSource) getEndpoint(ep Addressable) *url.URL {
98+
host := ep.GetMetricsHost()
99+
if dataSrc.metricsPort != 0 {
100+
ip, _, err := net.SplitHostPort(host)
101+
if err == nil {
102+
// JoinHostPort re-adds the brackets that SplitHostPort strips from IPv6 literals.
host = net.JoinHostPort(ip, strconv.Itoa(dataSrc.metricsPort))
103+
}
104+
// If SplitHostPort fails (e.g. host has no port), use MetricsHost unchanged
105+
// so we still attempt a scrape rather than silently dropping the endpoint.
// Note that the configured metricsPort is not applied in that case, and
// nothing is logged or returned to signal that the override was skipped.
106+
}
93107
return &url.URL{
94108
Scheme: dataSrc.scheme,
95-
Host: ep.GetMetricsHost(),
109+
Host: host,
96110
Path: dataSrc.path,
97111
}
98112
}
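pkg/epp/framework/plugins/datalayer/source/http/datasource_test.go (path heading missing from this page capture; inferred from `package http` and the tested getEndpoint — confirm against the commit)

Lines changed: 94 additions & 0 deletions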
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package http
18+
19+
import (
20+
"io"
21+
"reflect"
22+
"testing"
23+
24+
"github.com/google/go-cmp/cmp"
25+
"k8s.io/apimachinery/pkg/types"
26+
)
27+
28+
// fakeAddressable is a test double for the Addressable interface.
// Only GetMetricsHost is configurable (via metricsHost); GetIPAddress and
// GetPort return empty strings and GetNamespacedName returns a fixed
// "test/pod" name.
29+
type fakeAddressable struct {
30+
metricsHost string
31+
}
32+
33+
func (f *fakeAddressable) GetIPAddress() string { return "" }
34+
func (f *fakeAddressable) GetPort() string { return "" }
35+
func (f *fakeAddressable) GetMetricsHost() string { return f.metricsHost }
36+
func (f *fakeAddressable) GetNamespacedName() types.NamespacedName { return types.NamespacedName{Name: "pod", Namespace: "test"} }
37+
38+
// noopParser is a parser stub that ignores its input and returns (nil, nil).
func noopParser(r io.Reader) (any, error) { return nil, nil }
39+
40+
// TestGetEndpoint checks the Host of the URL returned by getEndpoint for
// several MetricsHost / metricsPort combinations: a zero metricsPort leaves
// the host untouched, a non-zero metricsPort replaces the port (for both IPv4
// and bracketed IPv6 hosts), and a host without a port is passed through
// unchanged even when metricsPort is set.
func TestGetEndpoint(t *testing.T) {
41+
tests := []struct {
42+
name string
43+
metricsHost string
44+
metricsPort int
45+
wantHost string
46+
}{
47+
{
48+
name: "metricsPort=0 preserves MetricsHost unchanged",
49+
metricsHost: "1.2.3.4:8000",
50+
metricsPort: 0,
51+
wantHost: "1.2.3.4:8000",
52+
},
53+
{
54+
name: "metricsPort overrides port in MetricsHost",
55+
metricsHost: "1.2.3.4:8000",
56+
metricsPort: 9090,
57+
wantHost: "1.2.3.4:9090",
58+
},
59+
{
60+
name: "metricsPort with IPv6 address",
61+
metricsHost: "[::1]:8000",
62+
metricsPort: 9090,
63+
wantHost: "[::1]:9090",
64+
},
65+
{
66+
name: "metricsPort with IPv6 address, no override when port=0",
67+
metricsHost: "[::1]:8000",
68+
metricsPort: 0,
69+
wantHost: "[::1]:8000",
70+
},
71+
{
72+
// A host with no ":port" suffix cannot be split, so the override is skipped.
name: "malformed host falls back to original MetricsHost",
73+
metricsHost: "not-a-host-with-port",
74+
metricsPort: 9090,
75+
wantHost: "not-a-host-with-port",
76+
},
77+
}
78+
79+
for _, tc := range tests {
80+
t.Run(tc.name, func(t *testing.T) {
81+
// The parser and output type are placeholders; only the URL host is inspected.
ds, err := NewHTTPDataSource("http", "/metrics", false, tc.metricsPort,
82+
"test-type", "test-name", noopParser, reflect.TypeOf(""))
83+
if err != nil {
84+
t.Fatalf("NewHTTPDataSource() error = %v", err)
85+
}
86+
87+
got := ds.getEndpoint(&fakeAddressable{metricsHost: tc.metricsHost})
88+
89+
if diff := cmp.Diff(tc.wantHost, got.Host); diff != "" {
90+
t.Errorf("getEndpoint() host mismatch (-want +got):\n%s", diff)
91+
}
92+
})
93+
}
94+
}

pkg/epp/framework/plugins/datalayer/source/metrics/datasource.go

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -48,13 +48,19 @@ type metricsDatasourceParams struct {
4848
Path string `json:"path"`
4949
// InsecureSkipVerify defines whether model server certificate should be verified or not.
5050
InsecureSkipVerify bool `json:"insecureSkipVerify"`
51+
// MetricsPort defines the port to use for scraping metrics from model server pods.
52+
// When set, this overrides the inference port encoded in the endpoint's MetricsHost.
53+
// Useful when the model server exposes metrics on a separate port from inference
54+
// (e.g., vLLM with --metrics-port 9090).
55+
// Defaults to 0, which means the inference port is used.
56+
MetricsPort int `json:"metricsPort"`
5157
}
5258

5359
// NewHTTPMetricsDataSource constructs a MetricsDataSource with the given scheme and path.
5460
// InsecureSkipVerify defaults to true (matching the factory default).
5561
// Use this function directly in tests to bypass JSON parameter marshaling.
// The metrics port override is passed as 0, so no port override is configured
// for data sources built through this helper.
5662
func NewHTTPMetricsDataSource(scheme, path, name string) (*http.HTTPDataSource, error) {
57-
return http.NewHTTPDataSource(scheme, path, defaultMetricsInsecureSkipVerify,
63+
return http.NewHTTPDataSource(scheme, path, defaultMetricsInsecureSkipVerify, 0,
5864
MetricsDataSourceType, name, parseMetrics, PrometheusMetricType)
5965
}
6066

@@ -72,7 +78,11 @@ func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle fw
7278
}
7379
}
7480

75-
return http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, MetricsDataSourceType,
81+
if cfg.MetricsPort != 0 && (cfg.MetricsPort < 1 || cfg.MetricsPort > 65535) {
82+
return nil, fmt.Errorf("metricsPort must be between 1 and 65535, got %d", cfg.MetricsPort)
83+
}
84+
85+
return http.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, cfg.MetricsPort, MetricsDataSourceType,
7686
name, parseMetrics, PrometheusMetricType)
7787
}
7888

pkg/epp/framework/plugins/datalayer/source/metrics/datasource_test.go

Lines changed: 28 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,21 +18,23 @@ package metrics
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"testing"
2223

2324
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
2426
"k8s.io/apimachinery/pkg/types"
2527

2628
fwkdl "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/datalayer"
2729
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/plugins/datalayer/source/http"
2830
)
2931

3032
func TestDatasource(t *testing.T) {
31-
_, err := http.NewHTTPDataSource("invalid", "/metrics", true, MetricsDataSourceType,
33+
_, err := http.NewHTTPDataSource("invalid", "/metrics", true, 0, MetricsDataSourceType,
3234
"metrics-data-source", parseMetrics, PrometheusMetricType)
3335
assert.NotNil(t, err, "expected to fail with invalid scheme")
3436

35-
source, err := http.NewHTTPDataSource("https", "/metrics", true, MetricsDataSourceType,
37+
source, err := http.NewHTTPDataSource("https", "/metrics", true, 0, MetricsDataSourceType,
3638
"metrics-data-source", parseMetrics, PrometheusMetricType)
3739
assert.Nil(t, err, "failed to create HTTP datasource")
3840

@@ -50,3 +52,27 @@ func TestDatasource(t *testing.T) {
5052
_, err = source.Poll(ctx, endpoint)
5153
assert.NotNil(t, err, "expected to fail polling for metrics")
5254
}
55+
56+
// TestMetricsDataSourceFactory_MetricsPortOverride builds a data source through
// MetricsDataSourceFactory with metricsPort=9090, polls an endpoint whose
// MetricsHost uses port 8000, and checks that the resulting error text
// mentions 9090.
func TestMetricsDataSourceFactory_MetricsPortOverride(t *testing.T) {
57+
params, err := json.Marshal(map[string]any{
58+
"scheme": "http",
59+
"metricsPort": 9090,
60+
})
61+
require.NoError(t, err)
62+
63+
plugin, err := MetricsDataSourceFactory("test-ds", params, nil)
64+
require.NoError(t, err)
65+
66+
ds, ok := plugin.(fwkdl.PollingDataSource)
67+
require.True(t, ok, "expected MetricsDataSourceFactory to return a PollingDataSource")
68+
69+
// Poll will fail (no real server), but the error must reference port 9090.
70+
// If metricsPort were ignored, EPP would dial :8000 instead.
// NOTE(review): this polls the literal address 1.2.3.4, so the test presumably
// relies on the client failing or timing out quickly — confirm it cannot hang.
71+
endpoint := fwkdl.NewEndpoint(&fwkdl.EndpointMetadata{
72+
NamespacedName: types.NamespacedName{Name: "pod1", Namespace: "default"},
73+
MetricsHost: "1.2.3.4:8000",
74+
}, nil)
75+
_, err = ds.Poll(context.Background(), endpoint)
76+
// NOTE(review): assert.Error does not stop the test on failure; if err were
// nil, err.Error() on the next line would panic. require.Error would be safer.
assert.Error(t, err)
77+
assert.Contains(t, err.Error(), "9090", "expected scrape target to use metricsPort 9090, not inference port 8000")
78+
}

test/integration/epp/runtime_polling_test.go

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ func TestRuntimePollingDispatch(t *testing.T) {
8686
r := datalayer.NewRuntime(pollingInterval)
8787
ext := mocks.NewPollingExtractor("test-extractor")
8888

89-
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
89+
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
9090
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
9191
require.NoError(t, err)
9292

@@ -138,7 +138,7 @@ func TestRuntimePollingMultipleExtractors(t *testing.T) {
138138
ext1 := mocks.NewPollingExtractor("extractor-1")
139139
ext2 := mocks.NewPollingExtractor("extractor-2")
140140

141-
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
141+
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
142142
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
143143
require.NoError(t, err)
144144

@@ -187,7 +187,7 @@ func TestRuntimePollingEndpointLifecycle(t *testing.T) {
187187
r := datalayer.NewRuntime(pollingInterval)
188188
ext := mocks.NewPollingExtractor("lifecycle-extractor")
189189

190-
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
190+
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
191191
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
192192
require.NoError(t, err)
193193

@@ -241,7 +241,7 @@ func TestRuntimePollingWithoutExtractors(t *testing.T) {
241241

242242
r := datalayer.NewRuntime(50 * time.Millisecond)
243243

244-
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
244+
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
245245
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
246246
require.NoError(t, err)
247247

@@ -280,7 +280,7 @@ func TestRuntimePollingHTTPError(t *testing.T) {
280280
r := datalayer.NewRuntime(pollingInterval)
281281
ext := mocks.NewPollingExtractor("error-extractor")
282282

283-
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, "test-http", "test-source",
283+
httpSrc, err := httpds.NewHTTPDataSource("http", "/metrics", true, 0, "test-http", "test-source",
284284
parsePrometheusMetrics, reflect.TypeOf(fwkdl.Metrics{}))
285285
require.NoError(t, err)
286286

0 commit comments

Comments
 (0)