diff --git a/.gitignore b/.gitignore index 5b57e2dd..5e46985a 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,10 @@ docs/public docs/resources/_gen/ docs/.hugo_build.lock test/integration/**/clab-* + + +# Only for development and testing purposes +# To be removed after development of targetsource +# ignored in order to not add unnecassary logging messages +lab/dev/resources/targetsources +.scannerwork/ \ No newline at end of file diff --git a/api/v1alpha1/targetsource_types.go b/api/v1alpha1/targetsource_types.go index 26a106f5..27c28e14 100644 --- a/api/v1alpha1/targetsource_types.go +++ b/api/v1alpha1/targetsource_types.go @@ -58,18 +58,64 @@ type HTTPConfig struct { // +kubebuilder:validation:Optional URL string `json:"url,omitempty"` + // HTTP method used for the request. + // + // Defaults to GET if not specified. + // + // Supported values: + // - GET (default, no request body) + // - POST (supports request body) + // + // +kubebuilder:validation:Enum=GET;POST + // +kubebuilder:default="GET" + // +kubebuilder:validation:Optional + Method string `json:"method,omitempty"` + + // Optional HTTP headers to include in the request. + // + // These map directly to HTTP headers (key-value pairs). + // + // Example: + // headers: + // Content-Type: application/json + // X-Custom-Header: value + // + // Precedence: + // - Authentication configuration overrides any conflicting headers e.g. Authorization + // + // +kubebuilder:validation:Optional + Headers map[string]string `json:"headers,omitempty"` + + // Optional raw request body. + // + // Typically used with POST requests and contains JSON payload. + // + // Example: + // body: | + // { + // "limit": 100, + // "status": "active" + // } + // + // Notes: + // - Ignored for GET requests + // - User must set appropriate Content-Type header if needed + // + // +kubebuilder:validation:Optional + Body string `json:"body,omitempty"` + // Optional authentication configuration for accessing the HTTP endpoint // +kubebuilder:validation:Optional Authentication *AuthenticationSpec `json:"authentication,omitempty"` // Optional interval for polling the HTTP endpoint for targets // TODO: document about default value - // +kubebuilder:default="6h" + // +kubebuilder:default="30m" // +kubebuilder:validation:Optional Interval *metav1.Duration `json:"interval,omitempty"` // Optional timeout for HTTP requests to the endpoint - // +kubebuilder:default="10s" + // +kubebuilder:default="30s" // +kubebuilder:validation:Optional Timeout *metav1.Duration `json:"timeout,omitempty"` @@ -132,51 +178,186 @@ type TokenAuthSpec struct { TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` } -// PaginationSpec defines the configuration for paginating through responses from providers +// PaginationSpec defines how pagination is handled for HTTP APIs. +// +// The pagination mechanism is fully server-driven. The loader will repeatedly: +// 1. Extract the "next" reference from the response +// 2. Use it to construct the next request +// 3. Continue until no next reference is returned +// +// Supported pagination styles: +// 1. Cursor-based: +// - Response returns a token (e.g. "next_page_token") +// - Client sends it back via a query parameter (e.g. "page_token") +// 2. URL-based (nextLink): +// - Response returns a full URL +// - Client follows it directly without modification +// 3. Expression-based extraction: +// - The next reference is extracted using a CEL expression +// - This allows access to nested fields or special keys +// (e.g. "@odata.nextLink") +// +// Behavior: +// - If the extracted value is a full URL, it will be used as-is +// - Otherwise, it is treated as a token and appended using RequestParam +// - The token is treated as opaque and must not be interpreted +// +// Example: +// +// pagination: +// nextField: "self.next_page_token" +// requestParam: "page_token" +// +// pagination: +// nextField: "self['@odata.nextLink']" type PaginationSpec struct { - // Field name in the JSON response that contains the next page reference. - // The value can be either: - // - a full URL (used directly for the next request), or - // - a pagination token (appended as a query parameter using this field name as the key). + // CEL expression used to extract the next page reference from the response. + // + // The expression is evaluated with: + // self -> full JSON response // - // Must refer to a top-level key in the response object. - // Example: "next" or "nextToken" + // It must evaluate to either: + // - string (full URL OR token), or + // - null (indicates end of pagination) + // + // Examples: + // "self.next" + // "self.next_page_token" + // "self['@odata.nextLink']" + // + // +kubebuilder:validation:Optional NextField string `json:"nextField,omitempty"` + + // Query parameter name used when the extracted value is a token. + // + // Required for token-based pagination. + // Ignored when NextField resolves to a full URL. + // + // Example: + // requestParam: "page_token" + // + // +kubebuilder:validation:Optional + RequestParam string `json:"requestParam,omitempty"` } -// CEL expressions to extract target fields from the response -// and map them to the corresponding Target fields. +// ResponseMappingSpec controls how targets are extracted from an HTTP JSON response. +// +// This allows you to map fields from a JSON API into targets using either: +// - simple direct field access (e.g. item["name"]) +// - or CEL expressions for more advanced logic +// +// General behavior: +// +// 1. Selecting targets: +// - `targetsField` is a CEL expression that selects the list of targets +// - It runs once on the full response (`self`) and MUST return a list +// - If not set, the response itself must be a JSON array +// +// 2. Extracting fields: +// - Each field (name, address, port, labels, etc.) is handled independently +// - If a CEL expression is provided → it is evaluated +// - If not provided → the value is read directly from the target object +// +// 3. Available variables in CEL: +// - item -> the current target object +// - self -> the full HTTP response JSON +// +// Example: +// +// Response: +// { +// "results": [ +// { "name": "device1", "ip": "10.0.0.1", "env": "prod" } +// ], +// "meta": { "region": "eu-west" } +// } +// +// Mapping: +// targetsField: "self.results" +// +// name: "" # direct → item["name"] +// address: "item.ip" # CEL +// +// labels: +// env: "item.env" +// region: "self.meta.region" type ResponseMappingSpec struct { - // Field name in the JSON response that contains the list of items (targets). - // If not specified, the entire response is expected to be a list of items. - // All subsequent fields are specified relative to this field - // Example: "results" if the response is of the form {"results": [ ... list of items ... ]} + // CEL expression that selects the list of target objects from the response. + // + // This is evaluated once using: + // self -> full JSON response + // + // Example: + // targetsField: "self.results" + // + // If not set, the response itself must be a JSON array with the targets. + // // +kubebuilder:validation:Optional TargetsField string `json:"targetsField,omitempty"` - // CEL expression to extract the target name from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target name. + // + // If not set, defaults to: + // item["name"] + // + // Example: + // "item.hostname" + // // +kubebuilder:validation:Optional - Name string `json:"name"` + Name string `json:"name,omitempty"` - // CEL expression to extract the target Address from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target address. + // + // If not set, defaults to: + // item["address"] + // + // Example: + // "item.ip" + // // +kubebuilder:validation:Optional - Address string `json:"address"` + Address string `json:"address,omitempty"` - // CEL expression to extract the target port from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target port. + // + // If not set, defaults to: + // item["port"] + // + // Example: + // "item.port" + // // +kubebuilder:validation:Optional Port string `json:"port,omitempty"` - // CEL expression to extract the target labels from the response + // CEL expression that returns a map of labels. + // The expression must evaluate to an object (map). + // + // Example: + // + // labels: | + // { + // "env": item.environment, + // "region": self.meta.region, + // item.dynamicKey: "value" + // } + // + // If not set, defaults to: + // item["labels"] + // + // The resulting map will be converted into labels. // The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, // with values from the response taking precedence in case of conflicts. + // // +kubebuilder:validation:Optional - Labels map[string]string `json:"labels,omitempty"` + Labels string `json:"labels,omitempty"` - // CEL expression to extract the target profile from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target profile. + // + // If not set, defaults to: + // item["targetProfile"] + // + // Example: + // "item.type == 'edge' ? 'edge-profile' : 'default'" + // // +kubebuilder:validation:Optional TargetProfile string `json:"targetProfile,omitempty"` } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 201a35da..55495a30 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -323,6 +323,13 @@ func (in *GRPCTunnelConfig) DeepCopy() *GRPCTunnelConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.Authentication != nil { in, out := &in.Authentication, &out.Authentication *out = new(AuthenticationSpec) @@ -351,7 +358,7 @@ func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { if in.ResponseMapping != nil { in, out := &in.ResponseMapping, &out.ResponseMapping *out = new(ResponseMappingSpec) - (*in).DeepCopyInto(*out) + **out = **in } if in.Push != nil { in, out := &in.Push, &out.Push @@ -1026,13 +1033,6 @@ func (in *PushSpec) DeepCopy() *PushSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResponseMappingSpec) DeepCopyInto(out *ResponseMappingSpec) { *out = *in - if in.Labels != nil { - in, out := &in.Labels, &out.Labels - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResponseMappingSpec. diff --git a/config/crd/bases/operator.gnmic.dev_targetsources.yaml b/config/crd/bases/operator.gnmic.dev_targetsources.yaml index 4ecef754..f9e32489 100644 --- a/config/crd/bases/operator.gnmic.dev_targetsources.yaml +++ b/config/crd/bases/operator.gnmic.dev_targetsources.yaml @@ -127,8 +127,41 @@ spec: be set rule: '[has(self.basic),has(self.token)].filter(x,x==true).size() == 1' + body: + description: |- + Optional raw request body. + + Typically used with POST requests and contains JSON payload. + + Example: + body: | + { + "limit": 100, + "status": "active" + } + + Notes: + - Ignored for GET requests + - User must set appropriate Content-Type header if needed + type: string + headers: + additionalProperties: + type: string + description: |- + Optional HTTP headers to include in the request. + + These map directly to HTTP headers (key-value pairs). + + Example: + headers: + Content-Type: application/json + X-Custom-Header: value + + Precedence: + - Authentication configuration overrides any conflicting headers e.g. Authorization + type: object interval: - default: 6h + default: 30m description: Optional interval for polling the HTTP endpoint for targets type: string @@ -138,53 +171,121 @@ spec: properties: address: description: |- - CEL expression to extract the target Address from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target address. + + If not set, defaults to: + item["address"] + + Example: + "item.ip" type: string labels: - additionalProperties: - type: string description: |- - CEL expression to extract the target labels from the response + CEL expression that returns a map of labels. + The expression must evaluate to an object (map). + + Example: + + labels: | + { + "env": item.environment, + "region": self.meta.region, + item.dynamicKey: "value" + } + + If not set, defaults to: + item["labels"] + + The resulting map will be converted into labels. The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, with values from the response taking precedence in case of conflicts. - type: object + type: string name: description: |- - CEL expression to extract the target name from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target name. + + If not set, defaults to: + item["name"] + + Example: + "item.hostname" type: string port: description: |- - CEL expression to extract the target port from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target port. + + If not set, defaults to: + item["port"] + + Example: + "item.port" type: string targetProfile: description: |- - CEL expression to extract the target profile from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target profile. + + If not set, defaults to: + item["targetProfile"] + + Example: + "item.type == 'edge' ? 'edge-profile' : 'default'" type: string targetsField: description: |- - Field name in the JSON response that contains the list of items (targets). - If not specified, the entire response is expected to be a list of items. - All subsequent fields are specified relative to this field - Example: "results" if the response is of the form {"results": [ ... list of items ... ]} + CEL expression that selects the list of target objects from the response. + + This is evaluated once using: + self -> full JSON response + + Example: + targetsField: "self.results" + + If not set, the response itself must be a JSON array with the targets. type: string type: object + method: + default: GET + description: |- + HTTP method used for the request. + + Defaults to GET if not specified. + + Supported values: + - GET (default, no request body) + - POST (supports request body) + enum: + - GET + - POST + type: string pagination: description: Optional pagination configuration for parsing responses from the HTTP endpoint properties: nextField: description: |- - Field name in the JSON response that contains the next page reference. - The value can be either: - - a full URL (used directly for the next request), or - - a pagination token (appended as a query parameter using this field name as the key). + CEL expression used to extract the next page reference from the response. + + The expression is evaluated with: + self -> full JSON response + + It must evaluate to either: + - string (full URL OR token), or + - null (indicates end of pagination) + + Examples: + "self.next" + "self.next_page_token" + "self['@odata.nextLink']" + type: string + requestParam: + description: |- + Query parameter name used when the extracted value is a token. + + Required for token-based pagination. + Ignored when NextField resolves to a full URL. - Must refer to a top-level key in the response object. - Example: "next" or "nextToken" + Example: + requestParam: "page_token" type: string type: object push: @@ -276,7 +377,7 @@ spec: - enabled type: object timeout: - default: 10s + default: 30s description: Optional timeout for HTTP requests to the endpoint type: string tls: diff --git a/go.mod b/go.mod index 9dc2b789..c08b9b8d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.5 require ( github.com/cert-manager/cert-manager v1.19.3 github.com/go-logr/logr v1.4.3 + github.com/google/cel-go v0.28.1 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.28.3 github.com/onsi/gomega v1.40.0 @@ -19,8 +20,10 @@ require ( ) require ( + cel.dev/expr v0.25.1 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect github.com/Masterminds/semver/v3 v3.4.0 // indirect + github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -63,6 +66,7 @@ require ( go.uber.org/zap v1.27.1 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect @@ -73,6 +77,7 @@ require ( golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.44.0 // indirect gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/go.sum b/go.sum index 45485f13..0a845c4b 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ +cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4= +cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= +github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cert-manager/cert-manager v1.19.3 h1:3d0Nk/HO3BOmAdBJNaBh+6YgaO3Ciey3xCpOjiX5Obs= @@ -76,6 +80,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/cel-go v0.28.1 h1:YWIwi77J4xIsYUwAF/iIuS6haffzIHS8yWI8glSbLWM= +github.com/google/cel-go v0.28.1/go.mod h1:X0bD6iVNR8pkROSOoHVdgTkzmRcosof7WQqCD6wcMc8= github.com/google/gnostic-models v0.7.1 h1:SisTfuFKJSKM5CPZkffwi6coztzzeYUhc3v4yxLWH8c= github.com/google/gnostic-models v0.7.1/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -172,6 +178,8 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= @@ -194,6 +202,8 @@ gomodules.xyz/jsonpatch/v2 v2.5.0 h1:JELs8RLM12qJGXU4u/TO3V25KW8GreMKl9pdkk14RM0 gomodules.xyz/jsonpatch/v2 v2.5.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index e5cc5ea0..74edf294 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -14,7 +14,12 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" ) -func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { +func fetchExistingTargets( + ctx context.Context, + c client.Client, + ts *gnmicv1alpha1.TargetSource, +) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList err := c.List( diff --git a/internal/controller/discovery/core/ressource_fetcher.go b/internal/controller/discovery/core/ressource_fetcher.go new file mode 100644 index 00000000..31a82cf0 --- /dev/null +++ b/internal/controller/discovery/core/ressource_fetcher.go @@ -0,0 +1,15 @@ +package core + +import ( + "context" + + corev1 "k8s.io/api/core/v1" +) + +// ResourceFetcher provides read-only access to namespaced Secret and +// ConfigMap data for loaders without requiring each loader to carry a +// Kubernetes client. +type ResourceFetcher interface { + GetSecretKey(ctx context.Context, namespace string, selector *corev1.SecretKeySelector) (string, error) + GetConfigMapKey(ctx context.Context, namespace string, selector *corev1.ConfigMapKeySelector) (string, error) +} diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 8de38c1d..a9a208f4 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -19,9 +19,10 @@ type DiscoveryRegistryValue struct { } type CommonLoaderConfig struct { - TargetsourceNN types.NamespacedName - ChunkSize int - AcceptPush bool + TargetsourceNN types.NamespacedName + ChunkSize int + AcceptPush bool + ResourceFetcher ResourceFetcher } // EventAction represents the type of a discovery event diff --git a/internal/controller/discovery/discovery.go b/internal/controller/discovery/discovery.go index 491cdfb3..07c9ceda 100644 --- a/internal/controller/discovery/discovery.go +++ b/internal/controller/discovery/discovery.go @@ -10,6 +10,12 @@ package discovery // - core: message contracts, snapshot/event types, and transport helpers. // - message processor: snapshot + event target state application logic. // - loaders: target discovery providers (HTTP, webhook, etc.). -// - registry: key -> channel registry. +// - registry: generic discovery runtime registry. +// +// The package also contains discovery helpers: +// - client helpers for applying/deleting targets and updating TargetSource status. +// - a loader factory for constructing discovery loaders. +// - target normalization and event generation logic. +// - a resource fetcher for resolving Secret/ConfigMap values used by loaders. // // At the moment, the targetsource controller imports specific subpackages explicitly. diff --git a/internal/controller/discovery/loaders.go b/internal/controller/discovery/loaders.go index e8061d93..2644db3b 100644 --- a/internal/controller/discovery/loaders.go +++ b/internal/controller/discovery/loaders.go @@ -1,24 +1,26 @@ package discovery import ( + "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/client" + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" http "github.com/gnmic/operator/internal/controller/discovery/loaders/http" ) // NewLoader creates a loader by name -func NewLoader(cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { +func NewLoader(ctx context.Context, c client.Client, cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { switch { case spec.Provider.HTTP != nil: - if spec.Provider.HTTP.Push != nil { - cfg.AcceptPush = spec.Provider.HTTP.Push.Enabled - } - return http.New(*cfg), nil + httpSpec := *spec.Provider.HTTP + cfg.AcceptPush = httpSpec.Push != nil && httpSpec.Push.Enabled + cfg.ResourceFetcher = newK8sResourceFetcher(c) + return http.New(*cfg, httpSpec), nil default: return nil, fmt.Errorf("unknown targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) } - } diff --git a/internal/controller/discovery/loaders/http/auth.go b/internal/controller/discovery/loaders/http/auth.go new file mode 100644 index 00000000..04f48f7e --- /dev/null +++ b/internal/controller/discovery/loaders/http/auth.go @@ -0,0 +1,78 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + corev1 "k8s.io/api/core/v1" +) + +// fetchSecret uses the configured ResourceFetcher to resolve secret values. +func (l *Loader) fetchSecret(ctx context.Context, sel *corev1.SecretKeySelector) (string, error) { + if l.loaderCfg.ResourceFetcher == nil { + return "", nil + } + return l.loaderCfg.ResourceFetcher.GetSecretKey(ctx, l.loaderCfg.TargetsourceNN.Namespace, sel) +} + +func (l *Loader) applyAuthentication(req *http.Request) error { + auth := l.spec.Authentication + if auth == nil { + return nil + } + + if auth.Basic != nil { + return l.applyBasicAuth(req, auth.Basic.CredentialSecretRef) + } + + if auth.Token != nil { + return l.applyTokenAuth(req, auth.Token.Scheme, auth.Token.TokenSecretRef) + } + + return fmt.Errorf("no supported authentication method configured") +} + +// applyBasicAuth applies Basic authentication using the provided secret selector. +// Returns an error when credentials are missing or cannot be parsed. +func (l *Loader) applyBasicAuth(req *http.Request, sel *corev1.SecretKeySelector) error { + if sel == nil { + return fmt.Errorf("Basic auth enabled but no valid credentials provided") + } + + val, err := l.fetchSecret(req.Context(), sel) + if err != nil { + return err + } + + var cm map[string]string + if err := json.Unmarshal([]byte(val), &cm); err != nil { + return err + } + + username := cm["username"] + password := cm["password"] + if username == "" && password == "" { + return fmt.Errorf("Basic auth enabled but no valid credentials provided") + } + + req.SetBasicAuth(username, password) + return nil +} + +// applyTokenAuth applies token-based authentication using the provided secret selector +// Returns an error when no valid token is found +func (l *Loader) applyTokenAuth(req *http.Request, scheme string, sel *corev1.SecretKeySelector) error { + if sel == nil { + return fmt.Errorf("Token auth enabled but no valid token secret reference provided") + } + + token, err := l.fetchSecret(req.Context(), sel) + if err != nil { + return err + } + + req.Header.Set("Authorization", fmt.Sprintf("%s %s", scheme, token)) + return nil +} diff --git a/internal/controller/discovery/loaders/http/auth_test.go b/internal/controller/discovery/loaders/http/auth_test.go new file mode 100644 index 00000000..24fc821f --- /dev/null +++ b/internal/controller/discovery/loaders/http/auth_test.go @@ -0,0 +1,91 @@ +package http + +import ( + "encoding/json" + "net/http" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +func TestApplyAuthenticationCases(t *testing.T) { + credsJSON, _ := json.Marshal(map[string]string{"username": "user", "password": "pass"}) + + tests := []struct { + name string + config gnmicv1alpha1.HTTPConfig + fetcher core.ResourceFetcher + check func(t *testing.T, req *http.Request, err error) + }{ + { + name: "basic success", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Basic: &gnmicv1alpha1.BasicAuthSpec{CredentialSecretRef: &corev1.SecretKeySelector{}}}}, + fetcher: fakeResourceFetcher{secretValue: string(credsJSON)}, + check: func(t *testing.T, req *http.Request, err error) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + user, pass, ok := req.BasicAuth() + if !ok || user != "user" || pass != "pass" { + t.Fatalf("basic auth not set correctly") + } + }, + }, + { + name: "basic invalid json", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Basic: &gnmicv1alpha1.BasicAuthSpec{CredentialSecretRef: &corev1.SecretKeySelector{}}}}, + fetcher: fakeResourceFetcher{secretValue: "invalid-json"}, + check: func(t *testing.T, req *http.Request, err error) { + if err == nil { + t.Fatalf("expected error for invalid json") + } + }, + }, + { + name: "token success", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Token: &gnmicv1alpha1.TokenAuthSpec{Scheme: "Bearer", TokenSecretRef: &corev1.SecretKeySelector{}}}}, + fetcher: fakeResourceFetcher{secretValue: "token-value"}, + check: func(t *testing.T, req *http.Request, err error) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := req.Header.Get("Authorization"); !strings.Contains(got, "token-value") { + t.Fatalf("token header not set: %q", got) + } + }, + }, + { + name: "token missing secret", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Token: &gnmicv1alpha1.TokenAuthSpec{Scheme: "Bearer"}}}, + fetcher: nil, + check: func(t *testing.T, req *http.Request, err error) { + if err == nil { + t.Fatalf("expected token secret ref error") + } + }, + }, + { + name: "no method configured", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{}}, + fetcher: nil, + check: func(t *testing.T, req *http.Request, err error) { + if err == nil { + t.Fatalf("expected unsupported auth error") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + loader := makeLoader(tt.config, tt.fetcher) + req, _ := http.NewRequest(http.MethodGet, "http://example.com", nil) + err := loader.applyAuthentication(req) + tt.check(t, req, err) + }) + } +} diff --git a/internal/controller/discovery/loaders/http/helpers_test.go b/internal/controller/discovery/loaders/http/helpers_test.go new file mode 100644 index 00000000..0ef3bc02 --- /dev/null +++ b/internal/controller/discovery/loaders/http/helpers_test.go @@ -0,0 +1,100 @@ +package http + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net/http" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// fakeResourceFetcher is a lightweight test double. +type fakeResourceFetcher struct { + secretValue string + configuration string + secretErr error + configMapErr error +} + +func (f fakeResourceFetcher) GetSecretKey(_ context.Context, _ string, _ *corev1.SecretKeySelector) (string, error) { + return f.secretValue, f.secretErr +} + +func (f fakeResourceFetcher) GetConfigMapKey(_ context.Context, _ string, _ *corev1.ConfigMapKeySelector) (string, error) { + return f.configuration, f.configMapErr +} + +func makeLoader(spec gnmicv1alpha1.HTTPConfig, fetcher core.ResourceFetcher) *Loader { + if spec.Method == "" { + spec.Method = http.MethodGet + } + if spec.Interval == nil { + spec.Interval = &metav1.Duration{Duration: 6 * time.Hour} + } + return &Loader{ + loaderCfg: core.CommonLoaderConfig{ + TargetsourceNN: types.NamespacedName{Namespace: "default", Name: "test"}, + ChunkSize: 10, + ResourceFetcher: fetcher, + }, + spec: spec, + } +} + +func mustBuildClient(t *testing.T, loader *Loader) *http.Client { + t.Helper() + client, err := loader.buildHTTPClient(context.Background()) + if err != nil { + t.Fatalf("buildHTTPClient failed: %v", err) + } + return client +} + +func startLoaderRun(loader *Loader) (context.Context, context.CancelFunc, chan []core.DiscoveryMessage, chan error) { + ctx, cancel := context.WithCancel(context.Background()) + out := make(chan []core.DiscoveryMessage, 1) + done := make(chan error, 1) + go func() { done <- loader.Run(ctx, out) }() + return ctx, cancel, out, done +} + +// genSelfSignedCertPEM generates a self-signed certificate PEM used in tests. +func genSelfSignedCertPEM() (string, error) { + priv, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return "", err + } + tmpl := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + CommonName: "test-ca", + }, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + BasicConstraintsValid: true, + IsCA: true, + } + der, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv) + if err != nil { + return "", err + } + var buf bytes.Buffer + if err := pem.Encode(&buf, &pem.Block{Type: "CERTIFICATE", Bytes: der}); err != nil { + return "", err + } + return buf.String(), nil +} diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 6b85a9bb..4ad38617 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -1,49 +1,118 @@ package http import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" + "net/http" "time" + "github.com/go-logr/logr" + "github.com/google/uuid" + "sigs.k8s.io/controller-runtime/pkg/log" + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils" - "github.com/google/uuid" ) +// Loader implements the HTTP pull discovery mechanism +// It periodically polls an HTTP endpoint, extracts targets from the response, +// and emits discovery snapshots downstream type Loader struct { - commonCfg core.CommonLoaderConfig + loaderCfg core.CommonLoaderConfig + spec gnmicv1alpha1.HTTPConfig } -// New instantiates the http loader with the provided config -func New(cfg core.CommonLoaderConfig) core.Loader { - return &Loader{commonCfg: cfg} +// New creates a new HTTP loader instance with the provided configuration. +// The loader is stateless apart from its config and spec +func New(cfg core.CommonLoaderConfig, httpConfig gnmicv1alpha1.HTTPConfig) core.Loader { + return &Loader{loaderCfg: cfg, spec: httpConfig} } +// Name returns the loader's name, used for logging and metrics func (l *Loader) Name() string { return "http" } +// Run starts the HTTP discovery loop +// It performs an immediate fetch and then continues polling at a fixed interval func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) error { + if l.spec.URL == "" { + return nil + } + logger := log.FromContext(ctx).WithValues( "component", "loader", "name", l.Name(), - "targetsource", l.commonCfg.TargetsourceNN, + "targetsource", l.loaderCfg.TargetsourceNN, ) logger.Info( "HTTP loader started", - "targetsource", l.commonCfg.TargetsourceNN.Name, - "namespace", l.commonCfg.TargetsourceNN.Namespace, + "targetsource", l.loaderCfg.TargetsourceNN.Name, + "namespace", l.loaderCfg.TargetsourceNN.Namespace, ) - // Only for debugging: emit a static snapshot every 30 seconds - ticker := time.NewTicker(30 * time.Second) + logger.Info("HTTP loader started") + + client, err := l.buildHTTPClient(ctx) + if err != nil { + return fmt.Errorf("failed to build HTTP client: %w", err) + } + if l.spec.Interval == nil { + return fmt.Errorf("interval must be configured") + } + interval := l.spec.Interval.Duration + ticker := time.NewTicker(interval) defer ticker.Stop() - i := 1 + logger.Info( + "HTTP polling discovery started", + "interval", interval.String(), + "url", l.spec.URL, + ) + + // helper function to fetch targets and emit discovery messages + fetchAndEmit := func() { + // Fetch targets from HTTP endpoint + targets, err := l.fetchTargetsFromHTTPEndpoint(ctx, client, logger) + if err != nil { + logger.Error( + err, + "Failed to fetch targets from HTTP endpoint", + "url", l.spec.URL, + ) + return + } + // Emit discovery snapshot downstream + snapshotID := fmt.Sprintf("%s-%s-%s", l.loaderCfg.TargetsourceNN.Namespace, l.loaderCfg.TargetsourceNN.Name, uuid.NewString()) + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.loaderCfg.ChunkSize); err != nil { + logger.Error( + err, + "Failed to send discovery snapshot", + "snapshotID", snapshotID, + "targets", len(targets), + ) + return + } + + logger.Info( + "Discovery snapshot sent", + "snapshotID", snapshotID, + "targets", len(targets), + ) + } + + // Immediate fetch on startup + fetchAndEmit() + + // Periodic fetch for { select { case <-ctx.Done(): @@ -51,78 +120,242 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er return nil case <-ticker.C: - // Switch case + i only needed to test behavior for messages with different values. - switch i { - case 1: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "spine1", - Address: "clab-t1-spine1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf1", - Address: "clab-leaf1", - Port: 57400, - Labels: map[string]string{}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - case 2: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "spine1", - Address: "clab-t1-spine1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf2", - Address: "clab-t1-leaf2", - Port: 57400, - Labels: map[string]string{}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - - default: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "spine1", - Address: "clab-t1-spine1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf1", - Address: "clab-t1-leaf1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf2", - Address: "clab-t1-leaf2", - Port: 57400, - Labels: map[string]string{}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - } - - i++ + fetchAndEmit() + } + } +} + +// buildHTTPClient constructs an HTTP client with optional configuration +func (l *Loader) buildHTTPClient(ctx context.Context) (*http.Client, error) { + if l.spec.Timeout == nil { + return nil, fmt.Errorf("timeout must be configured") + } + timeout := l.spec.Timeout.Duration + transport := &http.Transport{} + // If TLS is configured, build TLS config (may include CA bundle). + if l.spec.TLS != nil { + tlsConfig, err := l.buildTLSConfig(ctx) + if err != nil { + return nil, err + } + transport.TLSClientConfig = tlsConfig + } + + // Build the HTTP client with the specified timeout and TLS config + client := &http.Client{ + Timeout: timeout, + Transport: transport, + } + return client, nil +} + +// buildTLSConfig constructs a tls.Config according to the loader spec, +// fetching and parsing a CA bundle if requested. +func (l *Loader) buildTLSConfig(ctx context.Context) (*tls.Config, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: l.spec.TLS.InsecureSkipVerify, + } + + if l.spec.TLS.CABundleRef == nil { + return tlsConfig, nil + } + + if l.loaderCfg.ResourceFetcher == nil { + return nil, fmt.Errorf("resource fetcher is not configured") + } + + ref := l.spec.TLS.CABundleRef + if ref.Name == "" || ref.Key == "" { + return nil, fmt.Errorf("CABundleRef must specify both name and key") + } + + caPEM, err := l.loaderCfg.ResourceFetcher.GetConfigMapKey(ctx, l.loaderCfg.TargetsourceNN.Namespace, l.spec.TLS.CABundleRef) + if err != nil { + return nil, fmt.Errorf("failed to fetch CA bundle from config map ref: %w", err) + } + + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM([]byte(caPEM)); !ok { + return nil, fmt.Errorf("failed to parse CA bundle PEM") + } + tlsConfig.RootCAs = certPool + + return tlsConfig, nil +} + +// fetchTargetsFromHTTPEndpoint retrieves targets from the configured HTTP endpoint +func (l *Loader) fetchTargetsFromHTTPEndpoint( + ctx context.Context, + client *http.Client, + logger logr.Logger, +) ([]core.DiscoveredTarget, error) { + var allTargets []core.DiscoveredTarget + currentURL := l.spec.URL + + seen := make(map[string]struct{}) + + for { + if _, exists := seen[currentURL]; exists { + logger.Error(fmt.Errorf("pagination loop detected"), "stopping pagination", "url", currentURL) + break + } + seen[currentURL] = struct{}{} + + raw, headers, err := l.fetchPage(ctx, client, currentURL) + if err != nil { + return allTargets, err // do not silently drop pages + } + + // Extract targets + if targets, err := l.extractTargetsFromResponse(raw, logger); err != nil { + return nil, fmt.Errorf("Failed to extract targets: %w", err) + } else { + allTargets = append(allTargets, targets...) + } + + // Pagination: next page + nextURL, stop := l.getNextURL(raw, headers, currentURL, logger) + if stop { + break + } + currentURL = nextURL + } + + return allTargets, nil +} + +// fetchPage performs an HTTP GET request to the specified URL and decodes the JSON response +// and returns the raw response +func (l *Loader) fetchPage( + ctx context.Context, + client *http.Client, + url string, +) (any, http.Header, error) { + + method := l.spec.Method + if method == "" { + return nil, nil, fmt.Errorf("method must be configured") + } + + // Build request body (only for POST) + var bodyReader *bytes.Reader + if method == http.MethodPost && l.spec.Body != "" { + bodyReader = bytes.NewReader([]byte(l.spec.Body)) + } else { + bodyReader = bytes.NewReader(nil) + } + + // Build HTTP request + req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) + if err != nil { + return nil, nil, fmt.Errorf("creating HTTP request failed: %w", err) + } + + req.Header.Set("Accept", "application/json") + // Apply user-defined headers + for key, val := range l.spec.Headers { + req.Header.Set(key, val) + } + + if err := l.applyAuthentication(req); err != nil { + return nil, nil, err + } + + // Execute HTTP request + resp, err := client.Do(req) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, resp.Header, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode) + } + + // Decode HTTP response + var raw any + if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil { + return nil, resp.Header, err + } + + return raw, resp.Header, nil +} + +// extractTargetsFromResponse extracts items from the response and maps each item into a DiscoveredTarget +func (l *Loader) extractTargetsFromResponse(raw any, logger logr.Logger) ([]core.DiscoveredTarget, error) { + var items []any + // If ResponseMapping is configured and TargetsField is provided we treat + // it as a CEL expression that evaluates against the whole response and + // must return an array of items. + if l.spec.ResponseMapping != nil && l.spec.ResponseMapping.TargetsField != "" { + prog, err := compileCEL(l.spec.ResponseMapping.TargetsField) + if err != nil { + return nil, fmt.Errorf("invalid TargetsField CEL expression: %w", err) } + out, _, err := prog.Eval(map[string]any{"self": raw}) + if err != nil { + return nil, fmt.Errorf("evaluating TargetsField CEL expression failed: %w", err) + } + if out == nil { + return nil, fmt.Errorf("TargetsField expression returned nil") + } + array, ok := out.Value().([]any) + if !ok { + return nil, fmt.Errorf("invalid HTTP response: targetsField expression must evaluate to an array of objects") + } + items = array + } else { + //If TargetsField is empty, the raw response is expected to be an array of items. + array, ok := raw.([]any) + if !ok { + return nil, fmt.Errorf("invalid HTTP response: expected a JSON array when itemsField is not set") + } + items = array + } + + // Map items to targets + var targets []core.DiscoveredTarget + targets, err := l.mapItemsToTargets(items, raw, logger) + if err != nil { + return nil, fmt.Errorf("mapping items to targets failed: %w", err) } + + return targets, nil +} + +// getNextURL determines the next page URL +// Returns: +// - nextURL: next request +// - stop: whether to terminate loop +func (l *Loader) getNextURL( + raw any, + headers http.Header, + currentURL string, + logger logr.Logger, +) (string, bool) { + // Extract pagination info + // Link header + if next := extractNextFromLinkHeader(headers); next != "" { + return next, false + } + + // Body + nextPage, err := l.extractNextPageInfo(raw) + if err != nil { + logger.Error(err, "pagination extraction failed") + return "", true + } + + if nextPage == "" { + return "", true + } + + // Build next page URL + nextURL, err := l.buildNextURL(currentURL, nextPage) + if err != nil { + logger.Error(err, "failed to build next URL") + return "", true + } + + return nextURL, false } diff --git a/internal/controller/discovery/loaders/http/loader_test.go b/internal/controller/discovery/loaders/http/loader_test.go index d02cfda6..2a42e1b2 100644 --- a/internal/controller/discovery/loaders/http/loader_test.go +++ b/internal/controller/discovery/loaders/http/loader_test.go @@ -1 +1,204 @@ package http + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +func TestBuildHTTPClientCases(t *testing.T) { + caPEM, err := genSelfSignedCertPEM() + if err != nil { + t.Fatalf("failed to generate CA PEM: %v", err) + } + + tests := []struct { + name string + spec gnmicv1alpha1.HTTPConfig + fetcher core.ResourceFetcher + expectsErr bool + }{ + { + name: "valid_CABundle", + spec: gnmicv1alpha1.HTTPConfig{ + TLS: &gnmicv1alpha1.ClientTLSConfig{ + CABundleRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "test-ca"}, + Key: "ca.crt", + }, + }, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + }, + fetcher: fakeResourceFetcher{configuration: caPEM}, + expectsErr: false, + }, + { + name: "invalid_CABundle_PEM", + spec: gnmicv1alpha1.HTTPConfig{ + TLS: &gnmicv1alpha1.ClientTLSConfig{CABundleRef: &corev1.ConfigMapKeySelector{}}, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + }, + fetcher: fakeResourceFetcher{configuration: "not-pem"}, + expectsErr: true, + }, + { + name: "CABundle_without_fetcher", + spec: gnmicv1alpha1.HTTPConfig{TLS: &gnmicv1alpha1.ClientTLSConfig{CABundleRef: &corev1.ConfigMapKeySelector{}}, Timeout: &metav1.Duration{Duration: 10 * time.Second}}, + fetcher: nil, + expectsErr: true, + }, + { + name: "timeout_missing", + spec: gnmicv1alpha1.HTTPConfig{}, + fetcher: nil, + expectsErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + loader := makeLoader(tc.spec, tc.fetcher) + client, err := loader.buildHTTPClient(context.Background()) + if tc.expectsErr { + if err == nil { + t.Fatalf("%s: expected error, got nil", tc.name) + } + return + } + if err != nil { + t.Fatalf("%s: unexpected error: %v", tc.name, err) + } + if client == nil { + t.Fatalf("%s: expected client, got nil", tc.name) + } + }) + } +} + +func TestFetchPageErrorsAndJSON(t *testing.T) { + loader := &Loader{ + loaderCfg: core.CommonLoaderConfig{TargetsourceNN: types.NamespacedName{Namespace: "default", Name: "test"}}, + spec: gnmicv1alpha1.HTTPConfig{Timeout: &metav1.Duration{Duration: 10 * time.Second}}, + } + + // method missing + if _, _, err := loader.fetchPage(context.Background(), nil, "http://example.com"); err == nil { + t.Fatalf("expected method configuration error") + } + + // non-200 and invalid JSON + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("boom")) + })) + defer server.Close() + + loader = makeLoader(gnmicv1alpha1.HTTPConfig{ + Method: http.MethodGet, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + }, nil) + + client := mustBuildClient(t, loader) + + // non-200 + if _, _, err := loader.fetchPage(context.Background(), client, server.URL); err == nil { + t.Fatalf("expected status code error") + } + + // invalid JSON + server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("not-json")) + }) + + if _, _, err := loader.fetchPage(context.Background(), client, server.URL); err == nil { + t.Fatalf("expected JSON decode error") + } +} + +func TestFetchPagePOSTAndHeaders(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // validate method and headers/body + if r.Method != http.MethodPost { + t.Fatalf("expected POST, got %s", r.Method) + } + if r.Header.Get("X-Custom") != "value" { + t.Fatalf("missing header") + } + + var body map[string]any + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("body decode failed: %v", err) + } + + json.NewEncoder(w).Encode(map[string]any{"name": "target1"}) + })) + defer server.Close() + + spec := gnmicv1alpha1.HTTPConfig{ + URL: server.URL, + Method: http.MethodPost, + Headers: map[string]string{"X-Custom": "value"}, + Body: `{"query":"status"}`, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + } + + loader := makeLoader(spec, nil) + client := mustBuildClient(t, loader) + + raw, headers, err := loader.fetchPage(context.Background(), client, server.URL) + if err != nil { + t.Fatalf("fetchPage failed: %v", err) + } + + if headers == nil { + t.Fatalf("expected headers, got nil") + } + + resp, ok := raw.(map[string]any) + if !ok || resp["name"] != "target1" { + t.Fatalf("unexpected response: %#v", raw) + } +} + +func TestRunEmitsSnapshotOnImmediateFetch(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode([]any{map[string]any{"name": "t1", "address": "1.1.1.1", "port": float64(830)}}) + })) + defer server.Close() + + spec := gnmicv1alpha1.HTTPConfig{URL: server.URL, Method: http.MethodGet, Timeout: &metav1.Duration{Duration: 10 * time.Second}, Interval: &metav1.Duration{Duration: time.Hour}} + loader := makeLoader(spec, nil) + + _, cancel, out, done := startLoaderRun(loader) + defer cancel() + + select { + case msgs := <-out: + if len(msgs) == 0 { + t.Fatalf("expected discovery messages") + } + cancel() + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Run to emit snapshot") + } + + select { + case err := <-done: + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Run to return") + } +} diff --git a/internal/controller/discovery/loaders/http/mapper.go b/internal/controller/discovery/loaders/http/mapper.go new file mode 100644 index 00000000..4bd34585 --- /dev/null +++ b/internal/controller/discovery/loaders/http/mapper.go @@ -0,0 +1,329 @@ +package http + +import ( + "fmt" + "math" + "reflect" + "strconv" + + "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/ext" +) + +// mapItemsToTargets converts a list of raw JSON items into DiscoveredTargets using the configured mapping rules +func (l *Loader) mapItemsToTargets(items []any, full any, logger logr.Logger) ([]core.DiscoveredTarget, error) { + // Compile CEL expressions once for efficiency + compiled, err := l.compileMapping() + if err != nil { + return nil, fmt.Errorf("compile mapping: %w", err) + } + + // Map items to targets + targets := make([]core.DiscoveredTarget, 0, len(items)) + for _, item := range items { + obj, ok := item.(map[string]any) + if !ok { + logger.Error(fmt.Errorf("invalid target format"), + "failed to convert target to map", + "item", item, + ) + continue + } + target, err := l.mapItemToTarget(obj, full, compiled) + if err != nil { + logger.Error(err, + "failed to map target", + "item", obj, + ) + continue + } + + targets = append(targets, target) + } + + return targets, nil +} + +type compiledMapping struct { + name cel.Program + address cel.Program + port cel.Program + + targetProfile cel.Program + labels cel.Program +} + +func (l *Loader) compileMapping() (*compiledMapping, error) { + rm := l.spec.ResponseMapping + cm := &compiledMapping{} + if rm == nil { + return cm, nil + } + + var err error + if rm.Name != "" { + cm.name, err = compileCEL(rm.Name) + if err != nil { + return nil, fmt.Errorf("name: %w", err) + } + } + if rm.Address != "" { + cm.address, err = compileCEL(rm.Address) + if err != nil { + return nil, fmt.Errorf("address: %w", err) + } + } + if rm.Port != "" { + cm.port, err = compileCEL(rm.Port) + if err != nil { + return nil, fmt.Errorf("port: %w", err) + } + } + if rm.TargetProfile != "" { + cm.targetProfile, err = compileCEL(rm.TargetProfile) + if err != nil { + return nil, fmt.Errorf("targetProfile: %w", err) + } + } + if rm.Labels != "" { + cm.labels, err = compileCEL(rm.Labels) + if err != nil { + return nil, fmt.Errorf("labels: %w", err) + } + } + + return cm, nil +} + +// mapItemToTarget converts a raw JSON object into a DiscoveredTarget +func (l *Loader) mapItemToTarget(item map[string]any, full any, cm *compiledMapping) (core.DiscoveredTarget, error) { + name, err := l.getName(item, full, cm) + if err != nil { + return core.DiscoveredTarget{}, err + } + + address, err := l.getAddress(item, full, cm) + if err != nil { + return core.DiscoveredTarget{}, err + } + + return core.DiscoveredTarget{ + Name: name, + Address: address, + Port: l.getPort(item, full, cm), + Labels: l.getLabels(item, full, cm), + TargetProfile: l.getTargetProfile(item, full, cm), + }, nil +} + +// getName extracts the target name from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "name" field +func (l *Loader) getName(item map[string]any, full any, cm *compiledMapping) (string, error) { + if cm.name != nil { + val, err := evalCEL(cm.name, item, full) + if err != nil { + return "", err + } + + str, ok := val.(string) + if !ok || str == "" { + return "", fmt.Errorf("name must be non-empty string") + } + return str, nil + } + + val, ok := item["name"].(string) + if !ok || val == "" { + return "", fmt.Errorf("name must be non-empty string") + } + return val, nil +} + +// getAddress extracts the target address from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "address" field +func (l *Loader) getAddress(item map[string]any, full any, cm *compiledMapping) (string, error) { + if cm.address != nil { + val, err := evalCEL(cm.address, item, full) + if err != nil { + return "", err + } + + str, ok := val.(string) + if !ok || str == "" { + return "", fmt.Errorf("address must be non-empty string") + } + return str, nil + } + + val, ok := item["address"].(string) + if !ok || val == "" { + return "", fmt.Errorf("address must be non-empty string") + } + return val, nil +} + +// getPort extracts the target port from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "port" field +func (l *Loader) getPort(item map[string]any, full any, cm *compiledMapping) int32 { + if cm.port != nil { + val, err := evalCEL(cm.port, item, full) + if err == nil { + return extractPort(val) + } + return 0 + } + + return extractPort(item["port"]) +} + +// getLabels extracts the target labels from the item using the compiled CEL expressions if provided, +// otherwise it falls back to the default "labels" field +func (l *Loader) getLabels(item map[string]any, full any, cm *compiledMapping) map[string]string { + result := make(map[string]string) + + if cm != nil && cm.labels != nil { + val, err := evalCEL(cm.labels, item, full) + if err != nil { + return result + } + m, ok := val.(map[string]any) + if !ok { + return result + } + for k, v := range m { + result[k] = fmt.Sprintf("%v", v) + } + } + + // fallback: direct + if raw, ok := item["labels"].(map[string]any); ok { + for key, val := range raw { + result[key] = fmt.Sprintf("%v", val) + } + } + return result +} + +// getTargetProfile extracts the target profile from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "targetProfile" field +func (l *Loader) getTargetProfile(item map[string]any, full any, cm *compiledMapping) string { + if cm.targetProfile != nil { + val, err := evalCEL(cm.targetProfile, item, full) + if err == nil { + if str, ok := val.(string); ok { + return str + } + } + return "" + } + + if val, ok := item["targetProfile"].(string); ok { + return val + } + return "" +} + +var celEnv = mustNewEnv() + +// mustNewEnv creates a CEL environment with the necessary variable declarations for evaluating expressions +func mustNewEnv() *cel.Env { + env, err := cel.NewEnv( + cel.Variable("self", cel.DynType), + cel.Variable("item", cel.DynType), + // Required for ext.Regex + cel.OptionalTypes(), + // Include standard CEL declarations for common operations and types + ext.Strings(), + ext.Math(), + ext.Lists(), + ext.Sets(), + ext.Regex(), + ext.Bindings(), + ) + if err != nil { + panic(err) + } + return env +} + +// compileCEL compiles a CEL expression into a program that can be evaluated against items +func compileCEL(expr string) (cel.Program, error) { + ast, issues := celEnv.Compile(expr) + if issues != nil && issues.Err() != nil { + return nil, issues.Err() + } + return celEnv.Program(ast, cel.EvalOptions(cel.OptOptimize)) +} + +// evalCEL evaluates a compiled CEL program against an item +func evalCEL(p cel.Program, item map[string]any, full any) (any, error) { + out, _, err := p.Eval(map[string]any{ + "self": full, + "item": item, + }) + if err != nil { + return nil, err + } + if out == nil { + return nil, fmt.Errorf("CEL returned nil") + } + + return normalizeCEL(out.Value()), nil +} + +// normalizeCEL recursively converts CEL evaluation results into standard Go types +func normalizeCEL(v any) any { + switch raw := v.(type) { + case ref.Val: + v := raw.Value() + if v == nil { + return nil + } + return normalizeCEL(v) + + case []any: + for i := range raw { + raw[i] = normalizeCEL(raw[i]) + } + return raw + } + + // For maps, keys are converted to strings + rv := reflect.ValueOf(v) + if rv.Kind() == reflect.Map { + out := make(map[string]any) + for _, key := range rv.MapKeys() { + k := fmt.Sprintf("%v", normalizeCEL(key.Interface())) + val := normalizeCEL(rv.MapIndex(key).Interface()) + out[k] = val + } + return out + } + + return v +} + +// extractPort converts a CEL evaluation result into an int32 port number, +// handling both numeric and string representations +func extractPort(val any) int32 { + switch v := val.(type) { + case float64: + if v < 0 || v > math.MaxInt32 { + return 0 + } + return int32(v) + + case string: + p, err := strconv.ParseInt(v, 10, 32) + if err != nil { + return 0 + } + return int32(p) + + default: + return 0 + } +} diff --git a/internal/controller/discovery/loaders/http/mapping_test.go b/internal/controller/discovery/loaders/http/mapping_test.go new file mode 100644 index 00000000..2ba1623f --- /dev/null +++ b/internal/controller/discovery/loaders/http/mapping_test.go @@ -0,0 +1,156 @@ +package http + +import ( + "testing" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" +) + +func TestExtractTargetsAndMapping(t *testing.T) { + tests := []struct { + name string + config gnmicv1alpha1.HTTPConfig + raw any + validate func(t *testing.T, targets []core.DiscoveredTarget) + }{ + { + name: "direct mapping all fields", + config: gnmicv1alpha1.HTTPConfig{}, + raw: []any{map[string]any{"name": "t1", "address": "1.1.1.1", "port": "9000", "labels": map[string]any{"env": "prod", "region": "us-east"}, "targetProfile": "edge-profile"}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("direct mapping: expected 1 target, got %d", len(targets)) + } + tgt := targets[0] + if tgt.Name != "t1" { + t.Fatalf("direct mapping Name failed: got %q", tgt.Name) + } + if tgt.Address != "1.1.1.1" { + t.Fatalf("direct mapping Address failed: got %q", tgt.Address) + } + if tgt.Port != 9000 { + t.Fatalf("direct mapping Port failed: got %d", tgt.Port) + } + if tgt.Labels["env"] != "prod" || tgt.Labels["region"] != "us-east" { + t.Fatalf("direct mapping Labels failed: %#v", tgt.Labels) + } + if tgt.TargetProfile != "edge-profile" { + t.Fatalf("direct mapping TargetProfile failed: got %q", tgt.TargetProfile) + } + }, + }, + { + name: "CEL TargetsField extraction", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{TargetsField: "self.results"}}, + raw: map[string]any{"results": []any{map[string]any{"name": "t1", "address": "1.1.1.1", "port": float64(22)}}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("TargetsField extraction failed: got %d targets", len(targets)) + } + }, + }, + { + name: "CEL Name mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Name: "item.hostname"}}, + raw: []any{map[string]any{"hostname": "host-1", "address": "10.0.0.1", "port": float64(830)}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Name mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Name != "host-1" { + t.Fatalf("Name mapping failed: got %q", targets[0].Name) + } + }, + }, + { + name: "CEL Address mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Address: "item.ip"}}, + raw: []any{map[string]any{"name": "t1", "ip": "192.168.1.1", "port": float64(830)}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Address mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Address != "192.168.1.1" { + t.Fatalf("Address mapping failed: got %q", targets[0].Address) + } + }, + }, + { + name: "CEL Port mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Port: "item.mgmt_port"}}, + raw: []any{map[string]any{"name": "t1", "address": "10.0.0.1", "mgmt_port": float64(9000)}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Port mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Port != 9000 { + t.Fatalf("Port mapping failed: got %d", targets[0].Port) + } + }, + }, + { + name: "CEL Labels mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Labels: `{"env": item.environment, "type": item.device_type}`}}, + raw: []any{map[string]any{"name": "t1", "address": "10.0.0.1", "port": float64(830), "environment": "prod", "device_type": "router"}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Labels mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Labels["env"] != "prod" || targets[0].Labels["type"] != "router" { + t.Fatalf("Labels mapping failed: %#v", targets[0].Labels) + } + }, + }, + { + name: "CEL TargetProfile mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{TargetProfile: `item.type == "edge" ? "edge-profile" : "default"`}}, + raw: []any{map[string]any{"name": "t1", "address": "10.0.0.1", "port": float64(830), "type": "edge"}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("TargetProfile mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].TargetProfile != "edge-profile" { + t.Fatalf("TargetProfile mapping failed: got %q", targets[0].TargetProfile) + } + }, + }, + { + name: "CEL all mapping options combined", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{TargetsField: "self.results", Name: "item.hostname", Address: "item.ip", Port: "item.port", Labels: `{"env": item.env}`, TargetProfile: `item.type == "edge" ? "edge-profile" : "default"`}}, + raw: map[string]any{"results": []any{map[string]any{"hostname": "host-1", "ip": "10.0.0.1", "port": float64(830), "env": "prod", "type": "edge"}}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("combined mapping: expected 1 target, got %d", len(targets)) + } + tgt := targets[0] + if tgt.Name != "host-1" || tgt.Address != "10.0.0.1" || tgt.Port != 830 || tgt.Labels["env"] != "prod" || tgt.TargetProfile != "edge-profile" { + t.Fatalf("combined mapping failed: %#v", tgt) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + loader := makeLoader(tt.config, nil) + targets, err := loader.extractTargetsFromResponse(tt.raw, logr.Discard()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + tt.validate(t, targets) + }) + } +} + +func TestMapItemsToTargetsSkipsInvalidItems(t *testing.T) { + loader := makeLoader(gnmicv1alpha1.HTTPConfig{}, nil) + tgts, err := loader.mapItemsToTargets([]any{"not-a-map", map[string]any{"name": "n", "address": "a"}}, nil, logr.Discard()) + if err != nil { + t.Fatalf("mapItemsToTargets failed: %v", err) + } + if len(tgts) != 1 || tgts[0].Name != "n" { + t.Fatalf("unexpected targets: %#v", tgts) + } +} diff --git a/internal/controller/discovery/loaders/http/pagination.go b/internal/controller/discovery/loaders/http/pagination.go new file mode 100644 index 00000000..fc4913e5 --- /dev/null +++ b/internal/controller/discovery/loaders/http/pagination.go @@ -0,0 +1,77 @@ +package http + +import ( + "fmt" + "net/http" + "net/url" + "strings" +) + +// extractNextPageInfo extracts pagination information from a response +func (l *Loader) extractNextPageInfo(raw any) (string, error) { + if l.spec.Pagination == nil || l.spec.Pagination.NextField == "" { + return "", nil + } + + // Extract next value + prog, err := compileCEL(l.spec.Pagination.NextField) + if err != nil { + return "", fmt.Errorf("invalid NextField CEL: %w", err) + } + out, _, err := prog.Eval(map[string]any{"self": raw}) + if err != nil { + return "", fmt.Errorf("CEL eval failed: %w", err) + } + if out == nil || out.Value() == nil { + return "", nil + } + + str, ok := out.Value().(string) + if !ok { + return "", fmt.Errorf("NextField must evaluate to string") + } + + return str, nil +} + +// Link header parsing +func extractNextFromLinkHeader(h http.Header) string { + link := h.Get("Link") + if link == "" { + return "" + } + + parts := strings.Split(link, ",") + for _, p := range parts { + if strings.Contains(p, `rel="next"`) { + start := strings.Index(p, "<") + end := strings.Index(p, ">") + if start != -1 && end != -1 { + return p[start+1 : end] + } + } + } + return "" +} + +// buildNextURL supports token and full URL +func (l *Loader) buildNextURL(currentURL, nextVal string) (string, error) { + if parsed, err := url.Parse(nextVal); err == nil && parsed.Scheme != "" { + return nextVal, nil // full URL + } + + if l.spec.Pagination.RequestParam == "" { + return "", fmt.Errorf("requestParam must be set for token pagination") + } + + parsedURL, err := url.Parse(currentURL) + if err != nil { + return "", err + } + + q := parsedURL.Query() + q.Set(l.spec.Pagination.RequestParam, nextVal) + parsedURL.RawQuery = q.Encode() + + return parsedURL.String(), nil +} diff --git a/internal/controller/discovery/loaders/http/pagination_test.go b/internal/controller/discovery/loaders/http/pagination_test.go new file mode 100644 index 00000000..707518d5 --- /dev/null +++ b/internal/controller/discovery/loaders/http/pagination_test.go @@ -0,0 +1,112 @@ +package http + +import ( + "net/http" + "strings" + "testing" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" +) + +func TestPaginationHelpersAndNextURL(t *testing.T) { + loader := makeLoader( + gnmicv1alpha1.HTTPConfig{ + Pagination: &gnmicv1alpha1.PaginationSpec{ + NextField: "self.next", + RequestParam: "next", + }, + }, + nil, + ) + + next, err := loader.extractNextPageInfo(map[string]any{"next": "token"}) + if err != nil || next != "token" { + t.Fatalf("extractNextPageInfo failed: %v", err) + } + + nextURL, err := loader.buildNextURL("https://example.com/path", "token") + if err != nil || !strings.Contains(nextURL, "next=token") { + t.Fatalf("buildNextURL failed: %v, %s", err, nextURL) + } + + nextURL, err = loader.buildNextURL("https://example.com/path", "https://example.com/other") + if err != nil || nextURL != "https://example.com/other" { + t.Fatalf("buildNextURL absolute failed: %v, %s", err, nextURL) + } +} + +func TestPagination_ArrayNoPagination(t *testing.T) { + raw := []any{ + map[string]any{"name": "a"}, + } + + loader := &Loader{ + spec: gnmicv1alpha1.HTTPConfig{}, + } + + next, err := loader.extractNextPageInfo(raw) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if next != "" { + t.Fatalf("expected empty next, got %s", next) + } +} + +func TestPagination_NextURL(t *testing.T) { + raw := map[string]any{ + "next": "http://example.com/page2", + } + + loader := &Loader{ + spec: gnmicv1alpha1.HTTPConfig{ + Pagination: &gnmicv1alpha1.PaginationSpec{ + NextField: "self.next", + }, + }, + } + + next, err := loader.extractNextPageInfo(raw) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if next != "http://example.com/page2" { + t.Fatalf("unexpected next: %s", next) + } +} + +func TestPagination_Token(t *testing.T) { + raw := map[string]any{ + "next_page_token": "abc", + } + + loader := &Loader{ + spec: gnmicv1alpha1.HTTPConfig{ + Pagination: &gnmicv1alpha1.PaginationSpec{ + NextField: "self.next_page_token", + RequestParam: "page_token", + }, + }, + } + + next, err := loader.extractNextPageInfo(raw) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if next != "abc" { + t.Fatalf("unexpected token: %s", next) + } +} + +func TestPagination_LinkHeader(t *testing.T) { + headers := http.Header{} + headers.Set("Link", `; rel="next"`) + + next := extractNextFromLinkHeader(headers) + + if next != "http://example.com/page2" { + t.Fatalf("unexpected next link: %s", next) + } +} diff --git a/internal/controller/discovery/ressource_fetcher.go b/internal/controller/discovery/ressource_fetcher.go new file mode 100644 index 00000000..c544b30a --- /dev/null +++ b/internal/controller/discovery/ressource_fetcher.go @@ -0,0 +1,60 @@ +package discovery + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// k8sResourceFetcher implements core.ResourceFetcher using a controller runtime client +type k8sResourceFetcher struct { + client client.Client +} + +// GetSecretKey retrieves the value of a specific key from a Kubernetes Secret +func (f *k8sResourceFetcher) GetSecretKey(ctx context.Context, namespace string, selector *corev1.SecretKeySelector) (string, error) { + if selector == nil { + return "", nil + } + var secret corev1.Secret + key := client.ObjectKey{Namespace: namespace, Name: selector.Name} + if err := f.client.Get(ctx, key, &secret); err != nil { + return "", err + } + if selector.Key == "" { + return "", fmt.Errorf("secret key selector has empty key for secret %s/%s", namespace, selector.Name) + } + val, ok := secret.Data[selector.Key] + if !ok { + return "", fmt.Errorf("secret %s/%s does not contain key %s", namespace, selector.Name, selector.Key) + } + return string(val), nil +} + +// GetConfigMapKey retrieves the value of a specific key from a Kubernetes ConfigMap +func (f *k8sResourceFetcher) GetConfigMapKey(ctx context.Context, namespace string, selector *corev1.ConfigMapKeySelector) (string, error) { + if selector == nil { + return "", nil + } + var cm corev1.ConfigMap + key := client.ObjectKey{Namespace: namespace, Name: selector.Name} + if err := f.client.Get(ctx, key, &cm); err != nil { + return "", err + } + if selector.Key == "" { + return "", fmt.Errorf("config map key selector has empty key for config map %s/%s", namespace, selector.Name) + } + val, ok := cm.Data[selector.Key] + if !ok { + return "", fmt.Errorf("config map %s/%s does not contain key %s", namespace, selector.Name, selector.Key) + } + return val, nil +} + +func newK8sResourceFetcher(c client.Client) core.ResourceFetcher { + return &k8sResourceFetcher{client: c} +} diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 7f30fc85..cfa57823 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -196,7 +196,7 @@ func (r *TargetSourceReconciler) startDiscovery( targetSource, targetChannel, ) - loader, err := discovery.NewLoader(&loaderConfig, targetSource.Spec) + loader, err := discovery.NewLoader(ctx, r.Client, &loaderConfig, targetSource.Spec) if err != nil { logger.Error(err, "Target loader could not be created") cleanup()